1 of 53

1

Applied Data Analysis (CS401)

Robert West

Lecture 13

Scaling up

9 Dec 2020

Scaling up

2 of 53

Announcements

  • Project milestone P4 due in 9 days! (18 Dec)
  • Friday’s lab session:
    • Exercises on Spark (useful for your future projects, your job, your love life)
    • Examples of good data stories
    • Project office hours
  • Course eval is available on IS-Academia!
    • Note: this is different from the eval from a few weeks ago…

2

3 of 53

Feedback

3

Give us feedback on this lecture here: https://go.epfl.ch/ada2020-lec13-feedback

  • What did you (not) like about this lecture?
  • What was (not) well explained?
  • On what would you like more (fewer) details?
  • Where is Waldo?

4 of 53

So far in this class...

  • We made one big assumption:
  • All data fits on a single machine
  • Even more, all data fits into memory on a single machine (Pandas)
  • Realistic assumption for prototyping, but not for production code

5 of 53

The big-data problem

Data is growing faster than computation speed

Growing data sources

    • Web, mobile, sensors,

Cheap hard-disk storage

Stalling CPU speeds

RAM bottlenecks

6 of 53

Examples

Facebook’s daily logs: 60 TB

1000 Genomes project: 200 TB

Google Web index: 100+ PB

Cost of 1 TB of disk: $50

Time to read 1 TB from disk: 3 hours (100 MB/s)

7 of 53

The big-data problem

Single machine can no longer store, let alone process, all the data

Only solution is to distribute over a large cluster of machines

8 of 53

But how much data should you get?

Of course, “it depends”, but for many applications the answer is:

As much as you can get

Big data about people (text, Web, social media) tends to follow power law statistics.

Number of

search queries [log]

of freq >= x

x: search-query frequency [log]

Most queries occur�only once or twice

59% of all Web search queries are unique

17% of all queries were made only twice

8% were made three times

9 of 53

Hardware for big data

Budget (a.k.a. commodity) hardware

Not "gold-plated" (a.k.a. custom)

Many low-end servers

Easy to add capacity

Cheaper per CPU and per disk

Increased complexity in software:

  • Fault tolerance
  • Virtualization (e.g., distributed file systems)

Google Corkboard server: Steve Jurvetson/Flickr

10 of 53

Problems with cheap hardware

Failures, e.g. (Google numbers)

  • 1-5% hard drives/year
  • 0.2% DIMMs (dual in-line memory modules)/year

Commodity network (1-10 Gb/s) speeds vs. RAM

  • Much more latency (100x – 100,000x)
  • Lower throughput (100x-1000x)

Uneven performance

  • Inconsistent hardware skew
  • Variable network latency
  • External loads

Disclaimer: these numbers are constantly changing thanks to new technology!

11 of 53

Google datacenter

How to program this beast?

12 of 53

What’s hard about cluster computing?

How do we split work across machines?

How do we deal with failures?

13 of 53

How do you count the number of occurrences of each word in a document?

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?”

I: 3

am: 3

Sam: 3

do: 1

you: 1

like: 1

14 of 53

A hashtable (a.k.a. dict)!

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?”

{}

15 of 53

A hashtable!

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?”

{I: 1}

16 of 53

A hashtable!

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?”

{I: 1,

am: 1}

17 of 53

A hashtable!

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?”

{I: 1,

am: 1,

Sam: 1}

18 of 53

A hashtable!

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?”

{I: 2,

am: 1,

Sam: 1}

19 of 53

What if the document is really big?

20 of 53

What if the document is really big?

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?

I do not like them

Sam I am

I do not like

Green eggs and ham

Would you like...”

