1 of 42

CSE 344: Section 8

Cost Estimation + Parallel Databases

February 24th, 2022

2 of 42

Administrivia

  • HW5 Milestone 2 late deadline is today
  • HW6 is due Monday
    • Please accept the AWS Academy invitation if you haven’t already done so.

3 of 42

Cost Estimation: Factors

B(R) = # blocks for relation R

T(R) = # tuples for relation R

V(R, a) = # of unique values of attribute a in relation R

M = # of available memory pages

4 of 42

Cost Estimation: Selection (σ)

Table scan = B(R)

Point Selection:

Index Based Selection (clustered) = B(R)/V(R, a)

Index Based Selection (unclustered) = T(R)/V(R, a)
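The three selection formulas above can be sketched as a tiny cost calculator (class and method names are my own, not from the slides):

```java
public class SelectionCost {
    // Full table scan: read every block of R.
    static double tableScan(double bR) { return bR; }

    // Point lookup through a clustered index: the matching tuples sit in
    // roughly B(R)/V(R, a) consecutive blocks.
    static double clusteredIndex(double bR, double vRa) { return bR / vRa; }

    // Unclustered index: each matching tuple may live in a different block,
    // so we charge one I/O per tuple: T(R)/V(R, a).
    static double unclusteredIndex(double tR, double vRa) { return tR / vRa; }

    public static void main(String[] args) {
        // Using the Supply statistics that appear later in these slides:
        // B(Supply) = 100, T(Supply) = 10000, V(Supply, pno) = 2500.
        System.out.println(tableScan(100));                // 100.0
        System.out.println(unclusteredIndex(10000, 2500)); // 4.0
    }
}
```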

5 of 42

Cost Estimation Disk I/O Formulas

  • Nested-Loop Joins
    • Block-at-a-time → B(R) + B(R)*B(S)
    • Block-nested-loop → B(R) + (B(R)/M)*B(S)
      • Uses more memory but needs far fewer I/Os
  • Hash Join → B(R) + B(S)
    • The smaller table must fit in memory (B(R) < M)
  • Sort-Merge Join → B(R) + B(S)
    • Both tables must fit in memory simultaneously (B(R) + B(S) < M)
  • Clustered Index Equijoin → B(R) + T(R)*X*B(S) = B(R) + T(R)*B(S)/V(S, a)
  • Unclustered Index Equijoin → B(R) + T(R)*X*T(S) = B(R) + T(R)*T(S)/V(S, a)
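These formulas can be collected into one hedged sketch (my own naming; B, T, V, M follow the "Cost Estimation: Factors" slide):

```java
public class JoinCost {
    static double blockAtATime(double bR, double bS) { return bR + bR * bS; }

    // One pass over S per group of ~M blocks of R.
    static double blockNestedLoop(double bR, double bS, double m) {
        return bR + Math.ceil(bR / m) * bS;
    }

    static double hashJoin(double bR, double bS) { return bR + bS; }      // requires B(R) < M
    static double sortMergeJoin(double bR, double bS) { return bR + bS; } // requires B(R)+B(S) < M

    static double clusteredIndexEquijoin(double bR, double tR, double bS, double vSa) {
        return bR + tR * (bS / vSa);
    }

    static double unclusteredIndexEquijoin(double bR, double tR, double tS, double vSa) {
        return bR + tR * (tS / vSa);
    }

    public static void main(String[] args) {
        // With B(R) = B(S) = 100 and M = 10:
        System.out.println(blockAtATime(100, 100));        // 10100.0
        System.out.println(blockNestedLoop(100, 100, 10)); // 1100.0
        System.out.println(hashJoin(100, 100));            // 200.0
    }
}
```

Note how much cheaper the block-nested-loop is than the block-at-a-time variant once M grows: the number of passes over S drops from B(R) to B(R)/M.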

6 of 42

Cost Estimation: Nested Loop Join (⋈)

Naive: B(R) + T(R)B(S)

for each tuple t1 in R do
    for each tuple t2 in S do
        if t1 and t2 join then output (t1, t2)

7 of 42

Cost Estimation: Nested Loop Join (⋈)

Block-at-a-time: B(R) + B(S)B(R)

for each block bR in R:
    for each block bS in S:
        for each tuple tR in bR:
            for each tuple tS in bS:
                if tR and tS can join:
                    output (tR, tS)

8 of 42

Cost Estimation: Nested Loop Join (⋈)

Block-nested-loop: B(R) + (B(R)/(M-1))*B(S) ≅ B(R) + (B(R)/(M)) * B(S)

for each group of M-1 blocks bR in R:
    for each block bS in S:
        for each tuple tR in bR:
            for each tuple tS in bS:
                if tR and tS can join:
                    output (tR, tS)

9 of 42

Cost Estimation: Hash Join (⋈)

R joined with S (assume R is smaller in size)

B(R) + B(S)

Assuming B(R) < M, the join runs in one pass (each table is read once): read all of R into an in-memory hash table, then scan S and join each tuple against the table.
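The one-pass idea looks like this in code (an in-memory illustration of my own, not the engine's implementation):

```java
import java.util.*;

// One-pass hash join sketch: build a hash table on the smaller relation R,
// then probe it with each tuple of S. Tuples are int[] {joinKey, payload}.
public class OnePassHashJoin {
    static List<int[]> join(List<int[]> r, List<int[]> s) {
        // Build phase: all of R goes into memory (this is the B(R) < M assumption).
        Map<Integer, List<int[]>> table = new HashMap<>();
        for (int[] t : r)
            table.computeIfAbsent(t[0], k -> new ArrayList<>()).add(t);

        // Probe phase: a single pass over S.
        List<int[]> out = new ArrayList<>();
        for (int[] t : s)
            for (int[] match : table.getOrDefault(t[0], List.of()))
                out.add(new int[]{match[0], match[1], t[1]});
        return out;
    }

    public static void main(String[] args) {
        List<int[]> r = List.of(new int[]{1, 10}, new int[]{2, 20});
        List<int[]> s = List.of(new int[]{1, 99});
        System.out.println(join(r, s).size()); // 1
    }
}
```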

10 of 42

Cost Estimation: Sort-Merge Join (⋈)

B(R) + B(S)

One pass (each table is read once); both tables must be small enough to fit in memory together (B(R) + B(S) < M)

Why would we use this over Hash Join?

  • Tables are sorted on join attributes (No need to hash)
  • Range join instead of equijoin

11 of 42

Selectivity Formulas

  • Selectivity Factor (X) → the fraction of tuples that satisfy the condition
    • Assuming a uniform distribution of values on numeric attribute a in table R, if the condition is:
      • a = c → X = 1 / V(R, a)
      • a < c → X = (c - min(R, a)) / (max(R, a) - min(R, a))
      • c1 < a < c2 → X = (c2 - c1) / (max(R, a) - min(R, a))
      • cond1 AND cond2 → X = X1 * X2
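As a sanity check, the four formulas translate directly into code (names are mine):

```java
public class Selectivity {
    // a = c
    static double eq(double vRa) { return 1.0 / vRa; }

    // a < c, for a uniformly spread over [min, max]
    static double lessThan(double c, double min, double max) { return (c - min) / (max - min); }

    // c1 < a < c2
    static double range(double c1, double c2, double min, double max) { return (c2 - c1) / (max - min); }

    // cond1 AND cond2, assuming the conditions are independent
    static double and(double x1, double x2) { return x1 * x2; }

    public static void main(String[] args) {
        // a < 15 on an attribute uniformly spread over [10, 20]:
        System.out.println(lessThan(15, 10, 20)); // 0.5
        // Estimated output size = T(R) * X, e.g. 10000 tuples with V(R, a) = 2500:
        System.out.println(Math.round(10000 * eq(2500))); // 4
    }
}
```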

12 of 42

Cardinality Estimation Example

13 of 42

Cardinality Estimation

Supply(sid, pno, quantity)

Supplier(sid, sname, scity, sstate)

SELECT sname
FROM Supply x, Supplier y
WHERE x.sid = y.sid AND x.pno = 2 AND y.scity = 'Seattle' AND y.sstate = 'WA';

[Plan: πsname ( σpno=2(Supply) ⋈sid=sid σscity='Seattle' ∧ sstate='WA'(Supplier) )]

14 of 42

Cardinality Estimation


Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10


15 of 42

Cardinality Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

T(Supply) * 1/V(Supply, pno) = 10000 / 2500 = 4

T(Supplier) * 1/V(Supplier, scity) * 1/V(Supplier, sstate) = 1000 / (20 * 10) = 5


16 of 42

Cardinality Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

T1 = 4

T2 = 5


17 of 42

Cardinality Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

T1 = 4

T2 = 50

But wait a second… Seattle is in Washington! The two conditions are correlated, so we drop the redundant sstate factor:

T(Supplier) * 1/V(Supplier, scity) = 1000 / 20 = 50


18 of 42

Cardinality Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

T1 = 4

T2 = 50

min(T1, T2) = 4


19 of 42

Cardinality Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

T1 = 4

T2 = 50

T3 = 4


20 of 42

Cardinality Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

T1 = 4

T2 = 50

T3 = 4

No filtering at this step (the projection does not change the number of tuples)


21 of 42

Cardinality Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

T1 = 4

T2 = 50

T3 = 4

Total = 4


22 of 42

Cost Estimation Example

23 of 42

Cost Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

[Physical plan: πsname ( σpno=2(Supply) ⋈sid=sid σscity='Seattle' ∧ sstate='WA'(Supplier) ), with the join implemented as a Hash Join]

24 of 42

Cost Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

[Physical plan: sequential scans of Supply and Supplier; Hash Join on sid=sid; selection applied on the fly]

25 of 42

Cost Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10

[Physical plan: sequential scans of Supply and Supplier; Hash Join on sid=sid; both selections and πsname run on the fly]

26 of 42

Cost Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10


Cost = B(Supply) = 100

Cost = B(Supplier) = 100

27 of 42

Cost Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10


C1 = 100 (sequential scan of Supply)

C2 = 100 (sequential scan of Supplier)

Cost of each on-the-fly operator = 0

28 of 42

Cost Estimation

Supply Statistics:

  • T(Supply) = 10000
  • B(Supply) = 100
  • V(Supply, pno) = 2500

Supplier Statistics:

  • T(Supplier) = 1000
  • B(Supplier) = 100
  • V(Supplier, scity) = 20
  • V(Supplier, sstate) = 10


C1 = 100

C2 = 100

C3 = 0

C4 = 0

Total Cost = C1 + C2 + C3 + C4 = 100 + 100 + 0 + 0 = 200 I/Os

29 of 42

Parallel DBs

30 of 42

Data Partitioning

We focus on shared-nothing architecture and intra-operator parallelism.

  • Block Partition
    • Tuples assigned to nodes in equal-sized blocks; attribute values and ordering are ignored
  • Hash partitioned on attribute A
    • Node contains tuples with chosen attribute hashes
  • Range partitioned on attribute A
    • Node contains tuples in chosen attribute ranges
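The three schemes differ only in how a tuple is routed to a node. A minimal sketch (the routing functions are my own illustration, not from the slides):

```java
public class Partitioning {
    // Block (round-robin) partition: ignore values, spread tuples evenly.
    static int blockPartition(int tupleIndex, int n) { return tupleIndex % n; }

    // Hash partition on attribute A.
    static int hashPartition(Object a, int n) { return Math.floorMod(a.hashCode(), n); }

    // Range partition on attribute A given sorted split points, e.g.
    // splits {10, 20} → node 0: a < 10, node 1: 10 <= a < 20, node 2: a >= 20.
    static int rangePartition(int a, int[] splits) {
        int node = 0;
        while (node < splits.length && a >= splits[node]) node++;
        return node;
    }

    public static void main(String[] args) {
        System.out.println(blockPartition(7, 3));                  // 1
        System.out.println(rangePartition(15, new int[]{10, 20})); // 1
    }
}
```

Hash and range partitioning are value-based, which is what lets later operators (joins, grouping) find all matching tuples on a single node.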

31 of 42

Moving Data

We have a “network layer” that shuffles tuples between nodes.

Transferring data is expensive, so we need to be efficient (especially on joins and grouping).

32 of 42

Moving Data: Partitioned Hash-Join Mechanism

  • Join on some attribute (e.g. R.x and S.y)
  • Call hash function h(z)

Key Points:

  • Hash shuffle tuples on join attributes

[Diagram: node i starts with partitions Ri, Si; after shuffling on the hash of the join attributes, node i holds Ri’, Si’ — all tuples whose join attributes hash to the same value, h(R.x) = h(S.y)]
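The shuffle step above can be sketched as follows (an in-memory illustration of my own; tuples are int[] with the join attribute in a fixed column):

```java
import java.util.*;

// Hash shuffle: every tuple of R and S is routed to node h(joinAttr) mod n,
// so tuples that can join end up co-located on the same node.
public class HashShuffle {
    static Map<Integer, List<int[]>> shuffle(List<int[]> tuples, int joinCol, int n) {
        Map<Integer, List<int[]>> byNode = new HashMap<>();
        for (int[] t : tuples) {
            int node = Math.floorMod(Integer.hashCode(t[joinCol]), n);
            byNode.computeIfAbsent(node, k -> new ArrayList<>()).add(t);
        }
        return byNode;
    }

    public static void main(String[] args) {
        List<int[]> r = List.of(new int[]{1, 10}, new int[]{2, 20}, new int[]{4, 40});
        // Keys 1 and 4 hash to the same node (mod 3), key 2 to another,
        // so R is spread over two of the three nodes.
        System.out.println(shuffle(r, 0, 3).size()); // 2
    }
}
```

After R and S are both shuffled with the same h, each node can run a purely local join on its Ri’, Si’ pair.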

33 of 42

Moving Data: Broadcast Join Mechanism

Takes advantage of a small dataset: all of S can fit into main memory on each node.

Key Points:

  • Partition type of R doesn’t matter
  • S is unpartitioned and small
  • Send a full copy of S to every node holding a partition of R

[Diagram: node i keeps its partition Ri unchanged and receives a full copy of S, so after the broadcast every node contains all of S]
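Once S is broadcast, each node runs a purely local join. A sketch of that local step (my own illustration; S is modeled as a map from join key to value):

```java
import java.util.*;

// Broadcast join, local step: a node joins its partition of R against the
// full copy of S it received, entirely in memory.
public class BroadcastJoin {
    static List<String> localJoin(List<int[]> rPartition, Map<Integer, String> sBroadcast) {
        List<String> out = new ArrayList<>();
        for (int[] r : rPartition) {
            String sVal = sBroadcast.get(r[0]);        // match on the join attribute
            if (sVal != null) out.add(r[1] + "-" + sVal);
        }
        return out;
    }

    public static void main(String[] args) {
        Map<Integer, String> s = Map.of(1, "a", 2, "b"); // all of S, sent to every node
        List<int[]> rNode1 = List.of(new int[]{1, 10}, new int[]{3, 30});
        System.out.println(localJoin(rNode1, s)); // [10-a]
    }
}
```

No tuple of R ever moves, which is the whole point: only the small relation crosses the network.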

34 of 42

Parallel Query Plans

Now, we need to know how to derive parallel plans from single node plans.

  • Which RA operations can you do without talking to other nodes?
  • Which RA operations require moving tuples?

σ and π can run locally, without talking to other nodes.

35 of 42

MapReduce and Spark

36 of 42

Hadoop MapReduce

Hadoop Distributed File System (HDFS): stores and manages access to large files (tables) that are terabytes or petabytes in size; MapReduce jobs process them.

High Level Steps:

  • Map Task (EmitIntermediate)
  • Reduce Task (Emit)

Fault tolerance: intermediate files are frequently written to disk.

37 of 42

Counting Words w/ MapReduce

map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        emitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    emit(AsString(result));
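The pseudocode above can be mimicked on a single machine to see the two phases concretely (a sketch of my own, not Hadoop code):

```java
import java.util.*;

// Single-machine analogue of MapReduce word count: the map phase emits
// (word, 1) pairs, the reduce phase sums the counts per word.
public class WordCount {
    static Map<String, Integer> count(List<String> documents) {
        // Map phase: emitIntermediate(w, 1) for every word in every document.
        List<Map.Entry<String, Integer>> intermediate = new ArrayList<>();
        for (String doc : documents)
            for (String w : doc.split("\\s+"))
                intermediate.add(Map.entry(w, 1));

        // Reduce phase: group by word and sum the counts.
        Map<String, Integer> result = new TreeMap<>();
        for (Map.Entry<String, Integer> e : intermediate)
            result.merge(e.getKey(), e.getValue(), Integer::sum);
        return result;
    }

    public static void main(String[] args) {
        System.out.println(count(List.of("a b a", "b c"))); // {a=2, b=2, c=1}
    }
}
```

In real MapReduce the intermediate (word, 1) pairs are shuffled across nodes by key before the reduce phase, exactly like the hash shuffle used for joins.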

38 of 42

Spark Objects

HW6 Tip!

Row

RowFactory.create(Objects...)

Dataset<Row>

JavaRDD<Row>

JavaPairRDD<K, V>

Tuple2<> (You can leave the generics empty)

39 of 42

Spark Methods

HW6 Tip!

spark.sql("SELECT ... FROM ...") (spark must be a SparkSession)

d.filter(t -> f(t) == true/false)

d.distinct()

d.map() (d must be a JavaRDD)

d.mapToPair(t -> new Tuple2<>(K, V))

d.reduceByKey((v1, v2) -> f(v1, v2)) (d must be a JavaPairRDD)

40 of 42

Parallel DB Practice!

41 of 42

We have a distributed database that holds the relations:

Drug(spec VARCHAR(255), compatibility INT)

Person(name VARCHAR(100) primary key, compatibility INT)

We want to compute:

SELECT P.name, count(D.spec)

FROM Person AS P, Drug AS D

WHERE P.compatibility = D.compatibility

GROUP BY P.name;

Drug is block-partitioned

Person is hash-partitioned on compatibility [h(n)]

You have three nodes. Draw a parallel query plan.

42 of 42

γP.name, count(D.spec)(P ⋈ D)

[Plan, run on each of the three nodes: hash-shuffle Drug on compatibility with the same h(n) used for Person; join the co-partitioned Drug and Person fragments locally; then apply γP.name, count(D.spec) locally on each node.]

Takes advantage of:

  1. The existing hash partitioning of Person [h(n)]: only Drug, which is block-partitioned, must be re-shuffled before the join.
  2. PK uniqueness of name: each name has a single compatibility value, so all of its joined tuples land on one node and the group-by needs no further shuffling.