1 of 29

CSE 444: Database Internals

Section 9:

Parallel Processing and MapReduce

1

2 of 29

Review in this section

  • Parallel DBMS

2

3 of 29

Parallel DBMS

R(a,b) is horizontally partitioned across N = 3 machines.

Each machine locally stores approximately 1/N of the tuples in R.

The tuples are randomly organized across machines (i.e., R is block partitioned across machines).

Show a RA plan for this query and how it will be executed across the N = 3 machines.

Pick an efficient plan that leverages the parallelism as much as possible.

SELECT a, max(b) as topb

FROM R

WHERE a > 0

GROUP BY a

3

4 of 29

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

4

1/3 of R

1/3 of R

1/3 of R

Machine 1

Machine 2

Machine 3

R(a, b)

5 of 29

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

5

1/3 of R

1/3 of R

1/3 of R

Machine 1

Machine 2

Machine 3

R(a, b)

scan

scan

scan

If more than one relation on a machine, then “scan S”, “scan R” etc

6 of 29

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

6

1/3 of R

1/3 of R

1/3 of R

Machine 1

Machine 2

Machine 3

R(a, b)

scan

scan

scan

σa>0

σa>0

σa>0

7 of 29

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

7

1/3 of R

1/3 of R

1/3 of R

Machine 1

Machine 2

Machine 3

R(a, b)

scan

scan

scan

σa>0

σa>0

σa>0

γa, max(b)-> b

γa, max(b)-> b

γa, max(b)-> b

8 of 29

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

8

1/3 of R

1/3 of R

1/3 of R

Machine 1

Machine 2

Machine 3

R(a, b)

scan

scan

scan

σa>0

σa>0

σa>0

γa, max(b)-> b

γa, max(b)-> b

γa, max(b)-> b

Hash on a

Hash on a

Hash on a

9 of 29

SELECT a, max(b) as topb FROM R�WHERE a > 0 GROUP BY a

9

1/3 of R

1/3 of R

1/3 of R

Machine 1

Machine 2

Machine 3

R(a, b)

scan

scan

scan

σa>0

σa>0

σa>0

γa, max(b)-> b

γa, max(b)-> b

γa, max(b)-> b

Hash on a

Hash on a

Hash on a

10 of 29

SELECT a, max(b) as topb FROM R�WHERE a > 0 GROUP BY a

10

1/3 of R

1/3 of R

1/3 of R

Machine 1

Machine 2

Machine 3

R(a, b)

scan

scan

scan

σa>0

σa>0

σa>0

γa, max(b)-> b

γa, max(b)-> b

γa, max(b)-> b

Hash on a

Hash on a

Hash on a

γa, max(b)->topb

γa, max(b)->topb

γa, max(b)->topb

11 of 29

Benefit of hash-partitioning

  • For parallel DBMS
    • It would avoid the data re-shuffling phase
    • It would compute the aggregates locally

11

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

12 of 29

SELECT a, max(b) as topb FROM R�WHERE a > 0 GROUP BY a

12

1/3 of R

1/3 of R

1/3 of R

Machine 1

Machine 2

Machine 3

Hash-partition on a for R(a, b)

scan

scan

scan

σa>0

σa>0

σa>0

γa, max(b)->topb

γa, max(b)->topb

γa, max(b)->topb

13 of 29

Problem 1)

13

Consider relations R(a,b), S(c,d), and T(e,f). All three are horizontally partitioned across N = 3 machines.

The tuples are randomly organized across machines.

Show a relational algebra plan for the following query and how it will be executed across the N = 3 machines:

SELECT *

FROM R, S, T

WHERE R.b = S.c

AND S.d = T.e

AND (R.a - T.f) > 100

14 of 29

Problem 1)

14

SELECT *

FROM R, S, T

WHERE R.b = S.c

AND S.d = T.e

AND (R.a - T.f) > 100

R(a,b)

S(c,d)

T(e,f)

15 of 29

Problem 1)

15

SELECT *

FROM R, S, T

WHERE R.b = S.c

AND S.d = T.e

AND (R.a - T.f) > 100

R(a,b)

S(c,d)

T(e,f)

16 of 29

Problem 1)

16

SELECT *

FROM R, S, T

WHERE R.b = S.c

AND S.d = T.e

AND (R.a - T.f) > 100

R(a,b)

S(c,d)

T(e,f)

17 of 29

Problem 1)

17

SELECT *

FROM R, S, T

WHERE R.b = S.c

