1 of 62


Musketeer

all for one, one for all in data processing systems

Ionel Gog, Malte Schwarzkopf, Natacha Crooks, Matthew P. Grosvenor, Allen Clement, Steven Hand

2 of 62

Meet Kermit from Sesame, Inc.

  • Kermit wants to run:
    • batch computations on different inputs
    • graph computations on the company’s social network
    • a recommendation engine


CamSaS

3 of 62

Behold the chaos!


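[Figure: the landscape of data processing systems, drawn in layers labelled storage system, execution engine, graph processing and language/library. Systems shown: HDFS/GFS, Hadoop/MR, Dryad, Spark, Dremel, Naiad, Metis, CIEL, Giraph, PowerGraph, GraphChi, Pregel, Hive, Pig, Tenzing, Scope, SparkSQL, Sawzall, FlumeJava, DryadLINQ, GraphLINQ, Lindi, Shark, GraphX.]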

4 of 62

Which system is the right one to use?

5 of 62

Join between two data sets

[Plot: makespan of a join between two data sets, broken down into pulling data from HDFS, loading data, runtime, and pushing data to HDFS; less is better. Local cluster, 7 nodes (EC2 results similar).]

6 of 62

Join between two data sets

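[Plot: makespan of a join between two data sets, broken down into pulling data from HDFS, loading data, runtime, and pushing data to HDFS; less is better. Local cluster, 7 nodes (EC2 results similar). Asymmetric join: 5 million ⋈ 69 million rows, with the best system marked.]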

7 of 62

Join between two data sets

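[Plot: makespan of a join between two data sets, broken down into pulling data from HDFS, loading data, runtime, and pushing data to HDFS; less is better. Local cluster, 7 nodes (EC2 results similar). Asymmetric join: 5 million ⋈ 69 million rows; symmetric join: 39 million ⋈ 39 million rows; the best system is marked for each.]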

8 of 62

PageRank over Twitter graph

(42M vertices, 1.4B edges)

100 nodes

11 of 62

PageRank over Twitter graph

(42M vertices, 1.4B edges)

100 nodes

16 nodes

Faster than Hadoop on 100 nodes

12 of 62

PageRank over Twitter graph

(42M vertices, 1.4B edges)

100 nodes

16 nodes

13 of 62

PageRank over Twitter graph

(42M vertices, 1.4B edges)

100 nodes

16 nodes

1 node

FASTEST

MOST EFFICIENT

For two simple computations, five different systems can be best!

14 of 62

How can we help Kermit choose?

15 of 62

Behold the chaos!

[Figure: the same layered landscape of data processing systems as on slide 3.]

16 of 62

Front-end languages

[Figure: the same landscape as on slide 3, here focusing on the front-end languages and libraries.]

17 of 62

Back-end execution engines

[Figure: the same landscape as on slide 3, here focusing on the back-end execution engines.]

18 of 62

[Figure: front-ends tied to back-ends by a fixed binding, all on top of HDFS/GFS. Systems shown: Hive, Pig, Scope, Sawzall, FlumeJava, DryadLINQ, SparkSQL, Shark, GraphX, GraphLINQ, Lindi, Hadoop/MR, Dryad, Spark, Dremel, Naiad, CIEL, Metis, X-Stream, Giraph, PowerGraph, GraphChi, Pregel.]

19 of 62

Musketeer

[Figure: Musketeer inserts an intermediate representation between the front-ends and the back-ends, all on top of HDFS/GFS. Front-ends shown connected to the IR: Lindi, Hive, SQL DSL and GAS DSL; back-ends: Hadoop/MR, Spark, PowerGraph, GraphChi, Metis and Naiad. The other systems from the previous slide also appear.]

20 of 62

Musketeer @ 10,000ft

SQL query, GAS kernel → (1) translate to IR

21 of 62

Musketeer @ 10,000ft

SQL query, GAS kernel → translate to IR → (2) optimize IR

22 of 62

Musketeer @ 10,000ft

SQL query, GAS kernel → translate to IR → optimize IR → (3) recognize idioms

23 of 62

Musketeer @ 10,000ft

SQL query, GAS kernel → translate to IR → optimize IR → recognize idioms → (4) merge operators

24 of 62

Musketeer @ 10,000ft

SQL query, GAS kernel → translate to IR → optimize IR → recognize idioms → merge operators → (5) map to systems

25 of 62

Musketeer @ 10,000ft

SQL query, GAS kernel → translate to IR → optimize IR → recognize idioms → merge operators → map to systems → (6) expand templates → dispatch jobs

26 of 62

Musketeer @ 10,000ft

SQL query, GAS kernel → translate to IR → optimize IR → recognize idioms → merge operators → map to systems → expand templates → dispatch jobs

27 of 62

Intermediate representation (IR)

SQL query, GAS kernel → (1) translate to IR

28 of 62

Intermediate representation (IR)

  • DAG of data-flow operators
  • Relational and aggregation operators
  • Fixed-point iteration
  • User-defined functions
  • Turing-complete (cf. CIEL)
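To make these bullet points concrete, here is a minimal sketch of a data-flow IR in Python. It is not Musketeer's IR: the `Op` class, the operator names (`INPUT`, `SELECT`, `PROJECT`, `JOIN`, `SUM`, `UDF`, `WHILE`) and the `evaluate` interpreter are hypothetical. Relations are lists of dicts, operators form a DAG through their `inputs`, and `WHILE` re-runs a body DAG until its output reaches a fixed point.

```python
from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional

Row = Dict[str, Any]
Relation = List[Row]


@dataclass
class Op:
    """One node of a hypothetical data-flow IR DAG."""
    kind: str                                   # e.g. "JOIN", "SUM", "WHILE"
    inputs: List["Op"] = field(default_factory=list)
    params: Dict[str, Any] = field(default_factory=dict)


def evaluate(op: Op, env: Dict[str, Relation],
             memo: Optional[Dict[int, Relation]] = None) -> Relation:
    """Interpret the DAG rooted at `op`; shared sub-DAGs are evaluated only once."""
    memo = {} if memo is None else memo
    if id(op) in memo:
        return memo[id(op)]
    ins = [evaluate(i, env, memo) for i in op.inputs]
    p = op.params
    if op.kind == "INPUT":                      # named base relation
        out = env[p["name"]]
    elif op.kind == "SELECT":                   # relational selection
        out = [r for r in ins[0] if p["pred"](r)]
    elif op.kind == "PROJECT":                  # keep only some columns
        out = [{c: r[c] for c in p["cols"]} for r in ins[0]]
    elif op.kind == "JOIN":                     # naive nested-loop equi-join
        out = [{**lt, **rt} for lt in ins[0] for rt in ins[1]
               if lt[p["key"]] == rt[p["key"]]]
    elif op.kind == "SUM":                      # aggregation: group by key, sum a column
        acc: Dict[Any, Any] = {}
        for r in ins[0]:
            acc[r[p["key"]]] = acc.get(r[p["key"]], 0) + r[p["col"]]
        out = [{p["key"]: k, p["col"]: v} for k, v in acc.items()]
    elif op.kind == "UDF":                      # user-defined per-row function
        out = [p["fn"](r) for r in ins[0]]
    elif op.kind == "WHILE":                    # fixed-point iteration
        out = ins[0]
        for _ in range(p.get("max_iter", 100)):
            # The body DAG reads the loop state via INPUT "loop"; it gets a
            # fresh memo because that input changes on every iteration.
            nxt = evaluate(p["body"], {**env, "loop": out})
            if nxt == out:
                break
            out = nxt
    else:
        raise ValueError(f"unknown operator kind: {op.kind}")
    memo[id(op)] = out
    return out


# Usage: double every value, capped at 10, until nothing changes.
start = Op("INPUT", params={"name": "start"})
body = Op("UDF", [Op("INPUT", params={"name": "loop"})],
          {"fn": lambda r: {"v": min(r["v"] * 2, 10)}})
plan = Op("WHILE", [start], {"body": body})
print(evaluate(plan, {"start": [{"v": 1}, {"v": 3}]}))  # [{'v': 10}, {'v': 10}]
```

Because `WHILE` can wrap an arbitrary body DAG and `UDF` runs arbitrary code, even this toy IR is not limited to fixed relational plans.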

29 of 62

(5) map to systems

30 of 62

Mapping to systems

  • Job boundaries: partition the DAG
  • System bindings: choose systems for sub-DAGs


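[Figure: a workflow DAG with two inputs, edges and vertices, and the operators COUNT, JOIN, WHILE, JOIN, DIVIDE, PROJECT, AGGREGATE, MULTIPLY and SUM, partitioned into a Hadoop job and a GraphChi job.]

Equivalent to k-way graph partitioning, which is NP-hard :-(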
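A quick count shows why exhaustive search can only work for small workflows. Ignoring which groupings actually form valid jobs, n operators can be grouped into non-empty jobs in B_n ways (the Bell number), and if each job may additionally run on one of m systems the count becomes Σ_k S(n,k)·m^k, where S(n,k) are Stirling numbers of the second kind. The sketch below computes both; it is an upper bound on the search space and an illustration, not a proof of the NP-hardness remark. The choice of six candidate systems is only an example.

```python
from functools import lru_cache


@lru_cache(maxsize=None)
def stirling2(n: int, k: int) -> int:
    """Ways to split n labelled operators into exactly k non-empty jobs."""
    if n == k:
        return 1                      # includes S(0, 0) = 1
    if k == 0 or k > n:
        return 0
    # Operator n either starts a new job or joins one of the k existing jobs.
    return stirling2(n - 1, k - 1) + k * stirling2(n - 1, k)


def bell(n: int) -> int:
    """Ways to group n operators into any number of jobs."""
    return sum(stirling2(n, k) for k in range(n + 1))


def bindings(n: int, systems: int) -> int:
    """Groupings of n operators where every job is also bound to one of `systems` back-ends."""
    return sum(stirling2(n, k) * systems ** k for k in range(n + 1))


# The DAG on this slide has nine operators.
print(bell(9))          # 21147
print(bindings(9, 6))   # with, e.g., six candidate back-ends
```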

31 of 62

System bindings

  1. Data volume
    • Bound each operator’s output data size, depending on whether it is generative (output can be larger than its input) or selective (output is at most its input)
  2. Operator performance
    • Benchmark each operator in each back-end system
  3. Workflow history
    • Record input, intermediate and output data sizes
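One way the three inputs above could combine into a cost-based choice of back-end for a single operator is sketched below. Everything here is hypothetical: the system names `serial` and `cluster`, their startup costs and throughputs (standing in for per-operator benchmarks), and the cost formula of fixed startup plus data volume divided by throughput. The size bound prefers a size recorded in workflow history and otherwise falls back to a worst case: selective operators emit at most their input, and the generative JOIN at most the product of its inputs.

```python
from typing import Dict, List, Optional

SELECTIVE = {"SELECT", "PROJECT", "AGGREGATE"}   # output never larger than input

# Hypothetical benchmark table: per-job startup (s) and throughput (rows/s) per operator.
SYSTEMS: Dict[str, Dict[str, float]] = {
    "serial":  {"startup": 0.0,  "JOIN": 1e6, "SELECT": 5e6, "PROJECT": 5e6, "AGGREGATE": 2e6},
    "cluster": {"startup": 30.0, "JOIN": 1e8, "SELECT": 5e8, "PROJECT": 5e8, "AGGREGATE": 2e8},
}


def output_bound(kind: str, input_rows: List[int], observed: Optional[int] = None) -> int:
    """Upper bound on an operator's output size in rows."""
    if observed is not None:          # workflow history: size seen in an earlier run
        return observed
    if kind in SELECTIVE:
        return input_rows[0]
    if kind == "JOIN":                # generative: every pair of rows might match
        return input_rows[0] * input_rows[1]
    raise ValueError(f"no size bound for {kind}")


def estimate(system: str, kind: str, input_rows: List[int]) -> float:
    """Estimated runtime (s): fixed startup plus time to process the input."""
    s = SYSTEMS[system]
    return s["startup"] + sum(input_rows) / s[kind]


def choose_system(kind: str, input_rows: List[int]) -> str:
    """Pick the system with the lowest estimated runtime for this operator."""
    return min(SYSTEMS, key=lambda name: estimate(name, kind, input_rows))


print(choose_system("JOIN", [5_000, 20_000]))               # serial
print(choose_system("JOIN", [50_000_000, 100_000_000]))     # cluster
print(output_bound("JOIN", [1_000, 2_000]))                 # 2000000
print(output_bound("JOIN", [1_000, 2_000], observed=1_500)) # 1500
```

In a full workflow, `output_bound` of one operator would become the input size of the next, which is how the estimates propagate through the DAG.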

32 of 62

Job boundaries

  1. Exhaustive search (small workflows)
    1. Generate all possible partitionings
    2. Pick the best one

33 of 62

Job boundaries

  • Exhaustive search (small workflows)
    • Generate all possible partitionings
    • Pick the best one
  • Dynamic heuristic (large workflows)
    • Topologically sort the DAG
    • Use dynamic programming to determine the best partitioning
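The dynamic heuristic can be sketched as follows, under these assumptions: operators are placed in one topological order (here via Python's `graphlib.TopologicalSorter`), each job is a contiguous run of that order, and a hypothetical `job_cost(ops, system)` prices one run on one system. `best[i]` is the cheapest way to cover the first i operators, so the DP needs O(n²·m) cost evaluations for n operators and m systems. A brute-force search over all contiguous splits is included to check it on a tiny example; the systems and costs are made up for illustration.

```python
from graphlib import TopologicalSorter
from itertools import product
from typing import Callable, Dict, List, Sequence, Set, Tuple

Cost = Callable[[Tuple[str, ...], str], float]


def dp_partition(order: Sequence[str], systems: Sequence[str], job_cost: Cost):
    """best[i] = cheapest cost of running order[:i] as a sequence of jobs."""
    n = len(order)
    best = [0.0] + [float("inf")] * n
    choice: List[Tuple[int, str]] = [(0, "")] * (n + 1)
    for i in range(1, n + 1):
        for j in range(i):                  # last job covers order[j:i]
            for s in systems:
                c = best[j] + job_cost(tuple(order[j:i]), s)
                if c < best[i]:
                    best[i], choice[i] = c, (j, s)
    jobs, i = [], n                         # walk back through the recorded choices
    while i > 0:
        j, s = choice[i]
        jobs.append((list(order[j:i]), s))
        i = j
    return best[n], jobs[::-1]


def brute_force(order: Sequence[str], systems: Sequence[str], job_cost: Cost) -> float:
    """Try every contiguous split and every system assignment (exponential)."""
    n, best = len(order), float("inf")
    for cuts in product([False, True], repeat=n - 1):
        bounds = [0] + [k + 1 for k, cut in enumerate(cuts) if cut] + [n]
        segments = [tuple(order[a:b]) for a, b in zip(bounds, bounds[1:])]
        for assignment in product(systems, repeat=len(segments)):
            best = min(best, sum(job_cost(seg, s) for seg, s in zip(segments, assignment)))
    return best


# Hypothetical cost model: 10 units of startup per job; the graph engine only
# supports some operators, and the MapReduce-style system is slow at iteration.
GRAPH_OPS: Set[str] = {"WHILE", "SUM"}


def job_cost(ops: Tuple[str, ...], system: str) -> float:
    if system == "graph_engine":
        return 10 + (len(ops) if set(ops) <= GRAPH_OPS else float("inf"))
    return 10 + sum(100 if op == "WHILE" else 1 for op in ops)


# Each operator maps to the operators it depends on.
dag: Dict[str, Set[str]] = {"COUNT": set(), "JOIN": {"COUNT"}, "PROJECT": {"JOIN"},
                            "WHILE": {"PROJECT"}, "SUM": {"WHILE"}}
order = list(TopologicalSorter(dag).static_order())
cost, jobs = dp_partition(order, ["mapreduce", "graph_engine"], job_cost)
print(cost, jobs)
```

On this example the DP keeps COUNT, JOIN and PROJECT together on the MapReduce-style system and moves WHILE and SUM to the graph engine, at a total cost of 25, which matches the brute-force search.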

34 of 62

Dynamic heuristic


[Figure: (1) topological sort turns the workflow DAG (inputs edges and vertices; operators COUNT, JOIN, WHILE, JOIN, DIVIDE, PROJECT, AGGREGATE, MULTIPLY, SUM) into a linear sequence of operators; (2) the dynamic heuristic splits this sequence into a Hadoop job and a GraphChi job.]

35 of 62

Job generation

(6) expand templates → dispatch jobs

36 of 62

Job generation

  • Recursive template expansion

Template:

    public IEnumerable<{{OUTPUT_TYPE}}> {{FUN_NAME}}(...) {
        // Placeholders in double braces are filled in during expansion
        {{VAL_TYPE}} outValue = {{INIT_VAL}};
        foreach (var value in values)
            {{UPDATE_VAL}};
        yield return new ...
    }

+ values for the placeholders

= Auto-generated job implementation:

    public IEnumerable<LongTriple> SumRatings(LongPair movIds, IEnumerable<LongTriple> ratings) {
        // Sum the `second` field of all ratings for this pair of movie IDs
        long allRatings = 0;
        foreach (var rating in ratings)
            allRatings += rating.second;
        yield return new LongTriple(movIds.s, movIds.t, allRatings);
    }

→ Dispatch
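Recursive template expansion can be sketched in a few lines of Python, assuming placeholders of the form `{{NAME}}` as on the slide and a binding table whose values may themselves contain placeholders; expansion repeats until none are left. The extra placeholders `ARGS`, `OUT`, `IN`, `VALUES` and `RESULT` are hypothetical, added so that the slide's truncated template is complete; with them the sketch reproduces the generated `SumRatings` code.

```python
import re
from typing import Dict

PLACEHOLDER = re.compile(r"\{\{([A-Z_]+)\}\}")


def expand(template: str, bindings: Dict[str, str], max_rounds: int = 10) -> str:
    """Substitute {{NAME}} placeholders until none remain.

    Binding values may contain further placeholders, which are expanded in
    later rounds. An unknown name raises KeyError; placeholders still left
    after max_rounds (e.g. cyclic bindings) raise ValueError.
    """
    for _ in range(max_rounds):
        if PLACEHOLDER.search(template) is None:     # fully expanded
            return template
        template = PLACEHOLDER.sub(lambda m: bindings[m.group(1)], template)
    raise ValueError("placeholders left after max_rounds; cyclic bindings?")


# Hypothetical aggregation template modelled on the slide.
AGG_TEMPLATE = (
    "public IEnumerable<{{OUTPUT_TYPE}}> {{FUN_NAME}}({{ARGS}}) {\n"
    "  {{VAL_TYPE}} {{OUT}} = {{INIT_VAL}};\n"
    "  foreach (var {{IN}} in {{VALUES}})\n"
    "    {{UPDATE_VAL}};\n"
    "  yield return new {{OUTPUT_TYPE}}({{RESULT}});\n"
    "}\n"
)

SUM_RATINGS = {
    "OUTPUT_TYPE": "LongTriple",
    "FUN_NAME": "SumRatings",
    "ARGS": "LongPair movIds, IEnumerable<LongTriple> ratings",
    "VAL_TYPE": "long",
    "OUT": "allRatings",
    "INIT_VAL": "0",
    "IN": "rating",
    "VALUES": "ratings",
    "UPDATE_VAL": "{{OUT}} += {{IN}}.second",    # nested placeholders
    "RESULT": "movIds.s, movIds.t, {{OUT}}",
}

print(expand(AGG_TEMPLATE, SUM_RATINGS))
```

Letting binding values contain placeholders is what makes the expansion recursive: an operator-specific fragment such as the update statement can be written once against generic names and filled in later.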

37 of 62

Evaluation

  1. Workflow speedup
  2. Overhead of generated code
  3. Quality of decisions made

38 of 62

TPC-H (Q17): Business decisions

Less is better

100 nodes

40 of 62

TPC-H (Q17): Business decisions

100 nodes

uses Naiad

Less is better

41 of 62

Netflix movie recommendation

100 nodes

Less is better

44 of 62

Netflix movie recommendation

uses Naiad

100 nodes

Less is better

45 of 62

PageRank on Twitter graph

100 nodes

Less is better

47 of 62

PageRank on Twitter graph

100 nodes

16 nodes

Less is better

49 of 62

PageRank on Twitter graph

100 nodes

16 nodes

1 node

Less is better

51 of 62

Generated code overhead

[Plot: PageRank on Twitter graph; less is better.]

5-20% overhead, which can be improved

52 of 62

Automatic mapping

33 workflow setups

  • 6 workflows: TPC-H, PageRank, SSSP, Top-shopper, Netflix, Community PageRank
  • 33 configurations, obtained by varying the input data size

Run graph computations on specialized systems

Small computations on single-machine systems

Large computations on scalable systems

53 of 62

Automatic mapping

Musketeer

33 workflow setups

55 of 62

Automatic mapping

Musketeer

33 workflow setups

Always within 10% of the best choice

56 of 62

Summary and conclusions

  • Musketeer automatically translates Kermit’s workflows to many data processing systems
  • This is convenient, improves performance, and yields “future-proof”, portable workflows
  • All of this is automatic!


@CamSysAtScale

@ICGog

57 of 62

Backup slides

58 of 62

Scheduler runtime

Less is better

Note: log scale

59 of 62

Scheduler limitations

[Figure: a DAG of two JOINs, a UNION and a PROJECT, topologically sorted as 1. JOIN, 2. UNION, 3. JOIN, 4. PROJECT; the job partitioning chosen by the heuristic differs from the optimal one.]

60 of 62

Do we want to combine frameworks?

  • The PageRank of all the vertices present in both communities
  • First we intersect the two communities, then we compute PageRank
  • 4.8M vertices, 68M edges
  • 5.8M vertices, 82M edges
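One reading of this workflow, as a sketch: keep the vertices that appear in both communities plus the edges between them, then run PageRank by power iteration. The function names, the damping factor of 0.85, the fixed number of iterations, and the simplification of not redistributing the rank of dangling vertices (those without out-edges) are assumptions for illustration, not details from the slide.

```python
from typing import Dict, Iterable, List, Set, Tuple

Edge = Tuple[str, str]


def intersect_communities(c1: Set[str], c2: Set[str], edges: Iterable[Edge]):
    """Vertices present in both communities, and the edges among them."""
    common = c1 & c2
    kept = [(u, v) for u, v in edges if u in common and v in common]
    return common, kept


def pagerank(vertices: Set[str], edges: List[Edge],
             damping: float = 0.85, iterations: int = 20) -> Dict[str, float]:
    """Plain power-iteration PageRank; rank of dangling vertices is not redistributed."""
    n = len(vertices)
    out_degree = {v: 0 for v in vertices}
    for u, _ in edges:
        out_degree[u] += 1
    rank = {v: 1.0 / n for v in vertices}
    for _ in range(iterations):
        incoming = {v: 0.0 for v in vertices}
        for u, v in edges:
            incoming[v] += rank[u] / out_degree[u]   # u shares its rank over its out-edges
        rank = {v: (1 - damping) / n + damping * incoming[v] for v in vertices}
    return rank


community_a = {"a", "b", "c", "d"}
community_b = {"a", "b", "c", "e"}
edges = [("a", "b"), ("b", "c"), ("c", "a"), ("a", "d"), ("e", "a")]
common, kept = intersect_communities(community_a, community_b, edges)
print(sorted(common), kept)     # ['a', 'b', 'c'] [('a', 'b'), ('b', 'c'), ('c', 'a')]
print(pagerank(common, kept))   # each vertex of the 3-cycle gets rank ~1/3
```

The first step is relational (an intersection and a filter over edges) and the second is iterative graph computation, which is why such a workflow could benefit from combining two different kinds of system.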

61 of 62

Operator merge

[Plots: PageRank on LiveJournal (4.8M vertices, 69M edges); Top-shopper.]
