CSE 344: Section 8
Cost Estimation + Parallel Databases
February 24th, 2022
Administrivia
Cost Estimation: Factors
B(R) = # blocks for relation R
T(R) = # tuples for relation R
V(R, a) = # of unique values of attribute a in relation R
M = # of available memory pages
Cost Estimation: Selection (σ)
Table scan = B(R)
Point Selection:
Index Based Selection (clustered) = B(R)/V(R, a)
Index Based Selection (unclustered) = T(R)/V(R, a)
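The three formulas can be compared side by side. This is a minimal sketch with made-up statistics (all three values are assumptions, not from the slides):

```python
# Cost (in disk I/Os) of evaluating a point selection sigma_{a=v}(R)
# under each access method, using hypothetical statistics.

B_R = 2_000        # B(R): blocks in R (assumed)
T_R = 100_000      # T(R): tuples in R (assumed)
V_R_a = 50         # V(R, a): distinct values of attribute a (assumed)

table_scan = B_R                     # read every block of R
clustered_index = B_R / V_R_a        # matching tuples are packed into adjacent blocks
unclustered_index = T_R / V_R_a      # worst case: one I/O per matching tuple

print(table_scan, clustered_index, unclustered_index)  # 2000 40.0 2000.0
```

Note that with these numbers the unclustered index is no better than a full scan; an unclustered index only pays off when the selection is very selective.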
Cost Estimation Disk I/O Formulas
Cost Estimation: Nested Loop Join (⋈)
Naive: B(R) + T(R)B(S)
for each tuple t1 in R do
    for each tuple t2 in S do
        if t1 and t2 join then output (t1, t2)
Cost Estimation: Nested Loop Join (⋈)
Block-at-a-time: B(R) + B(S)B(R)
for each block bR in R:
    for each block bS in S:
        for each tuple tR in bR:
            for each tuple tS in bS:
                if tR and tS can join:
                    output (tR, tS)
Cost Estimation: Nested Loop Join (⋈)
Block-nested-loop: B(R) + (B(R)/(M-1)) * B(S) ≅ B(R) + (B(R)/M) * B(S)
for each group of M-1 blocks bR in R:
    for each block bS in S:
        for each tuple tR in bR:
            for each tuple tS in bS:
                if tR and tS can join:
                    output (tR, tS)
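The three nested-loop variants can be compared numerically; the relation sizes and memory budget below are assumptions for illustration:

```python
# Disk I/O cost of the three nested-loop join variants, with assumed sizes.

B_R, T_R = 100, 10_000   # blocks / tuples of the outer relation R (assumed)
B_S = 500                # blocks of the inner relation S (assumed)
M = 11                   # available memory pages (assumed)

naive = B_R + T_R * B_S                      # one scan of S per outer tuple
block_at_a_time = B_R + B_R * B_S            # one scan of S per outer block
block_nested = B_R + (B_R // (M - 1)) * B_S  # one scan of S per M-1 outer blocks

print(naive, block_at_a_time, block_nested)  # 5000100 50100 5100
```

Each refinement cuts the number of inner scans by an order of magnitude or more, which is why the block-nested-loop variant is the one actually used.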
Cost Estimation: Hash Join (⋈)
R joined with S (assume R is smaller in size)
B(R) + B(S)
Assuming B(R) < M, the join runs in one pass (each table is read exactly once): load all of R into an in-memory hash table, then stream S and probe the table.
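A minimal in-memory sketch of the build/probe structure (relation contents and attribute names are made up):

```python
# One-pass hash join: build a hash table on the smaller relation R,
# then probe it with each tuple of S.

from collections import defaultdict

R = [(1, "a"), (2, "b"), (2, "c")]   # (sid, payload) -- assumed sample data
S = [(2, "x"), (3, "y"), (1, "z")]   # (sid, payload) -- assumed sample data

# Build phase: hash all of R on the join key (this is the B(R) I/Os).
table = defaultdict(list)
for sid, r_val in R:
    table[sid].append(r_val)

# Probe phase: stream S once (this is the B(S) I/Os).
out = [(sid, r_val, s_val) for sid, s_val in S for r_val in table[sid]]
print(sorted(out))  # [(1, 'a', 'z'), (2, 'b', 'x'), (2, 'c', 'x')]
```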
Cost Estimation: Sort-Merge Join (⋈)
B(R) + B(S)
One pass (each table is read once); both relations must fit in memory together (B(R) + B(S) < M)
Why would we use this over Hash Join?
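A sketch of the merge phase on pre-sorted inputs (sample data is made up; duplicate keys are omitted for simplicity). One answer to the question above is visible in the output: unlike a hash join, the result comes out sorted on the join key, which later operators can exploit.

```python
# Merge phase of sort-merge join: walk both sorted inputs in lockstep.

def merge_join(R, S):
    """R and S are lists of (key, value) sorted by key, no duplicate keys."""
    out, i, j = [], 0, 0
    while i < len(R) and j < len(S):
        if R[i][0] < S[j][0]:
            i += 1
        elif R[i][0] > S[j][0]:
            j += 1
        else:
            out.append((R[i][0], R[i][1], S[j][1]))
            i += 1
            j += 1
    return out

print(merge_join([(1, "a"), (3, "b")], [(1, "x"), (2, "y"), (3, "z")]))
# [(1, 'a', 'x'), (3, 'b', 'z')]
```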
Selectivity Formulas
Cardinality Estimation Example
Cardinality Estimation

Supply(sid, pno, quantity)
Supplier(sid, sname, scity, sstate)

SELECT sname
FROM Supply x, Supplier y
WHERE x.sid = y.sid AND x.pno = 2 AND y.scity = 'Seattle' AND y.sstate = 'WA';

Logical plan (bottom-up): σpno=2(Supply) and σscity='Seattle' ∧ sstate='WA'(Supplier) feed the join ⋈sid=sid, followed by πsname.
Cardinality Estimation

Same plan as above; the slide also shows statistics for Supply and Supplier (table sizes and V(…) counts) that the following estimates use.
Cardinality Estimation

T1 = T(Supply) × 1/V(Supply, pno) = 4
T2 = T(Supplier) × 1/V(Supplier, scity) × 1/V(Supplier, sstate) = 5
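The same arithmetic in code, using one set of statistics chosen to be consistent with the slide's results (the specific values below are assumptions, not taken from the slides):

```python
# Selectivity-based cardinality estimates. Statistics are assumed values
# picked so the results match the slide's numbers (4, 5, and later 50).

T_Supply, V_Supply_pno = 10_000, 2_500
T_Supplier, V_scity, V_sstate = 1_000, 20, 10

T1 = T_Supply * (1 / V_Supply_pno)                 # sigma_{pno=2}(Supply)
T2 = T_Supplier * (1 / V_scity) * (1 / V_sstate)   # city AND state, assumed independent
T2_city_only = T_Supplier * (1 / V_scity)          # once we notice Seattle implies WA

print(T1, T2, T2_city_only)  # 4.0 5.0 50.0
```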
Cardinality Estimation

T1 = 4
T2 = 50

But wait a second… Seattle is in Washington! The two predicates are not independent: scity = 'Seattle' already implies sstate = 'WA', so we should apply only the city filter:

T(Supplier) × 1/V(Supplier, scity) = 50
Cardinality Estimation

T1 = 4
T2 = 50
Estimated join output: min(T1, T2) = 4
Cardinality Estimation

T1 = 4
T2 = 50
T3 = 4 (output of the join)
Cardinality Estimation

The projection πsname does no filtering, so the estimate stays at T3 = 4.
Cardinality Estimation

Total estimated output: 4 tuples.
Cost Estimation Example
Cost Estimation

Same plan, now annotated with physical operators: Supply and Supplier are read with sequential scans, both selections run on the fly, the join ⋈sid=sid is a hash join, and the final projection πsname also runs on the fly.
Cost Estimation

Cost of the sequential scan of Supply (σpno=2 applied on the fly) = B(Supply) = 100
Cost of the sequential scan of Supplier (city/state selection applied on the fly) = B(Supplier) = 100
Cost Estimation

C1 = 100 (scan of Supply)
C2 = 100 (scan of Supplier)
The hash join and the on-the-fly projection read no extra blocks, so each adds Cost = 0.
Cost Estimation

C1 = 100
C2 = 100
C3 = 0
C4 = 0
Total Cost = 100 + 100 = 200 I/Os
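The whole costing adds up in a few lines (block counts are the ones used on the slides):

```python
# Total I/O cost of the physical plan: two sequential scans feed a one-pass
# hash join; the selections and the projection run on the fly at no I/O cost.

B_Supply, B_Supplier = 100, 100   # block counts from the slide

scan_supply = B_Supply        # C1
scan_supplier = B_Supplier    # C2
join_cost = 0                 # C3: pipelined one-pass hash join
project_cost = 0              # C4: on-the-fly projection

total = scan_supply + scan_supplier + join_cost + project_cost
print(total)  # 200
```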
Parallel DBs
Data Partitioning
We focus on shared-nothing architecture and intra-operator parallelism.
Moving Data
We have a "network layer" that moves tuples between nodes during query execution.
Transferring data is expensive, so we want to move as little of it as possible (this matters most for joins and grouping).
Moving Data:
Partitioned Hash-Join Mechanism
Key Points:
Node i starts with partitions Ri and Si. Every tuple of R is re-sent to node h(R.x) and every tuple of S to node h(S.y), so after the shuffle node i holds Ri' and Si', which contain exactly the tuples with h(R.x) = h(S.y) = i. Each node then joins its Ri' and Si' locally; matching tuples are guaranteed to be on the same node.
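A small single-process simulation of the shuffle-then-join idea (relation contents are made up; `hash(...) % n` stands in for the partitioning function h):

```python
# Partitioned (shuffle) hash join on n nodes: every tuple of R and S is sent
# to node h(key) % n, after which each node joins its partitions locally.

n = 3
R = [(1, "r1"), (2, "r2"), (4, "r4")]   # (x, payload) -- assumed sample data
S = [(1, "s1"), (4, "s4"), (5, "s5")]   # (y, payload) -- assumed sample data

# Shuffle phase: node i receives Ri' and Si'.
R_parts = [[t for t in R if hash(t[0]) % n == i] for i in range(n)]
S_parts = [[t for t in S if hash(t[0]) % n == i] for i in range(n)]

# Local join phase: matching keys always land on the same node.
out = []
for Ri, Si in zip(R_parts, S_parts):
    out.extend((k, rv, sv) for k, rv in Ri for k2, sv in Si if k == k2)

print(sorted(out))  # [(1, 'r1', 's1'), (4, 'r4', 's4')]
```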
Moving Data:
Broadcast Join Mechanism
Takes advantage of small datasets (S can fit entirely into main memory)
Key Points:
Node i keeps its partition Ri in place, and a full copy of S is broadcast to every node, so node i holds Ri together with all of S. Each node then joins locally; no tuple of R ever moves across the network.
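The same idea in a single-process sketch (data is made up): R stays where it is and only the small relation is copied.

```python
# Broadcast join: R stays partitioned; the small relation S is copied
# in full to every node, then each node joins locally.

n = 3
R_parts = [[(1, "r1")], [(2, "r2")], [(4, "r4")]]   # R already partitioned (assumed)
S = [(1, "s1"), (4, "s4")]                          # small enough to broadcast

out = []
for Ri in R_parts:
    S_local = list(S)   # each node receives its own full copy of S
    out.extend((k, rv, sv) for k, rv in Ri for k2, sv in S_local if k == k2)

print(sorted(out))  # [(1, 'r1', 's1'), (4, 'r4', 's4')]
```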
Parallel Query Plans
Now we need to know how to derive parallel plans from single-node plans (for operators such as ⋈, σ, and π).
MapReduce and Spark
Hadoop MapReduce
Hadoop Distributed File System (HDFS): stores and manages access to large files (tables), terabytes or petabytes in size; a MapReduce job reads its input from and writes its output to HDFS.
High-level steps: map, shuffle (group by key), reduce.
Fault tolerance: intermediate files are frequently written to disk, so failed tasks can be re-run without restarting the whole job.
Counting Words w/ MapReduce
map(String key, String value):
    // key: document name
    // value: document contents
    for each word w in value:
        emitIntermediate(w, "1");

reduce(String key, Iterator values):
    // key: a word
    // values: a list of counts
    int result = 0;
    for each v in values:
        result += ParseInt(v);
    emit(AsString(result));
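The pseudocode above can be played out in plain Python (documents are made up): the map phase emits (word, 1) pairs, the shuffle groups them by word, and the reduce phase sums each group.

```python
# Plain-Python rendition of MapReduce word count.

from collections import defaultdict

docs = {"d1": "the quick brown fox", "d2": "the lazy dog the end"}

# Map phase (+ shuffle): group emitted (word, 1) pairs by word.
intermediate = defaultdict(list)
for name, contents in docs.items():
    for word in contents.split():
        intermediate[word].append(1)   # emitIntermediate(w, "1")

# Reduce phase: sum the counts for each word.
counts = {word: sum(values) for word, values in intermediate.items()}
print(counts["the"])  # 3
```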
Spark Objects
HW6 Tip!
Row
RowFactory.create(Objects...)
Dataset<Row>
JavaRDD<Row>
JavaPairRDD<K, V>
Tuple2<> (You can leave the generics empty)
Spark Methods
HW6 Tip!
spark.sql("SELECT ... FROM ...") (spark must be a SparkSession)
d.filter(t -> predicate(t)) (keeps the tuples for which the predicate returns true)
d.distinct()
d.map(...) (d must be a JavaRDD)
d.mapToPair(t -> new Tuple2<>(k, v))
d.reduceByKey((v1, v2) -> f(v1, v2)) (d must be a JavaPairRDD)
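What mapToPair followed by reduceByKey compute can be mimicked in plain Python without a Spark cluster (the records below are made up; this is a model of the semantics, not the Spark API):

```python
# Model of mapToPair + reduceByKey: map each record to a (key, value) pair,
# then merge the values of each key with a binary function.

from functools import reduce
from collections import defaultdict

records = [("a", 1), ("b", 2), ("a", 5)]   # already (key, value) pairs

# Shuffle: group values by key (what Spark does between the two operators).
grouped = defaultdict(list)
for k, v in records:
    grouped[k].append(v)

# reduceByKey((v1, v2) -> v1 + v2)
reduced = {k: reduce(lambda v1, v2: v1 + v2, vs) for k, vs in grouped.items()}
print(reduced)  # {'a': 6, 'b': 2}
```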
Parallel DB Practice!
We have a distributed database that holds the relations:
Drug(spec VARCHAR(255), compatibility INT)
Person(name VARCHAR(100) primary key, compatibility INT)
We want to compute:
SELECT P.name, count(D.spec)
FROM Person AS P, Drug AS D
WHERE P.compatibility = D.compatibility
GROUP BY P.name;
Drug is block-partitioned
Person is hash-partitioned on compatibility [h(n)]
You have three nodes. Draw a parallel query plan.
γP.name, count(D.spec)(P ⋈ D)

Parallel plan (the same sub-plan runs on each of the three nodes):
1. Hash [h(n)] Drug on compatibility; Person is already hash-partitioned this way, so it stays put.
2. Compute the join P ⋈ D locally at each node.
3. Compute γP.name, count(D.spec) locally; no further shuffle is needed, because each Person tuple (and hence every joined row for that name) lives on exactly one node.

Takes advantage of:
Block partitioning of Drug (each node re-hashes its local blocks in parallel) and Person's existing hash partitioning on compatibility (Person never moves).
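The plan can be simulated end to end (the data values are made up; `hash(...) % n` stands in for h(n)):

```python
# Simulate the practice plan on three nodes: re-hash Drug on compatibility,
# join locally with the co-partitioned Person, then count locally per name.

from collections import Counter

n = 3
person = [("alice", 1), ("bob", 2), ("carol", 4)]     # (name, compatibility)
drug = [("d1", 1), ("d2", 1), ("d3", 2), ("d4", 4)]   # (spec, compatibility)

h = lambda c: hash(c) % n
person_parts = [[p for p in person if h(p[1]) == i] for i in range(n)]  # already partitioned
drug_parts = [[d for d in drug if h(d[1]) == i] for i in range(n)]      # the shuffle step

result = Counter()
for P_i, D_i in zip(person_parts, drug_parts):
    for name, pc in P_i:              # local join on compatibility
        for spec, dc in D_i:
            if pc == dc:
                result[name] += 1     # local group-by: one node per person

print(dict(result))  # {'alice': 2, 'carol': 1, 'bob': 1}
```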