AND S.d = T.e

AND (R.a - T.f) > 100

R(a,b)

S(c,d)

T(e,f)

18 of 29

Problem 1)

18

SELECT *

FROM R, S, T

WHERE R.b = S.c

AND S.d = T.e

AND (R.a - T.f) > 100

R(a,b)

S(c,d)

T(e,f)

19 of 29

Problem 1)

19

SELECT *

FROM R, S, T

WHERE R.b = S.c

AND S.d = T.e

AND (R.a - T.f) > 100

R(a,b)

S(c,d)

T(e,f)

20 of 29

Problem 1)

20

SELECT *

FROM R, S, T

WHERE R.b = S.c

AND S.d = T.e

AND (R.a - T.f) > 100

R(a,b)

S(c,d)

T(e,f)

21 of 29

Map Reduce

Explain how the query will be executed in MapReduce

SELECT a, max(b) as topb

FROM R

WHERE a > 0

GROUP BY a

Specify the computation performed in the map and the reduce functions

21

22 of 29

Map

  • Each map task
    • Scans a block of R
    • Calls the map function for each tuple
    • The map function applies the selection predicate to the tuple
    • For each tuple satisfying the selection, it outputs a record with key = a and value = b

22

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

Note: When each map task scans multiple relations, it needs to output something like

key = a and value = (‘R’, b) which has the relation name ‘R’

R(a, b)

23 of 29

Shuffle

  • The MapReduce engine reshuffles the output of the map phase and groups it on the intermediate key, i.e. the attribute a

23

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

Note: the programmer has to write only the map and reduce functions, the shuffle phase is done by the MapReduce engine (although the programmer can rewrite the partition function), but you should still mention this in HW6 answers.

R(a, b)

24 of 29

Reduce

24

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

  • Each reduce task
    • computes the aggregate value max(b) = topb for each group (i.e. a) assigned to it (by calling the reduce function)
    • outputs the final results: (a, topb)

Note: A local combiner can be used to compute local max before data gets reshuffled (in the map tasks)

R(a, b)

25 of 29

25

1/3 R

1/3 R

1/3 R

File system: HDFS

Could use a Combiner

(compute local max)

key = a, value =max(b)

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

Map:

  • Scans R
  • Applies selection predicate
  • Output: key = a, value = b

Reduce:

  • Computes final aggregate

R(a, b)

26 of 29

Benefit of hash-partitioning

  • For MapReduce
    • Logically, MR won’t know that the data is hash-partitioned
    • MR treats map and reduce functions as black-boxes and does not perform any optimizations on them

  • But, if a local combiner is used
    • Saves communication cost:
      • fewer tuples will be emitted by the map tasks
    • Saves computation cost in the reducers:
      • the reducers would not have to do as much work

26

SELECT a, max(b) as topb �FROM R�WHERE a > 0�GROUP BY a

27 of 29

Problem 2)

27

Consider two relations R(a, b) and S(b, c).

SELECT R.b, max(S.c) as cmax

FROM R, S

WHERE R.b = S.b

AND R.a <= 100

GROUP BY R.b

For the Map function, what are the computations performed, and what will be its outputs? Assume that the Map function reads a block of R or S relation as input.

For the Reduce function, what will be its inputs, what are the computations performed, and what will be its outputs?

28 of 29

Problem 2)

28

SELECT R.b, max(S.c) as cmax

FROM R, S

WHERE R.b = S.b

AND R.a <= 100

GROUP BY R.b

Map Function:

  • If the map function processes a block of the R relation, it applies the selection predicate to each R tuple in that block (R.a <= 100), and if the tuple passes the selection, it outputs a record with key= R.b and value= (‘R’ , R.a).
  • If the map function processes a block of the S relation, it outputs a record with key = S.b and value = (’S’, S.c).

Reduce Function:

  • Input to the reducer: The same b as the key and a list of R or S tuples (‘R’ , R.a) or (‘S’, S.c). In other words, we have … (b, (value from R, value from S, value from S, etc ….))
  • Computation: The reducer performs the local join of R and S and finds the max(S.c) value.

R(a, b)

S(b, c)

Note: In some cases, you may need to perform more than one MapReduce job to get the final result

29 of 29

Comparing between Parallel DBMSs and MapReduce Systems

29

Parallel DBMS:

  • Offers updates, transactions, indexing
  • Pipelined parallelism

MapReduce:

  • Fault-tolerance
  • Can handle stragglers
  • Easy to scale