{I: 3,

am: 3,

Sam: 3

{do: 2, … }

{Sam:1,

… }

{Would: 1, … }

{I: 6,

am: 4,

Sam: 4,

do: 3

… }

21 of 53

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?

I do not like them

Sam I am

I do not like

Green eggs and ham

Would you like…”

{I: 3,

am: 3,

…}

{do: 1,

you: 1, … }

{Sam: 1,

I: 1,

… }

{Would: 1, you: 1,… }

{I: 6,

do: 3,

…}

{am: 5,

Sam: 4

…}

{you: 2

…}

{Would: 1

…}

“Divide and Conquer”

22 of 53

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?

I do not like them

Sam I am

I do not like

Green eggs and ham

Would you like…”

{I: 3,

am: 3,

…}

{do: 1,

you: 1, … }

{Sam: 1,

I: 1,

… }

{Would: 1, you: 1,… }

{I: 6,

do: 3,

…}

{am: 5,

Sam: 4

…}

{you: 2

…}

{Would: 1

…}

“Divide and Conquer”

MAP

23 of 53

“I am Sam

I am Sam

Sam I am

Do you like

Green eggs and ham?

I do not like them

Sam I am

I do not like

Green eggs and ham

Would you like…”

{I: 3,

am: 3,

…}

{do: 1,

you: 1, … }

{Sam: 1,

I: 1,

… }

{Would: 1, you: 1,… }

{I: 6,

do: 3,

…}

{am: 5,

Sam: 4

…}

{you: 2

…}

{Would: 1

…}

“Divide and Conquer”

MAP

REDUCE

24 of 53

What’s hard about cluster computing?

How to divide work across machines?

      • Must consider network, data locality
      • Moving data may be very expensive

How to deal with failures?

      • 1 server fails every 3 years => 10K nodes see 10 faults/day
      • Even worse: stragglers (node not failed, but slow)

25 of 53

Solution: MapReduce

  • Smart systems engineers have done all the work�for you
    • Task scheduling
    • Virtualization of file system
    • Fault tolerance (incl. data replication)
    • Job monitoring
    • etc.
  • “All” you need to do: implement Mapper and Reducer classes

25

Jeff Dean [facts]

26 of 53

Applied Machine Learning Days ’19 [link]

26

27 of 53

How to deal with failures?

{I: 6,

do: 3,

…}

{am: 5,

Sam: 4

…}

{you: 2

…}

{Would: 1

…}

{4: Sam,

5: am, … }

{2: you,

… }

{1: would,… }

Just launch another task!

{3: do,

6: I,

… }

28 of 53

How to deal with slow tasks?

{I: 6,

do: 3,

…}

{am: 5,

Sam: 4

…}

{you: 2

…}

{Would: 1

…}

{4: Sam,

5: am, … }

{2: you,

… }

{3: do,

6: I,

… }

{1: would,… }

Just launch another task!

29 of 53

Solution: MapReduce

  • Smart systems engineers have done all the work�for you
    • Task scheduling
    • Virtualization of file system
    • Fault tolerance (incl. data replication)
    • Job monitoring
    • etc.
  • “All” you need to do: implement Mapper and Reducer classes

29

Jeff Dean

Need to break more complex jobs into sequence of MapReduce jobs

30 of 53

Example task

Suppose you have user info in one file, website logs in another, and you need to find the top 5 pages most visited by users aged 18-25.

30

Load Users

Load Pages

Filter by age

Join on name

Group on url

Count visits

Order by visits

Take top 5

Example from http://wiki.apache.org/pig-data/attachments/PigTalksPapers/attachments/ApacheConEurope09.ppt

31 of 53

In MapReduce

31

32 of 53

Enter: .

  • A high-level API for programming�MapReduce-like jobs

32

sc = SparkContext()

print “I am a regular Python program, using the pyspark lib”

users = sc.textFile(‘users.tsv’) # user <TAB> age

.map(lambda s: tuple(s.split(‘\t’)))

.filter(lambda (user, age): age>=18 and age<=25)�pages = sc.textFile(‘pageviews.tsv’) # user <TAB> url

.map(lambda s: tuple(s.split(‘\t’)))�counts = users.join(pages)

.map(lambda (user, (age, url)): (url, 1)

.reduceByKey(add)

.takeOrdered(5)

33 of 53

  • Implemented in Scala (go, EPFL!)
  • Additional APIs in
    • Python
    • Java
    • R

33

34 of 53

RDD: resilient distributed dataset

  • To programmer: looks like one single list (each element represents a “row” of a dataset)
  • Under the hood: oh boy...
    • RDDs “live in the cloud”: split over several machines, replicated, etc.
    • Can be processed in parallel
    • Can be transformed to single, real list (if small...)
    • Typically read from the distributed file system (HDFS)
    • Can be written to the distributed file system

34

35 of 53

. architecture

35

Your Python script runs in the driver

RDD operations

are run in executors

36 of 53

RDD operations

  • “Transformations”
    • Input: RDD; output: another RDD
    • Everything remains “in the cloud”
    • Example: for every entry in the input RDD, count chars
      • RDD:[‘I’, ‘am’, ‘you’] → RDD:[1, 2, 3]
  • “Actions”
    • Input: RDD; output: a value that is returned to the driver
    • Result is transferred “from cloud to ground”
    • Example: take a sample of entries from RDD

36

37 of 53

Lazy execution [unrelated]

  • Transformations (i.e., RDD→RDD operations) are not�executed until it’s really necessary�(a.k.a. “lazy execution”)
  • Execution of transformations triggered by actions
  • Why?
    • If you never look at the data, there’s�no point in manipulating it…
    • Smarter query processing possible:�E.g., rdd2 = rdd1.map(f1)� rdd3 = rdd2.filter(f2)�Can be done in one go -- no need to�materialize rdd2

37

38 of 53

RDD transformations [full list]

  • map(func): Return a new distributed dataset formed by passing each element of the source through a function func
    • {1,2,3}.map(lambda x: x*2) → {2,4,6}
  • filter(func): Return a new dataset formed by selecting those elements of the source on which func returns true
    • {1,2,3}.filter(lambda x: x <= 2) → {1,2}
  • flatMap(func): Similar to map, but each input item can be mapped to 0 or more output items (so func should return a list rather than a single item)
    • {1,2,3}.flatMap(lambda x: [x,x*10]) → {1,10,2,20,3,30}

38

39 of 53

RDD transformations [full list]

  • sample(withReplacement?, fraction, seed): Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed
  • union(otherDataset): Return a new dataset that contains the union of the elements in the source dataset and the argument.
  • intersection(otherDataset): …
  • distinct(): Return a new dataset that contains the distinct elements of the source dataset.

39

40 of 53

RDD transformations [full list]

  • sample(withReplacement?, fraction, seed): Sample a fraction fraction of the data, with or without replacement, using a given random number generator seed

40

POLLING TIME!

Why relative fraction, and not absolute number?

41 of 53

RDD transformations [full list]

  • groupByKey(): When called on a dataset of (K, V) pairs, returns a dataset of (K, Iterable<V>) pairs.
    • {(1,a), (2,b), (1,c)}.groupByKey() → {(1,[a,c]), (2,[b])}
  • reduceByKey(func): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs where the values for each key are aggregated using the given reduce function func, which must be of type (V, V) => V.
    • {(1, 3.1), (2, 2.1), (1, 1.3)}.reduceByKey(lambda (x,y): x+y)�→ {(1, 4.4), (2, 2.1)}

41

42 of 53

RDD transformations [full list]

  • sortByKey(): When called on a dataset of (K, V) pairs, returns a dataset of (K, V) pairs sorted by keys
  • join(otherDataset): When called on datasets of type (K, V) and (K, W), returns a dataset of (K, (V, W)) pairs with all pairs of elements for each key
    • {(1,a), (2,b)}.join({(1,A), (1,X)}) → {(1, (a,A)), (1, (a,X))}
  • Analogous: leftOuterJoin, rightOuterJoin, fullOuterJoin
  • (There are several other RDD transformations, and some of the above have additional arguments; cf. tutorial)

42

43 of 53

RDD actions [full list]

  • collect(): Return all the elements of the dataset as an array at the driver program. This is usually useful after a filter or other operation that returns a sufficiently small subset of the data.
  • count(): Return the number of elements in the dataset.
  • take(n): Return an array with the “first” n elements of the dataset.
  • saveAsTextFile(path): Write the elements of the dataset as a text file in a given directory in the local filesystem or HDFS.
  • (There are several other RDD actions; cf. tutorial)

43

44 of 53

Broadcast variables

  • my_set = set(range(1e80))�rdd2 = rdd1.filter(lambda x: x in my_set)�^ This is a bad idea: my_set needs to be shipped with every task (one task per data partition, so if rdd1 is stored in N partitions, the above will require copying the same object N times)
  • Better:�my_set = sc.broadcast(set(range(1e80)))�rdd2 = rdd1.filter(lambda x: x in my_set.value)�^ This way, my_set is copied to each executor only once and persists across all tasks (one per partition) on the same executor
  • Broadcast variables are read-only

44

45 of 53

Accumulators

  • def f(x): return x*2�rdd2 = rdd1.map(f)�^ How can we easily know how many rows there are in rdd1 (without running a costly reduce operation)?
  • Side effects via accumulators!�counter = sc.accumulator(0)def f(x): counter.add(1); return x*2�rdd2 = rdd1.map(f)
  • Accumulators are write-only (“add-only”) for executors
  • Only driver can read the value: counter.value

45

46 of 53

RDD persistence

rdd2 = rdd1.map(f1)

list1 = rdd2.filter(f2).collect()

list2 = rdd2.filter(f3).collect()

rdd2 = rdd1.map(f1)

rdd2.persist()

list1 = rdd2.filter(f2).collect()

list2 = rdd2.filter(f3).collect()

map(f1) transformation is executed twice

Result of map(f1) transformation is cached and reused (can choose between memory and disk)

46

}

}

