1 of 73

10-605 / 10-805: Map-Reduce and Spark

Machine Learning from Large Datasets

2 of 73

Monday’s recap

  • Administrivia that’s “in the syllabus”
  • History of big data for ML
    • To learn something complex you need lots and lots of data – the more the better!
    • Deep learning in general, and LLMs in particular, need big data
      • Big models are not enough
  • Working with big data means understanding how to do ML efficiently
    • Starting with standard complexity analysis and the cost of operations
    • Best case for i/o bound tasks is sequential access to data (from disk or from a network)

3 of 73

Today’s plan

  • Cost of operations
  • Learning as counting
    • density estimation
    • naïve Bayes
  • Distributed processing for big data
    • embarrassingly parallel tasks
    • map-reduce
      • abstract operations
      • Spark / pyspark

4 of 73

COST OF OPERATIONS

5 of 73

What computers act like

Store        | Size                        | Typical Access Time
-------------|-----------------------------|---------------------------------------------
L1 cache     | 128 KB                      | 1ns
L2 cache     | 16 MB  (100x)               | 4ns  (4x)
Main memory  | 8 GB (up to 192 GB)  (500x) | 100ns - reference  (25x)
             |                             | read 1 MB sequentially: 1,000ns  (10x)
Disk         | 320 GB  (40x)               | read 1 MB sequentially: 500,000ns  (500x)
             |                             | seek: 2,000,000ns  (4x)

small and fast (top of the hierarchy) … big and slow (bottom)

6 of 73

What computers act like

Store        | Size                        | Typical Access Time
-------------|-----------------------------|---------------------------------------------
L1 cache     | 128 KB                      | 1ns
L2 cache     | 16 MB  (100x)               | 4ns  (4x)
Main memory  | 8 GB (up to 192 GB)  (500x) | 100ns - reference  (25x)
             |                             | read 1 MB sequentially: 1,000ns  (10x)
SSD          |                             | read 1 MB sequentially: 20,000ns  (20x)
             |                             | read 4 KB random-access: (similar)
Disk         | 320 GB  (40x)               | read 1 MB sequentially: 500,000ns  (25x)
Network      |                             | send 1 MB in same datacenter: (similar, ~1x)
Disk         |                             | seek: 2,000,000ns  (4x)

7 of 73

Visualize the cost of operations

8 of 73

Visualize the cost of operations

But memory gets cheaper over time, right?

Yes, but:

  • Data gets bigger
  • Disk gets cheaper also

9 of 73

Visualize the cost of operations

10 of 73

Summary: What do we count?

  • Big data comes from a disk, or the network
  • Getting data is slow
    • Memory access/instructions are qualitatively different from disk access
  • Memory is best when you stream through data sequentially
    • Seeks are qualitatively different from sequential reads on disk
  • Best case for data processing: stream through the data once in sequential order, as it’s found on disk.
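
As a concrete illustration of that best case, here is a minimal (hypothetical) one-pass streaming job in Python: it reads the data sequentially, touches each record once, and uses constant memory.

def stream_count_tokens(path):
    """One sequential pass over a file: constant memory, linear time, no seeks."""
    total = 0
    with open(path) as f:      # the file is read front-to-back, never seeked
        for line in f:         # only the current record is held in memory
            total += len(line.split())
    return total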

11 of 73

Today’s plan

  • Cost of operations
  • Learning as counting
    • density estimation
    • naïve Bayes

12 of 73

DENSITY ESTIMATES AND NAÏVE BAYES

13 of 73

The Joint Density

  • Data:
    • All 5-grams that appear >= 40 times in a corpus of 1M English books
      • approx 80B words
      • 5-grams: 30Gb compressed, 250-300Gb uncompressed
      • Each 5-gram contains frequency distribution over years
    • Wrote code to compute
      • Pr(A,B,C,D,E|C=affect or C=effect)
      • Pr(any subset of A,…,E | any other fixed values of A,…,E, with C=affect or C=effect)
  • Observations [from playing with data]:
    • Mostly “effect”, not “affect”
    • The most common word before “affect” is “not”
    • After “not effect” the most common word is “a”

14 of 73

The Joint Can Be Used To Implement A Classifier

A “brute force” joint density estimate is a big table based on max likelihood estimates (MLEs).

The joint density:

  Pr(A=a, B=b, C=c, D=d, E=e) = #(A=a, B=b, C=c, D=d, E=e) / #(5-grams in the corpus)

A classifier for C based on the joint density estimator:

  Pr(C=c | A=a, B=b, D=d, E=e) = Pr(A=a, B=b, C=c, D=d, E=e) / Pr(A=a, B=b, D=d, E=e)

and predict

  c* = argmax_c Pr(C=c | A=a, B=b, D=d, E=e)

or just

  c* = argmax_c Pr(A=a, B=b, C=c, D=d, E=e)

(the denominator doesn’t depend on c).
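
A minimal sketch of this brute-force classifier, assuming the 5-gram counts have already been loaded into a Python dict keyed by (a, b, c, d, e) tuples (the dict and its loading step are hypothetical, not from the slides):

from collections import defaultdict

# counts[(a, b, c, d, e)] = number of times that 5-gram appears in the corpus
counts = defaultdict(int)    # assume this has been filled from the n-gram data

def predict_center(a, b, d, e, candidates=("affect", "effect")):
    """Brute-force joint-density classifier: pick the candidate center word c
    with the largest joint count #(a, b, c, d, e)."""
    best_c = max(candidates, key=lambda c: counts[(a, b, c, d, e)])
    if counts[(a, b, best_c, d, e)] == 0:
        return None          # no data for this context -- the common case in practice
    return best_c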

15 of 73

An experiment: how useful is that classifier?

  • Extracted all affect/effect 5-grams from an old Reuters corpus
    • about 20k documents
    • about 723 n-grams, 661 distinct
    • Financial news, not novels or textbooks
  • Tried to predict center word with:
    • Pr(C|A=a,B=b,D=d,E=e)

5% error

Of 723 examples:

  • 622 have no data for a,b,d,e (!)
  • Of the 101 that do have data
    • 100 predictions are correct
    • only 1 is wrong

Q: Is that good or bad performance?

16 of 73

An experiment: how useful is that classifier?

  • Extracted all affect/effect 5-grams from an old Reuters corpus
    • about 20k documents
    • about 723 n-grams, 661 distinct
    • Financial news, not novels or textbooks
  • Tried to predict center word with “backoff” estimates:
    • Pr(C|A=a,B=b,D=d,E=e)
    • then Pr(C|A=a,B=b,D=d)
    • then Pr(C|B=b,D=d)
    • then Pr(C|B=b)
    • then Pr(C)

Pattern       | Used | Errors
--------------|------|-------
P(C|A,B,D,E)  |  101 |    1
P(C|A,B,D)    |  157 |    6
P(C|B,D)      |  163 |   13
P(C|B)        |  244 |   78
P(C)          |   58 |   31

17.8% error overall
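
A minimal sketch of the backoff rule, assuming hypothetical tables of conditional counts keyed by pattern (the table layout and names are illustrative):

def predict_with_backoff(a, b, d, e, tables):
    """Try progressively simpler conditional estimates until one has data.
    `tables` maps a pattern name to a dict: context tuple -> {center word: count}."""
    backoff_order = [
        ("P(C|A,B,D,E)", (a, b, d, e)),
        ("P(C|A,B,D)",   (a, b, d)),
        ("P(C|B,D)",     (b, d)),
        ("P(C|B)",       (b,)),
        ("P(C)",         ()),
    ]
    for pattern, context in backoff_order:
        dist = tables[pattern].get(context)
        if dist:                                   # back off only when there is no data
            return max(dist, key=dist.get), pattern
    return None, None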

17 of 73

A “naïve” estimate of joint probability

The (brute force) joint density estimator uses MLEs:

  Pr(A=a, B=b, C=c, D=d, E=e) = #(A=a, B=b, C=c, D=d, E=e) / #(5-grams)

The naïve estimator assumes independence:

  Pr(A=a, B=b, C=c, D=d, E=e) ≈ Pr(A=a) Pr(B=b) Pr(C=c) Pr(D=d) Pr(E=e)

where each factor is estimated separately, e.g. Pr(A=a) = #(A=a) / #(5-grams).

18 of 73

A “naïve” but useful estimate of joint probability

If you want to use this to predict C you can’t assume everything is independent of C … so assume the other words are independent only conditioned on C:

  Pr(A=a, B=b, D=d, E=e | C=c) ≈ Pr(A=a|C=c) Pr(B=b|C=c) Pr(D=d|C=c) Pr(E=e|C=c)

so

  Pr(C=c | A=a, B=b, D=d, E=e) ∝ Pr(C=c) Pr(A=a|C=c) Pr(B=b|C=c) Pr(D=d|C=c) Pr(E=e|C=c)

and predict

  c* = argmax_c Pr(C=c | A=a, B=b, D=d, E=e)

or just

  c* = argmax_c Pr(C=c) Pr(A=a|C=c) Pr(B=b|C=c) Pr(D=d|C=c) Pr(E=e|C=c)

19 of 73

Naïve Bayes refinement - 1

In n-grams we look at Pr(b) = Pr(second word is “b”), etc.

For long documents you usually don’t keep track of where a word appears, so use position-independent estimates:

  Pr(W=w | C=c) = #(occurrences of w in documents of class c) / #(word occurrences in documents of class c)

and for a document with words w1, …, wd predict

  c* = argmax_c Pr(C=c) Pr(W=w1|C=c) ⋯ Pr(W=wd|C=c)

This is a really simple classifier: you just need to count (co-occurrences of words with classes, and number of examples in each class) – see the sketch below.
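
A minimal sketch of that counting step, assuming the training data is an iterable of (label, list-of-words) pairs (variable names are illustrative):

from collections import defaultdict

def train_naive_bayes(examples):
    """One pass over (label, words) pairs: count word-class co-occurrences and class sizes."""
    word_class_count = defaultdict(int)   # n[w ^ Y=y]
    class_word_total = defaultdict(int)   # total word occurrences in class y
    class_doc_count = defaultdict(int)    # n[Y=y]
    vocab = set()
    n_docs = 0
    for label, words in examples:
        n_docs += 1
        class_doc_count[label] += 1
        for w in words:
            word_class_count[(w, label)] += 1
            class_word_total[label] += 1
            vocab.add(w)
    return word_class_count, class_word_total, class_doc_count, n_docs, vocab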

20 of 73

Naïve Bayes refinement - 2

For numeric stability it’s best to compute with logs (a product of many small probabilities underflows; a sum of their logs does not):

  c* = argmax_c [ log Pr(C=c) + Σ_j log Pr(W=wj | C=c) ]

21 of 73

Naïve Bayes refinements

You also usually smooth the estimates, e.g. with add-one (Laplace) smoothing:

  Pr(W=w | C=c) = ( #(w in class c) + 1 ) / ( #(words in class c) + |V| )

where |V| is the number of words in the vocabulary, or more generally add-α smoothing:

  Pr(W=w | C=c) = ( #(w in class c) + α ) / ( #(words in class c) + α|V| )

This is usually a terrible density estimator but often is a pretty good classifier! A prediction sketch using these estimates follows.
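
A minimal prediction sketch using the counts from the training sketch above, with add-one smoothing and log-probabilities (same hypothetical data structures):

import math

def predict_naive_bayes(words, model):
    """Return the class maximizing log Pr(C=c) + sum_j log Pr(w_j | C=c)."""
    word_class_count, class_word_total, class_doc_count, n_docs, vocab = model
    best_label, best_score = None, float("-inf")
    for label in class_doc_count:
        score = math.log(class_doc_count[label] / n_docs)            # log prior
        for w in words:
            score += math.log(
                (word_class_count[(w, label)] + 1.0)                 # add-one smoothing
                / (class_word_total[label] + len(vocab))
            )
        if score > best_score:
            best_label, best_score = label, score
    return best_label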

22 of 73

Classification vs Density Estimation

Classification

Density Estimation

23 of 73

Classification vs density estimation

24 of 73

Today’s plan

  • Cost of operations
  • Learning as counting
    • density estimation
    • naïve Bayes
  • Distributed processing for big data
    • embarrassingly parallel tasks
    • map-reduce
      • abstract operations
      • Spark / pyspark

25 of 73

DISTRIBUTED PROCESSING FOR LARGE FILES

26 of 73

Linear-time Tasks

  • Best case, with one machine:
    • Input is many small things stored sequentially on disk
      • ”Small” means “fits in main memory”
        • “constant size” is a good approximation to this
      • Aka I can “stream” through them
    • To produce output, you make one pass in which you process each thing independently of all the others in the dataset and output one or more “things” for each input “thing”
      • Relaxation: you can make a fixed number of passes
      • Then everything is constant memory and linear time
    • If there’s one output for each input it’s like a Python map
    • If there are zero or one outputs it’s like a Python filter
    • … but we’ll call both of these a map for now

>>> list(map(lambda x: x * 2, range(0, 20, 2)))

[0, 4, 8, 12, 16, 20, 24, 28, 32, 36]

>>> list(filter(lambda x: x % 8 == 0, range(0, 20, 2)))

[0, 8, 16]

27 of 73

Embarrassingly Parallel Tasks

  • Better case, with P machines, each with its own disk (see the sketch after this list):
    • Partition the input evenly across each machine
      • Each piece of the dataset is called a shard
    • Run the mapper independently on each of the P machines.
    • If necessary collect all the outputs together
      • But it might be better to keep the shards separate …
  • Advantages:
    • We have made the process faster
      • We have many machines and many disks

  • Disadvantages:
    • there’s a lot we can’t do in this way
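
A minimal local simulation of this pattern using Python's multiprocessing, with a hypothetical list of shard files and a trivial per-record mapper:

from multiprocessing import Pool

def run_mapper_on_shard(shard_path):
    """Stream one shard sequentially and apply the mapper to every record."""
    out_path = shard_path + ".out"
    with open(shard_path) as inp, open(out_path, "w") as out:
        for line in inp:
            out.write(line.lower())        # a trivial "mapper": lowercase each record
    return out_path

if __name__ == "__main__":
    shards = ["shard-00", "shard-01", "shard-02", "shard-03"]   # hypothetical shard files
    with Pool(processes=4) as pool:                             # P "machines" (here: processes)
        outputs = pool.map(run_mapper_on_shard, shards)         # mappers run independently
    print(outputs)    # outputs can stay sharded, or be concatenated if needed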

28 of 73

Embarrassingly parallel (or not) ?

  • Examples, where input is a list of (small) documents
    • Classify documents by language
    • Remove non-English documents
    • Tokenize all the documents
    • Split documents into sentences
    • Any chain of the operations above
    • Find the most frequent words in the corpus?
    • Find the most frequent sentences in the corpus
    • Discard the longest 10% and the shortest 10% of the documents
    • Detect duplicate documents
    • Detect similar pairs of documents
  • Where input is a list of rows in a database relation
    • Any projection (subset the relation)
    • Add a new computed field
    • Join two relations

29 of 73

Similarity and duplicate detection

    • Similarity can be high for descriptions of distinct items:

      • AERO TGX-Series Work Table -42'' x 96'' Model 1TGX-4296 All tables shipped KD AEROSPEC- 1TGX Tables are Aerospec Designed. In addition to above specifications; - All four sides have a V countertop edge ...
      • AERO TGX-Series Work Table -42'' x 48'' Model 1TGX-4248 All tables shipped KD AEROSPEC- 1TGX Tables are Aerospec Designed. In addition to above specifications; - All four sides have a V countertop .. 
    • Similarity can be low for descriptions of identical items:

      • Canon Angle Finder C 2882A002 Film Camera Angle Finders Right Angle Finder C (Includes ED-C & ED-D Adapters for All SLR Cameras) Film Camera Angle Finders & Magnifiers The Angle Finder C lets you adjust  ...
      •  CANON 2882A002 ANGLE FINDER C FOR EOS REBEL® SERIES PROVIDES A FULL SCREEN IMAGE SHOWS EXPOSURE DATA BUILT-IN DIOPTRIC ADJUSTMENT COMPATIBLE WITH THE CANON® REBEL, EOS & REBEL EOS SERIES.

30 of 73

Compute Scaling Laws = Data Scaling Laws

Llama 1 (2023) training mixture

31 of 73

Another nearly-linear operation

  • Sorting is O(n log n), nearly linear

32 of 73

Map-reduce is more powerful

  • Where input is a list of (small) documents
    • Classify documents by language
    • Remove non-English documents
    • Tokenize all the documents
    • Split documents into sentences
    • Any chain of the operations above
    • Find the most frequent words in the corpus?
    • Find the most frequent sentences in the corpus
    • Discard the longest 10% and the shortest 10% of the documents
    • Detect duplicate documents
    • Detect similar pairs of documents
  • Where input is a list of rows in a database relation
    • Any projection (subset the relation)
    • Add a new computed field
    • Join two relations

Embarrassingly parallel plus one additional operation: shuffle-sort

33 of 73

Small-vocabulary word count: high level view

  • doc 1
  • doc 2
  • doc 3
  • ….

Counting logic

“ctr[w1] +=1”

Stream thru docs

“ctr[w2] +=1”

Counter Data Structure

(in memory)

“Messages”

Aside: we’re moving to a parallel setting where “messages” makes more sense.

34 of 73

Small-vocabulary word count: high level view

  • doc 1
  • doc 2
  • doc 3
  • ….

Counting logic

“ctr[w1] +=1”

Stream thru docs

“ctr[w2] +=1”

Counter Data Structure

(on disk)

“Messages”

Disk seeks will make this very slow!!

35 of 73

Large-vocabulary word count: distributed counters

  • doc 1
  • doc 2
  • doc 3
  • ….

Counting logic

“ctr[w1] +=1”

Streaming machine

“ctr[w2] +=1”

Counter Data Structure 1

(in memory 1)

“Messages”

Counter Data Structure P

(in memory P)

Message Router

Machines 1,…,P

Workable but expensive

36 of 73

Large-vocabulary word count with sorting

  • doc 1
  • doc 2
  • doc 3
  • ….

Counting logic

Pass 1

Sort

  • ctr[w1] += 1
  • ctr[w1] += 1
  • ….

Logic to process messages

Pass 2

Sorting

“ctr[w1] +=1”

“ctr[w2] +=1”

It’s easy to process messages without a big memory – because all the messages for a single word have been put together
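
A minimal single-machine sketch of the two passes (emit messages, sort, then stream through the sorted messages). The file path is hypothetical, and the in-memory sort stands in for an external or distributed shuffle-sort:

def wordcount_stream_and_sort(corpus_path):
    # Pass 1: counting logic -- emit one "ctr[word] += 1" message per token
    messages = []
    with open(corpus_path) as f:
        for line in f:
            messages.extend(line.split())

    # Shuffle-sort: all messages for the same word become adjacent.
    # (In practice the messages go to disk and through Unix sort or a cluster shuffle.)
    messages.sort()

    # Pass 2: process the sorted messages -- only one counter is in memory at a time
    current, count = None, 0
    for word in messages:
        if word == current:
            count += 1
        else:
            if current is not None:
                print(current, count)
            current, count = word, 1
    if current is not None:
        print(current, count)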

37 of 73

Map-Reduce is Really Map, Shuffle-Sort, Reduce

  • doc 1
  • doc 2
  • doc 3
  • ….

Counting logic

Map Process 1

Partition/Sort

tmp 1

tmp 2

tmp 3

  • doc 1
  • doc 2
  • doc 3
  • ….

Counting logic

Map Process 2

Partition/Sort

tmp 1

tmp 2

tmp 3

  • C[x1] += D1
  • C[x1] += D2
  • ….

combine counter updates

Reducer 1

Merge Tmp Files

  • C[x2] += D1
  • C[x2] += D2
  • ….

combine counter updates

Reducer 2

Merge Tmp Files

tmp n

Distributed Shuffle-Sort

38 of 73

Partitioning Messages for Word Count

  • ctr[x1] += d1
  • ctr[x1] += d2
  • ….

Logic to combine counter updates

Reducer Machine 1

  • ctr[x2] += d1
  • ctr[x2] += d2
  • ….

combine counter updates

Reducer Machine 2

The shuffle/sort phase:

  • The (key,value) pairs from each mapper are partitioned, sorted by key, and sent to reducers.
  • You need all the messages for the same word to be sent to the same reducer.

The reduce phase:

  • Each reducer will scan through the sorted key-value pairs.
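
A minimal sketch of the partitioning rule, assuming P reducers; a deterministic hash is used so that every mapper, on every machine, agrees on the destination:

import hashlib

def reducer_for(key, num_reducers):
    """Send every message with the same key (word) to the same reducer."""
    digest = hashlib.md5(key.encode("utf-8")).hexdigest()   # deterministic across machines
    return int(digest, 16) % num_reducers

# every ("aardvark", 1) message, from any mapper, lands on the same reducer:
# reducer_for("aardvark", 4) always returns the same value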

39 of 73

40 of 73

41 of 73

42 of 73

43 of 73

MAP-REDUCE FRAMEWORKS

44 of 73

Frameworks

  • Google’s Map-Reduce Framework
    • Published in 2008
  • Hadoop: Open source Java version
  • Apache Flume, Apache Beam
  • FLINK
  • Spark
  • BashReduce: 126 lines of bash
  • Hazsoup: <500 lines of Python

45 of 73

Frameworks

What else is really needed in practice?

  • Distributing the data
  • Managing and inspecting processes
  • Inspecting process logs and other debugging support

46 of 73

Frameworks

What is really needed in practice?

  • Robustness
    • If you run a job on thousands of cheap machines, hardware failures happen!
      • “Dead” versus “Slow” is hard to detect
    • Main way to get robustness is redundancy
      • Multiple copies of the data
      • Backup runs for slow-finishing processes

47 of 73

Frameworks

More comments on debugging:

  • Debug your code by running it locally first
  • Be careful about how information is shared
    • In local processes information can be shared many ways
    • In distributed systems sharing is constrained so that it’s efficient
      • Beside the “messages” there are usually ways to “broadcast” files to every worker
        • For example the code the workers run ☺
      • But often something works locally but not in the cluster
  • Be careful about weird corner cases in the data
    • Like that one line in a million that crashes your code
      • Because when you run it on 10 million lines it will crash

48 of 73

ABSTRACTIONS FOR MAP-REDUCE

49 of 73

Abstractions On Top Of Map-Reduce

  • Some obvious streaming processes:
    • for each row in a table
      • Transform it and output the result

      • Decide if you want to keep it with some boolean test, and copy out only the ones that pass the test

Example: stem words in a stream of word-count pairs:

(“aardvarks”,1) 🡺 (“aardvark”,1)

Proposed syntax:

table2 = MAP table1 TO λ row : f(row)

f(row)🡺row’

Example: apply stop words

(“aardvark”,1) 🡺 (“aardvark”,1)

(“the”,1) 🡺 deleted

Proposed syntax:

table2 = FILTER table1 BY λ row : f(row)

f(row)🡺 {true,false}
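
For comparison, a hedged sketch of how these two proposed operations look in PySpark (introduced later in the lecture); the RDD name, stem(), and STOPWORDS are illustrative assumptions:

# table1 is an RDD of (word, count) rows, e.g. [("aardvarks", 1), ("the", 1)]

# MAP table1 TO λ row : f(row)      ->   table1.map(f)
table2 = table1.map(lambda row: (stem(row[0]), row[1]))       # stem() is a hypothetical stemmer

# FILTER table1 BY λ row : f(row)   ->   table1.filter(f)
table3 = table1.filter(lambda row: row[0] not in STOPWORDS)   # STOPWORDS is a hypothetical set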

50 of 73

Abstractions On Top Of Map-Reduce

  • A non-obvious(?) streaming process:
    • for each row in a table
      • Transform it to a list of items
      • Splice all the lists together to get the output table (flatten)

Example: tokenizing a line

“I found an aardvark” 🡺 [“i”, “found”, “an”, “aardvark”]

“We love zymurgy” 🡺 [“we”, “love”, “zymurgy”]

…but the final table is one word per row:

“i”

“found”

“an”

“aardvark”

“we”

“love”

Proposed syntax:

table2 = FLATMAP table1 TO λ row : f(row)

f(row)🡺list of rows

This can be done with one Mapper, since a Map process can send any number of messages downstream
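
The corresponding PySpark sketch of the FLATMAP abstraction (the lines RDD is illustrative):

# lines is an RDD of strings, e.g. ["I found an aardvark", "We love zymurgy"]
# FLATMAP lines TO λ row : f(row)   ->   lines.flatMap(f), where f returns a list of rows
words = lines.flatMap(lambda line: line.lower().split())
# result: one word per row -- "i", "found", "an", "aardvark", "we", "love", "zymurgy"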

51 of 73

flashback: Naïve Bayes

Predicted y for a doc with words x1, …, xd:

  y* = argmax_y [ log Pr(Y=y) + Σ_j log Pr(X=xj | Y=y) ]

where, using smoothed counts:

  Pr(X=x | Y=y) = ( n[x^Y=y] + 1 ) / ( n[any word^Y=y] + |V| )   and   Pr(Y=y) = n[Y=y] / n

52 of 73

naïve Bayes: testing

Count #times each (word,label) pair appears:

Event            | Count
X=w1 ^ Y=sports  | 5245
X=w1 ^ Y=news    | 1054
X=..             | 2120
X=w2 ^ Y=…       |   37
X=…              |    3

Keys (words) and the counts associated with each word w:

w         | Counts associated with w
aardvark  | n[w^Y=sports]=2
agent     | n[w^Y=sports]=1027, n[w^Y=news]=564
zynga     | n[w^Y=sports]=21, n[w^Y=news]=4464

plus the total count n.

53 of 73

naïve Bayes: testing

The general case:

We’re taking rows from a table

  • In a particular format (event,count)

Applying a function to get a new value

  • The word for the event

And grouping the rows of the table by this new value

Proposed syntax:

GROUP table BY λ row : f(row)

Could define f via: a function, a field of a defined record structure, …

f(row)🡺field

How to implement this with map-reduce?

  1. Output pairs (f(row), row) with a mapper process
  2. Shuffle-sort the pairs by key – which is f(row)
  3. Reduce and aggregate by appending together all the values associated with the same key
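
A minimal sketch of those three steps over an in-memory list of rows (illustrative names; a cluster framework would run the shuffle-sort across machines):

from itertools import groupby

def group_by(rows, f):
    """GROUP rows BY f: emit (f(row), row), sort by key, append each run together."""
    pairs = [(f(row), row) for row in rows]          # 1. mapper output
    pairs.sort(key=lambda kv: kv[0])                 # 2. shuffle-sort by key
    return {k: [row for _, row in run]               # 3. reduce: collect each run of values
            for k, run in groupby(pairs, key=lambda kv: kv[0])}

# e.g. group (event, count) rows by the word inside the event:
#   group_by(rows, f=lambda row: word_of(row[0]))    # word_of() is hypothetical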

54 of 73

table1 – counter messages keyed by word:

w         | Counters
aardvark  | n[w^Y=sports]=2
agent     | n[w^Y=sports]=…
agent     | n[w^Y=sports]=…
agent     | n[w^Y=sports]=…, n[w^Y=sports]=…
zynga     | n[…]
zynga     | n[…]

table2 – word occurrences, built from
Doc1: “I found an aardvark”
Doc2: “We love zymurgy”

w         | DocId
found     | doc1
aardvark  | doc1
zymurgy   | doc2
..        |

Proposed syntax:

JOIN table1 BY λ row : f(row),
     table2 BY λ row : g(row)

How to implement this with map-reduce? Similar to group-by.
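
A minimal sketch of the same trick for JOIN: tag each row with the table it came from, key it, and pair up rows that share a key (names are illustrative):

from collections import defaultdict

def join(table1, f, table2, g):
    """JOIN table1 BY f, table2 BY g."""
    keyed = [(f(r), ("T1", r)) for r in table1] + [(g(r), ("T2", r)) for r in table2]
    buckets = defaultdict(lambda: ([], []))
    for k, (tag, row) in keyed:                 # in a cluster this grouping is the shuffle-sort
        buckets[k][0 if tag == "T1" else 1].append(row)
    return [(k, r1, r2) for k, (rows1, rows2) in buckets.items()
            for r1 in rows1 for r2 in rows2]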

55 of 73

Spark: A High-Level Dataflow Language

56 of 73

Spark

  • Hadoop: Too low level
    • missing abstractions
    • hard to specify a workflow
    • latency is very high
  • Spark addresses these issues
  • Also: Spark is good for iterative operations
    • E.g., PageRank, E/M, k-means clustering, …
    • Spark lowers cost of repeated reads

Set of concise dataflow operations (“transformations”)

Dataflow operations are embedded in an API together with “actions”

57 of 73

Spark

  • Data abstraction
    • Sharded files 🡪 Resilient Distributed Datasets
      • RDDs are immutable
      • They are partitioned and sharded
      • You access them with high level operations that can be implemented by short Map-Reduce pipelines
      • You can mark RDDs as ‘persistent’ (the data is saved for debugging, reuse, etc) and RDDs can persist on disk or in memory
        • Memory-cached RDDs use a lot of cluster memory but are very useful
    • Two types of operations on RDDs
      • Transformations – lazy, return a ‘plan’ to make some change
      • Actions – eager, usually force execution of a map-reduce pipeline, and return some Python object

58 of 73

Spark examples

spark is a SparkContext object – sometimes named sc
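
The code on this and the next few slides (not reproduced in these notes) is roughly the classic Spark log-mining example; a hedged reconstruction, with an illustrative file path:

# spark is the SparkContext
lines = spark.textFile("hdfs://.../log.txt")                  # RDD of lines, sharded across workers

errors = lines.filter(lambda l: l.startswith("ERROR"))        # transformation: a plan, not data
errors.count()                                                # action: executes the plan

errors.filter(lambda l: "MySQL" in l).count()                 # another transformation + action
errors.filter(lambda l: "HDFS" in l) \
      .map(lambda l: l.split("\t")[3]) \
      .collect()                                              # action: returns a Python list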

59 of 73

Spark examples

errors is a transformation, and thus a data structure that explains HOW to do something

count() is an action: it will actually execute the plan for errors and return a value.

errors.filter() is a transformation

collect() is an action

everything is sharded (i.e., partitioned)

60 of 73

Spark examples

# modify errors to be stored in cluster memory

subsequent actions will be much faster

everything is sharded … and the shards are stored in memory of worker machines not local disk (if possible, and if you ask)

You can also persist() an RDD on disk.
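
A hedged sketch of the call this slide's callouts annotate (the follow-up filter string is illustrative):

errors.cache()                                      # modify errors to be stored in cluster memory
errors.filter(lambda l: "timeout" in l).count()     # this and later actions reuse the cached shards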

61 of 73

Spark examples

# modify errors to be stored in cluster memory

everything is sharded … and the shards are stored in memory of worker machines not local disk (if possible)

You can also persist() an RDD on disk, which means it is stored somewhere for debugging purposes (or re-use).

persist-on-disk works because the RDD is read-only (immutable)

62 of 73

Spark examples: wordcount

the action

transformation on (key,value) pairs, which are special
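
A hedged reconstruction of the PySpark word count that this slide and slides 64–66 annotate (input and output paths are illustrative):

lines = spark.textFile("hdfs://.../corpus.txt")

counts = (lines
          .flatMap(lambda line: line.split())         # tokenize: one word per row
          .map(lambda word: (word, 1))                # convert to (key, value) messages
          .reduceByKey(lambda a, b: a + b))           # shuffle-sort + pairwise reduce of each run

counts.saveAsTextFile("hdfs://.../wordcounts")        # the action that triggers execution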

63 of 73

Recap: Distributed Stream-and-Sort: Map, Shuffle-Sort, Reduce

  • doc 1
  • doc 2
  • doc 3
  • ….

Counting logic

Map Process 1

Partition/Sort

Spill 1

Spill 2

Spill 3

  • doc 1
  • doc 2
  • doc 3
  • ….

Counting logic

Map Process 2

Partition/Sort

Spill 1

Spill 2

Spill 3

  • C[x1] += D1
  • C[x1] += D2
  • ….

Logic to combine counter updates

Reducer 1

Merge Spill Files

  • C[x2] += D1
  • C[x2] += D2
  • ….

combine counter updates

Reducer 2

Merge Spill Files

Spill n

Distributed Shuffle-Sort

64 of 73

Classic PySpark examples: wordcount

65 of 73

Spark examples: wordcount

66 of 73

Spark examples: wordcount

  1. Tokenization and converting to messages is done in parallel over each partition of the RDD lines
  2. reduceByKey does a shuffle-sort, doing a pairwise reduce of each “run” of messages

67 of 73

Spark examples: logistic regression

  • Model: Pr(y = +1 | x, w) = 1 / (1 + exp(−w · x))
  • Learning algorithm:
    • define the “loss” of the model on the data
    • start with a random w and iteratively adjust it to reduce the loss
      • by gradient descent
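
A hedged reconstruction of the classic Spark batch logistic-regression example that the next slides annotate; parse(), the file path, D, the step size, and the iteration count are illustrative assumptions:

import numpy as np

# each point p has a numpy feature vector p.x and a label p.y in {-1, +1}
points = spark.textFile("hdfs://.../points.txt").map(parse).cache()   # cached in cluster memory

D = 10                                    # number of features (illustrative)
w = np.random.randn(D)                    # start with a random weight vector

for i in range(100):                      # batch gradient descent
    gradient = points.map(
        lambda p: (1.0 / (1.0 + np.exp(-p.y * w.dot(p.x))) - 1.0) * p.y * p.x
    ).reduce(lambda a, b: a + b)          # reduce is an action: it returns a numpy vector
    w -= 0.1 * gradient                   # the weight update runs on the local (driver) machine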

68 of 73

Classic Spark examples: batch logistic regression

reduce is an action – it produces a numpy vector

p.x and w are vectors, from the numpy package. Python overloads operations like * and + for vectors.

69 of 73

Spark examples: batch logistic regression

Important note: numpy vectors/matrices are not just “syntactic sugar”.

  • They are much more compact than something like a list of python floats.
  • numpy operations like dot, *, + are calls to optimized C code
  • a little python logic around a lot of numpy calls is pretty efficient

70 of 73

Spark examples: batch logistic regression

w is defined outside the lambda function, but used inside it

So: python builds a closure – code including the current value of w – and Spark ships it off to each worker. So w is copied, and must be read-only.

71 of 73

Spark examples: batch logistic regression

dataset of points is cached in cluster memory to reduce i/o

72 of 73

Spark examples: batch logistic regression

gradient computation is done in parallel, locally on each worker, using data kept in memory.

summing the gradient vectors is done in parallel, and intermediate sums are sent across the network

updating the weights is done on the local machine

The lambda function, including the current value of w, is broadcast to the workers

73 of 73

Spark logistic regression example
