1 of 50

MapReduce and Hadoop

January 17, 2018

Tristan Glatard

FACULTY OF ENGINEERING AND COMPUTER SCIENCE

Department of Computer Science and Software Engineering

2 of 50

Objectives

  1. Learn the MapReduce (MR) programming model

  2. Understand MR’s execution framework

  3. Understand MR’s performance bottleneck(s)

3 of 50

4 of 50

[Figure: the storage hierarchy: larger capacity, slower access]

5 of 50

6 of 50

Introduction

Google’s data analytics

  • Input: crawled documents, Web request logs, etc.
  • Output: inverted indices, graph statistics, most frequent queries, etc.

Before MapReduce (< 2004): complex implementations mixing:

  • Programming logic
  • Parallelization
  • Data distribution
  • Handling of failures

MapReduce is:

  • A programming model
  • A parallel, data-aware, fault-tolerant implementation

7 of 50

Let’s talk!

About one of the most highly cited papers in the history of computer science

8 of 50

MapReduce: summary

9 of 50

MapReduce Workflows

Multi-step MapReduce programs

Loops

[Diagram: jobs 1 and 2 chained in a loop, repeated until a termination condition is met]

10 of 50

Combiners

11 of 50

More WordCount: initial implementation

See Jimmy Lin page 47, algorithms 3.2 and 3.3

Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. Morgan & Claypool Publishers, 2010.

http://lintool.github.io/MapReduceAlgorithms/index.html
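The algorithms themselves are in Lin & Dyer and are not reproduced on the slide. As a rough reminder, here is a minimal Python sketch of the basic WordCount mapper and reducer (plain functions, not Hadoop API code; the tiny driver and the sample documents are illustrative), with the shuffle simulated by grouping intermediate pairs per key:

from collections import defaultdict

# Basic WordCount: the mapper emits (word, 1) for every word occurrence,
# the reducer sums the counts for each word.

def map_wordcount(doc_id, text):
    for word in text.split():
        yield (word, 1)

def reduce_wordcount(word, counts):
    yield (word, sum(counts))

# Tiny local driver standing in for the MapReduce framework:
# run all mappers, group intermediate pairs by key (the "shuffle"),
# then run one reducer call per key.
def run(documents):
    groups = defaultdict(list)
    for doc_id, text in documents.items():
        for key, value in map_wordcount(doc_id, text):
            groups[key].append(value)
    results = {}
    for key, values in groups.items():
        for out_key, out_value in reduce_wordcount(key, values):
            results[out_key] = out_value
    return results

if __name__ == "__main__":
    docs = {"doc1": "the quick brown fox", "doc2": "the lazy dog and the fox"}
    print(run(docs))  # e.g. {'the': 3, 'fox': 2, ...}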

12 of 50

More WordCount: local aggregation

See Jimmy Lin page 47, algorithms 3.2 and 3.3

Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. Morgan & Claypool Publishers, 2010.

http://lintool.github.io/MapReduceAlgorithms/index.html

13 of 50

More WordCount: “in-mapper combining”

See Jimmy Lin page 47, algorithms 3.2 and 3.3

Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. Morgan & Claypool Publishers, 2010.

http://lintool.github.io/MapReduceAlgorithms/index.html

A given map task may process several key-value pairs, each in a separate call to the map function.
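Because one map task handles many input pairs, it can aggregate locally before emitting anything. A minimal Python sketch of that idea (not the book's pseudocode and not Hadoop API code; the class and method names are illustrative): the map task keeps a dictionary H across map() calls and only emits its totals when the task ends.

from collections import defaultdict

# In-mapper combining for WordCount: the mapper object keeps state (H)
# across the many key-value pairs processed by one map task, and emits
# one (word, local_count) pair per distinct word when the task closes.
class InMapperCombiningWordCount:
    def setup(self):
        self.H = defaultdict(int)   # must fit in memory!

    def map(self, doc_id, text):
        for word in text.split():
            self.H[word] += 1       # local aggregation, nothing emitted yet

    def close(self):
        for word, count in self.H.items():
            yield (word, count)     # one pair per distinct word seen by this task

# Illustrative use: one map task processing two input pairs.
task = InMapperCombiningWordCount()
task.setup()
task.map("doc1", "to be or not to be")
task.map("doc2", "to do or not to do")
print(dict(task.close()))  # {'to': 4, 'be': 2, 'or': 2, 'not': 2, 'do': 2}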

14 of 50

In-mapper combining

Advantages

  • Provides control over local aggregation
  • More efficient than combiners

Drawbacks

  • State preserved across multiple key-value pairs
  • May create tricky bugs
  • The associative array H (the in-mapper state) has to fit in memory!

Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. Morgan & Claypool Publishers, 2010.

http://lintool.github.io/MapReduceAlgorithms/index.html

15 of 50

More on combiners

Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. Morgan & Claypool Publishers, 2010.

http://lintool.github.io/MapReduceAlgorithms/index.html

Note: a reducer can't always be reused as a combiner!
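Lin & Dyer's running example for this point is computing a per-key mean; assuming that example, a small Python sketch shows why reusing the reducer as a combiner changes the answer (the values are made up):

# Why a reducer can't always double as a combiner: a reducer that computes
# the mean of its values gives a different (wrong) answer if it is also run
# locally as a combiner, because a mean of partial means is not the overall mean.

def mean_reducer(key, values):
    values = list(values)
    return (key, sum(values) / len(values))

values_at_mapper1 = [1, 2, 3]      # local values for some key on mapper 1
values_at_mapper2 = [10]           # local values for the same key on mapper 2

# Correct: no combiner, the reducer sees all values.
_, correct = mean_reducer("k", values_at_mapper1 + values_at_mapper2)

# Incorrect: reducer reused as combiner, so the reducer only sees partial means.
_, m1 = mean_reducer("k", values_at_mapper1)
_, m2 = mean_reducer("k", values_at_mapper2)
_, wrong = mean_reducer("k", [m1, m2])

print(correct)  # 4.0 = (1 + 2 + 3 + 10) / 4
print(wrong)    # 6.0 = (2.0 + 10.0) / 2

The usual fix discussed in Lin & Dyer is to have the combiner emit partial (sum, count) pairs and only divide in the reducer.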

16 of 50

More on combiners

Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. Morgan & Claypool Publishers, 2010.

http://lintool.github.io/MapReduceAlgorithms/index.html

The combiner may never be executed: the framework treats it as an optional optimization!

17 of 50

More on combiners

Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. Morgan & Claypool Publishers, 2010.

http://lintool.github.io/MapReduceAlgorithms/index.html

18 of 50

More on combiners

Data-Intensive Text Processing with MapReduce. Jimmy Lin and Chris Dyer. Morgan & Claypool Publishers, 2010.

http://lintool.github.io/MapReduceAlgorithms/index.html

19 of 50

Backup tasks to cope with stragglers

20 of 50

Bringing executions to completion

  • Problem
    • Reduced computing throughput at the end of the execution
  • Reasons
    • Tasks fail, tasks wait, some resources are slow

21 of 50

Solution 1: pilot jobs

  • Principle
    • Resources provisioned by pilots
    • Tasks scheduled on pilots
  • Results
    • Shorter waiting times
    • Reduced impact of failures
    • Load balanced w.r.t. computing speed

[Diagram: application tasks scheduled on pilot jobs, compared with regular job submission]

S. Camarasu-Pop, T. Glatard, J. T. Moscicki, H. Benoit-Cattin, and D. Sarrut, "Dynamic partitioning of GATE Monte-Carlo simulations on EGEE", Journal of Grid Computing, vol. 8, no. 2, pp. 241-259, Mar. 2010.

22 of 50

Solution 2: dynamic load-balancing (Monte-Carlo simulations)

  • Monte-Carlo simulations
    • Simulate P random events
    • With n jobs
    • Ex: estimating π (see the sketch below)
  • Load-balancing algorithms
    • Static: each job simulates P/n events
    • Dynamic: jobs keep simulating events until P events have been produced in total

S. Camarasu-Pop, T. Glatard, J. T. Moscicki, H. Benoit-Cattin, and D. Sarrut, "Dynamic partitioning of GATE Monte-Carlo simulations on EGEE", Journal of Grid Computing, vol. 8, no. 2, pp. 241-259, Mar. 2010.
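To make the π example concrete, here is a minimal Python sketch of the static scheme, where each of the n jobs simulates exactly P/n random events (the numbers and function names are illustrative, and the jobs run sequentially here rather than on a grid). The dynamic scheme would instead let jobs keep simulating until P events have been produced in total.

import random

# Static load balancing for a Monte-Carlo estimation of pi:
# each of the n jobs simulates exactly P/n random points in the unit square
# and counts how many fall inside the quarter disk.
def job(num_events):
    hits = 0
    for _ in range(num_events):
        x, y = random.random(), random.random()
        if x * x + y * y <= 1.0:
            hits += 1
    return hits

P = 1_000_000   # total number of random events to simulate
n = 4           # number of jobs

# In a static split every job gets the same share, so the slowest job
# determines the makespan; a dynamic scheme would instead stop all jobs
# once P events have been simulated in total.
total_hits = sum(job(P // n) for _ in range(n))
print(4 * total_hits / P)  # approximately 3.14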

23 of 50

Dynamic load-balancing: result

[Figure: results comparing static load balancing + pilot jobs with dynamic load balancing + pilot jobs]

S. Camarasu-Pop, T. Glatard, J. T. Moscicki, H. Benoit-Cattin, and D. Sarrut, "Dynamic partitioning of GATE Monte-Carlo simulations on EGEE", Journal of Grid Computing, vol. 8, no. 2, pp. 241-259, Mar. 2010.

24 of 50

Solution 3: Task replication (backup tasks)

  • Replicate late tasks to increase chances of fast completion
  • Be careful of resource waste
[Figure: replication algorithm and resulting plot of completed tasks over time, with and without replication]

R. Ferreira da Silva, T. Glatard, and F. Desprez, "Self-healing of operational workflow incidents on distributed computing infrastructures", 12th IEEE/ACM International Symposium on Cluster, Cloud and Grid Computing (CCGrid 2012), Ottawa, Canada, pp. 318-325, May 2012.


25 of 50

MapReduce exercises

26 of 50

Inverted index (record level)

map(String key, String value):
  // key: document name
  // value: document content
  for each word w in value:
    emit(w, key)

reduce(String key, Iterator values):
  // key: a word
  // values: a list of document names
  documents = removeDuplicates(values)
  emit(key, documents)

Example:

  doc1: “This is a very interesting document describing how...”
  doc2: “This is a boring document explaining that...”

Inverted index:

  this: doc1, doc2
  is: doc1, doc2
  a: doc1, doc2
  boring: doc2
  interesting: doc1
  ...
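As a small illustration (plain Python, not Hadoop; the driver and sample documents are made up), the same record-level inverted index can be simulated locally by grouping the mapper's (word, document) pairs by word:

from collections import defaultdict

# Record-level inverted index: map emits (word, doc_name) for every word,
# reduce de-duplicates the document names for each word.
def map_invert(doc_name, content):
    for word in content.lower().split():
        yield (word, doc_name)

def reduce_invert(word, doc_names):
    yield (word, sorted(set(doc_names)))   # removeDuplicates(values)

docs = {
    "doc1": "This is a very interesting document describing how",
    "doc2": "This is a boring document explaining that",
}

groups = defaultdict(list)                 # the "shuffle": group by word
for name, content in docs.items():
    for word, doc in map_invert(name, content):
        groups[word].append(doc)

index = {w: d for word, names in groups.items() for w, d in reduce_invert(word, names)}
print(index["this"])      # ['doc1', 'doc2']
print(index["boring"])    # ['doc2']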

27 of 50

Relational-algebra operations

Data representation:

Input of map tasks:

  • key: table name (e.g. “users”)
  • value: record (e.g. “1975,08,18,John,Doe”)

Table: users

  1975,08,18,John,Doe
  1989,03,01,Jane,Doe
  ...

28 of 50

Selection

SQL query: SELECT * from <table> WHERE <condition>

map(key, value):
  // key: table name (e.g. “users”)
  // value: record (e.g. “1975,08,18,John,Doe”)
  if value satisfies <condition>
    emit(value, value)

No reducer needed


29 of 50

Projection

SELECT DISTINCT <attributes> from <table>

map(key, value):
  proj = select <attributes> in value
  emit(proj, proj)

reduce(key, values): // eliminates duplicates
  emit(key, key)

30 of 50

Union

SELECT * from <table1> UNION SELECT * from <table2>

map(key, value):
  emit(value, value)

reduce(key, values): // eliminates duplicates
  emit(key, key)

31 of 50

Intersection

SELECT <attribute> from <table1> WHERE <attribute> IN ( SELECT <attribute> from <table2> )

map(key, value):
  attribute = select <attribute> in value
  emit(attribute, key)

reduce(key, values):
  if size(removeDuplicates(values)) == 2
    emit(key, key)

32 of 50

Difference

SELECT <attribute> from <table1> WHERE <attribute> NOT IN ( SELECT <attribute> from <table2> )

map(key, value):
  attribute = select <attribute> in value
  emit(attribute, key)

reduce(key, values):
  if size(removeDuplicates(values)) == 1 && values[0] == “table1”
    emit(key, key)
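The intersection and difference reducers work because the map value is the table name (the map key from the data-representation slide). A minimal local sketch of that tagging trick in plain Python, with made-up records:

from collections import defaultdict

# Intersection and difference on one attribute: the mapper tags every
# attribute value with the table it came from; the reducer then checks
# how many distinct tables contributed that value.
def map_tag(table_name, record):
    attribute = record          # here each record is already the attribute of interest
    yield (attribute, table_name)

table1 = ["alice", "bob", "carol"]
table2 = ["bob", "dave"]

groups = defaultdict(list)      # shuffle: attribute -> list of table names
for table_name, records in (("table1", table1), ("table2", table2)):
    for record in records:
        for key, value in map_tag(table_name, record):
            groups[key].append(value)

intersection = [k for k, v in groups.items() if len(set(v)) == 2]
difference   = [k for k, v in groups.items() if set(v) == {"table1"}]

print(sorted(intersection))  # ['bob']
print(sorted(difference))    # ['alice', 'carol']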

33 of 50

Matrix-Vector Multiplication

Assumption: M is big.

M (3,3)             v (3,1)       x (3,1)

| m11  m12  m13 |   | v1 |        | x1 |
| m21  m22  m23 | x | v2 |   =    | x2 |
| m31  m32  m33 |   | v3 |        | x3 |

34 of 50

Data (matrix) representation as key-value pairs

Non-null elements: (“1,1”, m11), (“1,2”, m12), (“1,3”, m13), (“2,1”, m21), (“2,2”, m22), ...

Null elements: omitted

  • Good representation when M has a lot of zeros (sparse matrix).

35 of 50

Matrix-Vector Multiplication

Assumption: M is big.

M (3,3)             v (3,1)       x (3,1)

| m11  m12  m13 |   | v1 |        | x1 |
| m21  m22  m23 | x | v2 |   =    | x2 |
| m31  m32  m33 |   | v3 |        | x3 |

36 of 50

The map task

map(key, value):
  i = get_first_element(key)
  j = get_second_element(key)
  result = value * v[j]
  emit(i, result)

37 of 50

The reduce task

reduce(key, values):
  result = 0
  for each value in values:
    result += value
  emit(key, result)
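Putting the two functions together, a minimal Python sketch of the whole computation (sparse M stored as "i,j" → value pairs, v held in memory as the slides assume; the sample data is made up):

from collections import defaultdict

# Sparse matrix-vector multiplication, MapReduce style:
# map(("i,j", mij)) -> (i, mij * v[j]); reduce(i, products) -> (i, sum).
v = {1: 1.0, 2: 2.0, 3: 3.0}                           # the vector, assumed to fit in memory
M = {"1,1": 2.0, "1,3": 1.0, "2,2": 4.0, "3,1": 5.0}   # non-null elements only

def map_mv(key, value):
    i, j = (int(s) for s in key.split(","))            # get_first_element / get_second_element
    yield (i, value * v[j])

def reduce_mv(i, products):
    yield (i, sum(products))

groups = defaultdict(list)                             # shuffle: row index -> partial products
for key, value in M.items():
    for i, partial in map_mv(key, value):
        groups[i].append(partial)

x = dict(pair for i, values in groups.items() for pair in reduce_mv(i, values))
print(x)   # {1: 5.0, 2: 8.0, 3: 5.0}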

38 of 50

New assumption: v is big

Problem:

  • v doesn’t fit in memory
  • Mappers perform many disk accesses to read elements of v

[Figure: the chunk of M assigned to mapper x, and the whole of v that mapper x must read]

39 of 50

New assumption: v is big

Solution:

  • Divide the matrix into n vertical stripes (one file per stripe)
  • Divide the vector into n horizontal stripes (one file per stripe)
  • Use the same map and reduce functions!

[Figure: M split into vertical stripes and v into matching horizontal stripes; one stripe of v fits in memory, and mapper x reads only the stripe of v matching its assigned part of M]
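To make the striping concrete, here is a small Python sketch of how a map task would use only the stripe of v that matches the columns of its chunk of M. The stripe width, helper names, and data are illustrative, not part of the original slides.

# Striping for a big vector v: M is split into vertical stripes of
# STRIPE_WIDTH columns and v into matching horizontal stripes, so a map
# task working on stripe s only needs that stripe of v in memory.
# The map and reduce logic is unchanged; only the lookup of v[j] changes.

STRIPE_WIDTH = 2

def stripe_of_column(j):
    # Which stripe (and hence which file of v) holds component j (1-based)?
    return (j - 1) // STRIPE_WIDTH

def map_mv_striped(key, value, v_stripe, stripe_index):
    i, j = (int(s) for s in key.split(","))
    local_j = (j - 1) - stripe_index * STRIPE_WIDTH   # offset of v[j] inside the stripe
    yield (i, value * v_stripe[local_j])

# Illustrative stripe 0 of v holds v1 and v2; an element of M in columns 1-2
# is multiplied using only that stripe.
v_stripe0 = [1.0, 2.0]
print(list(map_mv_striped("3,2", 4.0, v_stripe0, stripe_of_column(2))))  # [(3, 8.0)]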

40 of 50

Conclusion

41 of 50

The overhead of disk I/O

42 of 50

Google used MapReduce widely

43 of 50

Beyond Google: The Hadoop ecosystem

44 of 50

The Hadoop Distributed File System (HDFS)

45 of 50

Hadoop Distributed File System (HDFS)

46 of 50

HDFS Blocks

  • HDFS files are broken into block-sized chunks.
  • HDFS blocks are larger than disk blocks, typically 128 MB.
  • HDFS blocks are not disk blocks; they are stored as files on the underlying file system (see demo).
  • Each block is replicated to a small number of different machines, typically three.
  • Notes:
    • A 1 MB file stored with a block size of 128 MB uses 1 MB of disk space, not 128 MB (a short worked example follows the reference below).
    • Map tasks in MapReduce normally operate on one block at a time.

Hadoop: The Definitive Guide, Tom White, 4th edition, pp. 45-46.
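A short worked example of the notes above, using the block size and replication factor stated on the slide (the file sizes themselves are made up):

import math

BLOCK_SIZE = 128 * 1024**2    # 128 MB, the typical HDFS block size
REPLICATION = 3               # typical replication factor

def hdfs_blocks(file_size_bytes):
    # Number of HDFS blocks a file occupies (the last block may be partial).
    return math.ceil(file_size_bytes / BLOCK_SIZE)

def raw_disk_usage(file_size_bytes):
    # Disk space actually consumed across the cluster: a partial last block
    # only uses its real size, and every block is stored REPLICATION times.
    return file_size_bytes * REPLICATION

one_mb_file = 1 * 1024**2
print(hdfs_blocks(one_mb_file))               # 1 block
print(raw_disk_usage(one_mb_file) / 1024**2)  # 3.0 MB across the cluster, not 3 x 128 MB

one_gb_file = 1024**3
print(hdfs_blocks(one_gb_file))               # 8 blocks, each typically one map task's input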

47 of 50

Some HDFS properties

  • Adding new nodes increases the total storage capacity

  • Files that are larger than any disk in the system can be stored

  • Compute tasks can be located where the data is stored

  • Fault tolerance through replication

48 of 50

HDFS: Reading data

Hadoop: The Definitive Guide, Tom White, 4th edition.

49 of 50

HDFS: Writing data

Hadoop: The Definitive Guide, Tom White, 4th edition.

50 of 50

HDFS: network distance

Hadoop: The Definitive Guide, Tom White, 4th edition.