47 of 53

Spark DataFrames

  • Bridging the gap between your experience with Pandas and the need for distributed computing
    • RDD = list of rows
    • DataFrame = table with rows and typed columns
  • Important to understand what RDDs are and what they offer, but today most of the tasks can be accomplished with DataFrames (higher level of abstraction => less code)
  • https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html

47

48 of 53

Spark SQL

sc = SparkContext()

sqlContext = HiveContext(sc)

df = sqlContext.sql("SELECT * from table1 GROUP BY id")

48

49 of 53

Spark's Machine Learning Toolkit

MLlib: Algorithms [more details]

Classification

  • Logistic regression, decision trees, random forests

Regression

  • Linear (with L1 or L2 regularization)

Unsupervised:

  • Alternating least squares
  • K-means
  • SVD
  • Topic modeling (LDA)

Optimizers

  • Optimization primitives (SGD, L-BGFS)

50 of 53

Example:

Logistic regression with MLLib

from pyspark.mllib.classification \

import LogisticRegressionWithSGD

�trainData = sc.textFile("...").map(...)�testData = sc.textFile("...").map(...)

model = \

LogisticRegressionWithSGD.train(trainData)

predictions = model.predict(testData)�

51 of 53

Remarks

  • This lecture is not enough to teach you Spark!
  • To use it in practice, you’ll need to delve into further online material
  • Also: Friday’s lab session
  • You can’t learn it without some frustration :(
  • Important skill: assess whether you’d benefit from Spark
    • E.g., >1TB: yes, you’ll need Spark
    • 20GB: it depends…

51

52 of 53

Cluster etiquette

  • Develop and debug locally
    • Install Spark locally on your personal computer
    • Use a small subset of the data
  • When ready, launch your script on the cluster using spark-submit
  • Never (never!) use the Spark shell (a.k.a. pyspark) -- it’s hereby officially forbidden
  • Useful trench report from a dlab member:�“What I learned from processing big data with Spark”

52

53 of 53

Feedback

53

Give us feedback on this lecture here: https://go.epfl.ch/ada2020-lec13-feedback

  • What did you (not) like about this lecture?
  • What was (not) well explained?
  • On what would you like more (fewer) details?
  • Where is Waldo?