Functional Python for Learning Data Science
(STUPID ITERTOOLS TRICKS)
Joel Grus
@joelgrus
joelgrus@gmail.com
(video of talk at https://www.youtube.com/watch?v=ThS4juptJjQ )
About Me
What is Functional Programming?
Use Functions
Avoid Side-Effects
First-Class Functions
Laziness
Immutability
Functional Programming in Python 3
from operator import add
functools
from functools import partial
partial function application ("currying")
def add1(x): return add(1, x)
could be written as
add1 = partial(add, 1)
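For instance:

add1(5)   # 6
add1(10)  # 11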
from functools import reduce
So now reduce(). This is actually the one I've always hated most, because, apart from a few examples involving + or *, almost every time I see a reduce() call with a non-trivial function argument, I need to grab pen and paper to diagram what's actually being fed into that function before I understand what the reduce() is supposed to do. So in my mind, the applicability of reduce() is pretty much limited to associative operators, and in all other cases it's better to write out the accumulation loop explicitly.
Guido van Rossum, 2005
n.b. this criticism also applies to just about everything we'll do today!
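For reference, a minimal example of reduce folding a function over a sequence:

reduce(add, [1, 2, 3, 4])       # 10, i.e. add(add(add(1, 2), 3), 4)
reduce(add, [1, 2, 3, 4], 100)  # 110 (100 is the initial accumulator)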
iterators
In [1]: xs = [1, 2, 3]

In [2]: it = iter(xs)

In [3]: next(it)
Out[3]: 1

In [4]: next(it)
Out[4]: 2

In [5]: next(it)
Out[5]: 3

In [6]: next(it)
---------------------------------------------------------------------------
StopIteration                             Traceback (most recent call last)
<ipython-input-121-5c05586d40e8> in <module>()
----> 1 next(it)

StopIteration:
get an iterator
take its values with next
get a StopIteration exception when no values left
iterators
serving up values one-at-a-time with next means you can generate them on-demand
(laziness)
allows us to create lazy infinite sequences
generators
def lazy_integers(n=0):
    while True:
        yield n
        n += 1

xs = lazy_integers()

[next(xs) for _ in range(10)]
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]

# maintains state
[next(xs) for _ in range(10)]
# [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
infinite sequence!
function with yield creates a generator
generator comprehensions
# computes nothing until next or for
squares = (x**2 for x in lazy_integers())
doubles = (2*x for x in lazy_integers())

next(squares)  # 0
next(squares)  # 1
next(squares)  # 4
next(squares)  # 9

# don't do this!!! (it never terminates):
bad_squares = [x**2 for x in lazy_integers()]
generators and pipelines
$ cat euler.hs | grep -i prime | wc -l
63
with open("euler.hs", "r") as f:
    lines = (line for line in f)
    prime_lines = filter(lambda line: "prime" in line.lower(),
                         lines)
    # Make sure to force evaluation before f goes out of scope,
    # or else: ValueError: I/O operation on closed file
    line_count = len(list(prime_lines))
itertools
from itertools import count
count([start=0], [step=1])
Gives the infinite sequence:
start, start + step, start + 2 * step, ...
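For example:

c = count(10, 2)
next(c)  # 10
next(c)  # 12
next(c)  # 14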
from itertools import islice
islice(seq, [start=0], stop, [step=1])
Returns a "lazy slice" out of seq
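For example:

list(islice(count(), 5))         # [0, 1, 2, 3, 4]
list(islice(count(), 2, 10, 3))  # [2, 5, 8]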
from itertools import tee
tee(it, [n=2])
splits an iterator into two or more memoized copies
huge efficiency gains if you have to iterate through expensive computations multiple times
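For example:

a, b = tee(iter([1, 2, 3]))
next(a)  # 1
next(a)  # 2
next(b)  # 1 (b is an independent, memoized copy)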
from itertools import repeat
repeat(elem, [n=forever])
repeats elem n times (or forever if no n)
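For example:

list(repeat("x", 3))  # ['x', 'x', 'x']
ones = repeat(1)      # 1, 1, 1, ... forever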
from itertools import cycle
cycle(p)
repeats the elements of p over and over and over again forever
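For example:

list(islice(cycle([1, 2, 3]), 7))  # [1, 2, 3, 1, 2, 3, 1]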
from itertools import chain
chain(p, q, …)
iterates first through the elements of p, then the elements of q, and so on
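For example:

list(chain([1, 2], "ab", range(3)))  # [1, 2, 'a', 'b', 0, 1, 2]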
from itertools import accumulate
accumulate(p, [func=add])
returns the sequence a, where
a[0] = p[0]
a[1] = func(a[0], p[1])
a[2] = func(a[1], p[2])
...
with the default func=add this is a "running total", but we'll use it for much more than that
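For example:

from operator import mul

list(accumulate([1, 2, 3, 4, 5]))       # [1, 3, 6, 10, 15]
list(accumulate([1, 2, 3, 4, 5], mul))  # [1, 2, 6, 24, 120]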
Down the rabbit hole!
We Need Some itertools of Our Own
# force the first n values of a sequence
def take(n, it):
    return list(islice(it, n))
# new sequence with all but the first n values of a sequence
def drop(n, it):
    return islice(it, n, None)
# force the first value of a sequence
head = next

# new sequence with all but the first value of a sequence
tail = partial(drop, 1)
We're also missing iterate
iterate(f, x)
should be the sequence x, f(x), f(f(x)), ...
missing iterate
def iterate(f, x):
    """will blow the stack eventually"""
    yield x
    yield from iterate(f, f(x))
yield from is what sold me on Python 3
missing iterate
def iterate(f, x):
    """will not blow the stack eventually"""
    while True:
        yield x
        x = f(x)
but look at that awful mutation!
missing iterate
def iterate(f, x):
    """crazy functional version"""
    return accumulate(repeat(x), lambda fx, _: f(fx))
a too-clever trick: it ignores every element of the input sequence except the first, and just applies f to the previously "accumulated" value
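For example:

take(5, iterate(lambda x: 2 * x, 1))  # [1, 2, 4, 8, 16]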
using iterate
def lazy_integers():
    return iterate(add1, 0)

take(10, lazy_integers())
# [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
fibonacci numbers
def fib(n):
    if n == 0: return 1
    if n == 1: return 1
    return fib(n-1) + fib(n-2)

[fib(i) for i in range(10)]
# [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
super inefficient
fibonacci numbers
def fibs():
    a, b = 0, 1
    while True:
        yield b
        a, b = b, a + b

take(10, fibs())
# [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]
efficient, but look at all that terrible mutation!
fibonacci numbers
def fibs():
    yield 1
    yield 1
    yield from map(add, fibs(), tail(fibs()))

take(10, fibs())
# [1, 1, 2, 3, 5, 8, 13, 21, 34, 55]

%time take(30, fibs())
CPU times: user 7.62 s, sys: 392 ms, total: 8.01 s
Wall time: 8.02 s
"Haskellic" version
but it regenerates the sequence from scratch at every level of recursion, which gets exponentially slow
fibonacci numbers
def fibs():
    yield 1
    yield 1
    fibs1, fibs2 = tee(fibs())
    yield from map(add, fibs1, tail(fibs2))
%time take(30, fibs())
CPU times: user 186 µs, sys: 11 µs, total: 197 µs
Wall time: 200 µs
create efficient memoized version
fibonacci numbers
def next_fib(pair):
    x, y = pair
    return (y, x + y)

def fibs():
    return (y for x, y in iterate(next_fib, (0, 1)))
%time take(30, fibs())
CPU times: user 31 µs, sys: 4 µs, total: 35 µs
Wall time: 37.2 µs
now we're getting functional!
use a pure function
prime numbers (just for fun)
def filter_primes(it):
    """will blow the stack"""
    p = next(it)
    yield p
    yield from filter_primes(filter(lambda x: x % p > 0, it))

def all_primes():
    return filter_primes(count(2))

take(10, all_primes())
# [2, 3, 5, 7, 11, 13, 17, 19, 23, 29]
What does any of this have to do with data science?
k-means clustering
Not-functional approach
class KMeans:
    def __init__(self, k):
        self.k = k
        self.means = [None for _ in range(k)]

    def predict(self, point):
        """return index of closest mean"""
        d_min = float('inf')
        for j, m in enumerate(self.means):
            d = sum((m_i - p_i)**2
                    for m_i, p_i in zip(m, point))
            if d < d_min:
                prediction = j
                d_min = d
        return prediction

    def fit(self, points, num_iters=10):
        """find the k means"""
        assignments = [None for _ in points]
        self.means = random.sample(list(points), self.k)
        for _ in range(num_iters):
            # assign each point to its closest mean
            for i, point in enumerate(points):
                assignments[i] = self.predict(point)
            # compute new means
            for j in range(self.k):
                cluster = [p for p, c in zip(points, assignments)
                           if c == j]
                self.means[j] = list(
                    map(lambda x: x / len(cluster),
                        reduce(partial(map, add), cluster)))
# 100 random points in the unit square, 5 clusters
points = np.random.random((100, 2))
model = KMeans(5)
model.fit(points)
assignments = [model.predict(point) for point in points]

# now plot the means and the clusters
for x, y in model.means:
    plt.plot(x, y, marker='*', markersize=10, color='black')
for j, color in zip(range(5), ['r', 'g', 'b', 'm', 'c']):
    cluster = [p
               for p, c in zip(points, assignments)
               if j == c]
    xs, ys = zip(*cluster)
    plt.scatter(xs, ys, color=color)
plt.show()
Let's do it functional!
def k_means(points, k, num_iters=10):
    means = random.sample(points, k)
    for _ in range(num_iters):
        means = new_means(points, means)
    return means
better, but look at all that terrible mutation!
pull work into new_means function
k_means -> k_meanses
Let's do it crazy functional!
def k_meanses(points, k):
    """returns the infinite sequence of meanses"""
    initial_means = random.sample(points, k)
    return iterate(partial(new_means, points),
                   initial_means)
HUH?
partial(new_means, points) is the (curried) function that maps
prev_means -> next_means
iterate produces the series x, f(x), f(f(x)), ....
so this results in the (lazy, infinite) sequence: initial_means, new_means(points, initial_means), new_means(points, new_means(points, initial_means)), ...
# 10 iterations
meanses = take(10, k_meanses(points, 5))

# until convergence
meanses = until_convergence(k_meanses(points, 5))
Why is this interesting? Because generating the whole series of "meanses" lets us watch how they converge.
Previously we just saw the end result.
Let's do it functional!
def until_convergence(it):
    prev = None
    while True:
        value = next(it)
        if value == prev:
            return  # (in Python 3.7+, raising StopIteration inside a generator is a RuntimeError)
        yield value
        prev = value
but look at all that terrible mutation!
Let's do it crazy functional!
def until_convergence(it):
    return accumulate(it, no_repeat)
def no_repeat(prev, curr):
    if prev == curr:
        # StopIteration escaping from the func ends the accumulate
        raise StopIteration
    else:
        return curr
Let's do it crazy functional!
def until_nearly_convergence(it, tolerance=0.001):
    return accumulate(it, partial(within_tolerance, tolerance))

def within_tolerance(tol, prev, curr):
    if abs(prev - curr) < tol:
        raise StopIteration
    else:
        return curr
Meanwhile, we still need new_means
def new_means(points, old_means):
    k = len(old_means)
    assignments = [closest_index(point, old_means)
                   for point in points]
    clusters = [[point
                 for point, c in zip(points, assignments)
                 if c == j] for j in range(k)]
    return [cluster_mean(cluster) for cluster in clusters]
Which means we need closest_index
def closest_index(point, means):
    min_dist = float('inf')
    for j, mean in enumerate(means):
        dist = squared_distance(point, mean)
        if dist < min_dist:
            min_dist = dist
            closest = j
    return closest
but look at all that terrible mutation!
Let's be really functional
def closest_index(point, means):
    distances = [squared_distance(point, mean)
                 for mean in means]
    return min(enumerate(distances),
               key=lambda pair: pair[1])[0]
We still need squared_distance
def squared_distance(p, q):
    return sum((p_i - q_i)**2
               for p_i, q_i in zip(p, q))
And finally cluster_mean
def cluster_mean(points):
    num_points = len(points)
    dim = len(points[0]) if points else 0

    sum_points = [sum(point[j] for point in points)
                  for j in range(dim)]

    return [s / num_points for s in sum_points]
Aside: matplotlib animation
from matplotlib import animation

def animation_frame(nframe):
    plt.cla()
    # (get_data_for stands in for whatever produces frame nframe's data)
    x, y = get_data_for(nframe)
    plt.plot(x, y)

fig = plt.figure(figsize=(5, 4))
anim = animation.FuncAnimation(fig, animation_frame,
                               frames=num_frames)
anim.save('animation.gif', writer='imagemagick', fps=4)
data = [(random.random(), random.random()) for _ in range(500)]
meanses = [means for means in until_convergence(k_meanses(data, k))]
colors = ['r', 'g', 'b', 'c', 'm']

def animation_frame(nframe):
    means = meanses[nframe]
    plt.cla()
    assignments = [closest_index(point, means)
                   for point in data]
    clusters = [[point
                 for point, c in zip(data, assignments)
                 if c == j] for j in range(k)]

    for cluster, color, mean in zip(clusters, colors, means):
        x, y = zip(*cluster)
        plt.scatter(x, y, color=color)
        plt.plot(*mean, color=color, marker='*', markersize=20)
data = [(random.choice([0, 1, 2, 4, 5]) + random.random(),
         random.normalvariate(0, 1)) for _ in range(500)]

meanses = [mean for mean in until_convergence(k_meanses(data, 5))]
Gradient Descent
Minimize a function by computing the gradient and taking small steps in the opposite direction
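That is, starting from some x_0, repeatedly apply an update like

x_next = [x_j - alpha * df_j for x_j, df_j in zip(x, df(x))]

for some small step size alpha (this is exactly what gradient_step below does, with a negated alpha).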
For example, say we want to find a minimum of
def f(x_i):
    return sum(x_ij**2 for x_ij in x_i)
its gradient is

def df(x_i):
    return [2 * x_ij for x_ij in x_i]
def gradient_step(df, alpha, x_i):
    return [x_ij + alpha * df_j
            for x_ij, df_j in zip(x_i, df(x_i))]

gradient_step is a pure function
if we curry df and alpha then it maps
point -> next_point
which means we can just use iterate
def gradient_descent(df, x_0, alpha=0.1):
    return iterate(partial(gradient_step, df, -alpha),
                   x_0)
THIS IS (basically) A WORKING IMPLEMENTATION!
take(100, gradient_descent(df, [random.random(),
                                random.random()]))[::20]
# [[0.3580493746949883, 0.8916606598206824],
#  [0.004128028237968867, 0.010280147495191952],
#  [4.7592925271786166e-05, 0.00011852203117737018],
#  [5.487090701298894e-07, 1.3664659851407321e-06],
#  [6.326184867255761e-09, 1.57542801958253e-08]]
# run gradient descent on x^2 + y^2 from 50 random points
def random_point():
    return (2 * random.random() - 1, 2 * random.random() - 1)

colors = [color for color in matplotlib.colors.cnames]

# get a length-25 "path" for each of 50 points
paths = [take(25, gradient_descent(df, random_point()))
         for _ in range(50)]

# the nth frame draws the nth point in every path
def animation_frame(nframe):
    points = [path[nframe] for path in paths]
    for color, point in zip(colors, points):
        markersize = 10 - 10 * nframe / 25
        plt.plot(*point, color=color, marker='*', markersize=markersize)
# let's try a more complex function
def random_point():
    return (3 * random.random() - 1, 3 * random.random() - 1)

def f(x):
    """f(x, y) = -exp(-x^3 / 3 + x - y^2)
    has a min at (1, 0), saddle point at (-1, 0)"""
    return -math.exp(x[0]**3 / -3 + x[0] - x[1]**2)

def df(x):
    """just the gradient"""
    return ((1 - x[0]**2) * f(x), -2 * x[1] * f(x))
Stochastic Gradient Descent
In the previous example, we just minimized a function of a single point.
When working with data, we often want to choose a parameter (beta) that minimizes an (additive) error function across all the points.
We could use gradient descent on the "sum of errors", but that can be very slow if there are lots of points.
Stochastic Gradient Descent
Instead, compute the error (and error gradient) for one point at a time.
Take "single-point-gradient" steps.
Treat x and y as fixed (i.e. curry!), look for optimal value of beta.
def sgd_step(df, alpha, prev_beta, xy_i):
    """df is a function of x_i, y_i, beta"""
    x_i, y_i = xy_i
    return [beta_j + alpha * df_j
            for beta_j, df_j in zip(prev_beta, df(x_i, y_i, prev_beta))]
start with prev_beta
compute the gradient (for the given xi, yi)
take a small step in that direction
deal with one point at a time by zip-ing x and y together
def sgd_step(df, alpha, prev_beta, xy_i):
    """df is a function of x_i, y_i, beta"""
    x_i, y_i = xy_i
    return [beta_j + alpha * df_j
            for beta_j, df_j in zip(prev_beta, df(x_i, y_i, prev_beta))]

def sgd(df, x, y, beta_0, alpha=0.1):
    xys = chain([beta_0], cycle(zip(x, y)))
    return accumulate(xys, partial(sgd_step, df, -alpha))
what in the name of all that is holy
xys is the sequence: beta_0, (x0, y0), (x1, y1), (x2, y2), …, cycling through the data forever
after currying, accumulate gets the function:
(beta, (x_i, y_i)) -> next_beta
the chained-in beta_0 seeds the accumulation, so the result is the lazy sequence beta_0, beta_1, beta_2, …
Linear Regression: y = x β + ε
x = [(1, random.randrange(100)) for _ in range(100)]
y = [-5 * x_i[0] + 10 * x_i[1] + random.random() for x_i in x]

def predict(x_i, beta): return x_i[0] * beta[0] + x_i[1] * beta[1]
# least squares estimate for beta
def error(x_i, y_i, beta): return predict(x_i, beta) - y_i

def sqerror(x_i, y_i, beta): return error(x_i, y_i, beta) ** 2

def sqerror_gradient(x_i, y_i, beta):
    return (2 * x_i[0] * error(x_i, y_i, beta),
            2 * x_i[1] * error(x_i, y_i, beta))
SGD for Linear Regression
x = [(1, random.random()) for _ in range(100)]
y = [-5 * x_i[0] + 10 * x_i[1] + random.random() for x_i in x]

# start with random beta_0
beta_0 = (random.random(), random.random())

# run the process for a fixed number of steps
results = [x for x in take(steps, sgd(sqerror_gradient, x, y, beta_0, 0.01))]

# take every show_every-th result and animate
subresults = results[::show_every]
nframes = len(subresults)

def animation_frame(nframe):
    a, b = subresults[nframe]
    # regression line goes through (0, a) and (1, a + b)
    plt.plot([0, 1], [a, a + b])
Moral of the story
Thanks!
follow me on twitter: @joelgrus
check out my book ----->
(use code "AUTHD" for 50% off!)
(only works if you buy at oreilly.com)
code is at
https://github.com/joelgrus/stupid-itertools-tricks-pydata
build cool stuff and tell me about it!
joelgrus@gmail.com