Spark
Performance Enhancement of Broadcast Hash Join Queries in Spark - Iceberg
Existing Spark Feature: Dynamic Partition Pruning (DPP)
SELECT population, census_data.state_name FROM census_data JOIN state_economy ON census_data.state_name = state_economy.state_name WHERE gdp > 50000000
Table census_data {
population LongType,
state_name StringType
}
partitioned by state_name
Table state_economy {
gdp LongType,
state_name StringType
}
© 2022 Cloudera, Inc. All rights reserved.
How DPP Works
Query plan:
Project (population, state_name)
  Join (census_data.state_name = state_economy.state_name)
    Table census_data [Data Source V2 (Iceberg), partition column: state_name]
      /az/file1
      /or/file2
      /ca/file3
      /co/file4
    Filter (gdp > 50000000)
      Table state_economy
How DPP Works - Continued
The DPP subquery would look roughly like this:
SELECT DISTINCT state_name FROM state_economy WHERE gdp > 50000000
NOTE: If Spark executes the join as a Broadcast Hash Join with the state_economy table as the broadcast side (after the filter is applied), the broadcast data is already the DPP query result. In that case, Spark does not fire a separate DPP query.
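The DPP subquery is essentially a filtered DISTINCT over the build side. The sketch below mimics it in plain Java over an in-memory list; the EconomyRow record, the dppKeys method, and the sample GDP figures are all hypothetical, chosen only so that 'OR' and 'CA' pass the filter, matching the runtime filter state_name IN ('OR', 'CA') in this example.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class DppKeys {
    // Hypothetical row type for the build side (state_economy).
    record EconomyRow(String stateName, long gdp) {}

    // In-memory equivalent of:
    // SELECT DISTINCT state_name FROM state_economy WHERE gdp > threshold
    static Set<String> dppKeys(List<EconomyRow> rows, long threshold) {
        Set<String> keys = new LinkedHashSet<>(); // drops duplicates, keeps first-seen order
        for (EconomyRow row : rows) {
            if (row.gdp() > threshold) {
                keys.add(row.stateName());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        // Hypothetical sample data, for illustration only.
        List<EconomyRow> rows = List.of(
            new EconomyRow("AZ", 40_000_000L),
            new EconomyRow("OR", 60_000_000L),
            new EconomyRow("CA", 90_000_000L),
            new EconomyRow("CO", 30_000_000L));
        System.out.println(dppKeys(rows, 50_000_000L)); // prints [OR, CA]
    }
}
```

The resulting key set is what the scan of census_data receives as its runtime IN filter.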
How DPP Works - Continued
Query plan:
Project (population, state_name)
  Join (census_data.state_name = state_economy.state_name)
    Table census_data [Data Source V2 (Iceberg), partition column: state_name]
      Runtime Filter: state_name IN ('OR', 'CA')
      /az/file1 (pruned)
      /or/file2 (read)
      /ca/file3 (read)
      /co/file4 (pruned)
    Filter (gdp > 50000000)
      Table state_economy
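Applying the runtime IN filter to the partition layout can be sketched as a simple membership test per file. This is an illustration, not Iceberg's planning code: the RuntimeFilterPruning class and prune method are hypothetical, and the sketch assumes directory /or holds the rows with state_name = 'OR' (and likewise for the other states).

```java
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

public class RuntimeFilterPruning {
    // Keeps only the files whose partition value appears in the runtime IN-list.
    static List<String> prune(Map<String, String> fileToPartition, Set<String> inList) {
        return fileToPartition.entrySet().stream()
            .filter(e -> inList.contains(e.getValue())) // partition value must be in the IN-list
            .map(Map.Entry::getKey)
            .sorted() // deterministic order for printing
            .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Partition layout of census_data (file -> state_name), assuming /or holds 'OR', etc.
        Map<String, String> files = Map.of(
            "/az/file1", "AZ",
            "/or/file2", "OR",
            "/ca/file3", "CA",
            "/co/file4", "CO");
        System.out.println(prune(files, Set.of("OR", "CA"))); // prints [/ca/file3, /or/file2]
    }
}
```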
DPP: Summary
Can the DPP concept be taken further?
Spark's Broadcast Hash Joins tick all the boxes
For example, assume a RANGE-IN filter on column X with values (1, 5, 9, 15, 23),
and that the manifest file records (min, max) statistics of (10, 14) for column X in a given data file.
No RANGE-IN value falls between 10 and 14, so the data file can be excluded.
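The pruning rule can be stated compactly. Writing S for the set of RANGE-IN values and [l, u] for a data file's (min, max) statistics on the column (notation introduced here for illustration), a file must be read exactly when some value of S lies in [l, u], which reduces to a single "ceiling" lookup in the sorted set:

```latex
\text{keep}(l, u) \iff \exists\, v \in S : l \le v \le u
\iff \operatorname{ceil}_S(l) \text{ exists and } \operatorname{ceil}_S(l) \le u,
\qquad \operatorname{ceil}_S(l) = \min\{\, v \in S : v \ge l \,\}

S = \{1, 5, 9, 15, 23\},\; [l, u] = [10, 14]:\quad
\operatorname{ceil}_S(10) = 15 > 14 \implies \text{discard}
```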
Example query
SELECT name, class, course_desc FROM students JOIN courses ON students.courseId = courses.courseId WHERE courses.instructLang = 'english'
Table students {
name StringType,
class IntegerType,
courseId IntegerType
}
Table Courses {
courseId IntegerType,
instructLang StringType,
course_desc StringType
}
Query plan:
Project (name, class, course_desc)
  Join (students.courseId = courses.courseId)
    Table students
      Runtime Filter: courseId RANGE-IN (3, 7, 11, 18, 25)
    Filter (instructLang = 'english')
      Table courses
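Unlike a plain IN list, the RANGE-IN values are most useful when kept sorted, so that bounds checks become ceiling lookups. A minimal sketch, assuming the broadcast side is available as in-memory rows: the Course record, the rangeIn method, and the sample rows are hypothetical, picked so the English-language courses yield the values used in this example.

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

public class RangeInBuilder {
    // Hypothetical row type for the broadcast (build) side: the courses table.
    record Course(int courseId, String instructLang, String courseDesc) {}

    // Collects the join keys of the filtered build side into a sorted set,
    // which is what makes ceiling() lookups against manifest bounds possible.
    static NavigableSet<Integer> rangeIn(List<Course> broadcastRows, String lang) {
        NavigableSet<Integer> keys = new TreeSet<>();
        for (Course c : broadcastRows) {
            if (lang.equals(c.instructLang())) {
                keys.add(c.courseId());
            }
        }
        return keys;
    }

    public static void main(String[] args) {
        List<Course> courses = List.of( // hypothetical rows
            new Course(18, "english", "Databases"),
            new Course(3, "english", "Algebra"),
            new Course(5, "french", "Poetry"),
            new Course(25, "english", "Compilers"),
            new Course(7, "english", "Statistics"),
            new Course(11, "english", "Networks"));
        System.out.println(rangeIn(courses, "english")); // prints [3, 7, 11, 18, 25]
    }
}
```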
Manifest file: students
Path   | courseId Lower Bound | courseId Upper Bound
/path1 | 1                    | 3
/path2 | 4                    | 6
/path3 | 13                   | 17
/path4 | 6                    | 12
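import java.util.NavigableSet;

enum Decision { KEEP, DISCARD }

// Decides whether a data file whose courseId bounds are [lowerBnd, upperBnd] must be read.
// range holds the sorted RANGE-IN values, e.g. (3, 7, 11, 18, 25).
static <T extends Comparable<T>> Decision prune(NavigableSet<T> range, T lowerBnd, T upperBnd) {
    T leastEleGELowerBnd = range.ceiling(lowerBnd); // smallest value >= lowerBnd, or null
    if (leastEleGELowerBnd == null) {
        return Decision.DISCARD; // every RANGE-IN value is below lowerBnd
    } else if (leastEleGELowerBnd.compareTo(lowerBnd) == 0) {
        return Decision.KEEP; // lowerBnd itself is a RANGE-IN value
    } else if (leastEleGELowerBnd.compareTo(upperBnd) > 0) {
        return Decision.DISCARD; // the next value lies beyond upperBnd
    } else {
        return Decision.KEEP; // some value falls within [lowerBnd, upperBnd]
    }
}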
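Running the ceiling-based check over the four manifest entries shows which files survive. This is a self-contained sketch: RangeInPruning, keep, and survivingFiles are hypothetical names, and the bounds are the ones from the manifest table for courseId.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

public class RangeInPruning {
    // Returns true (KEEP) if some RANGE-IN value lies within [lowerBnd, upperBnd].
    static <T extends Comparable<T>> boolean keep(NavigableSet<T> range, T lowerBnd, T upperBnd) {
        T least = range.ceiling(lowerBnd); // smallest value >= lowerBnd, or null
        return least != null && least.compareTo(upperBnd) <= 0;
    }

    // Hypothetical helper: maps path -> {lower, upper} and returns the paths that must be read.
    static List<String> survivingFiles(NavigableSet<Integer> range, Map<String, int[]> bounds) {
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, int[]> e : bounds.entrySet()) {
            if (keep(range, e.getValue()[0], e.getValue()[1])) {
                kept.add(e.getKey());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        NavigableSet<Integer> range = new TreeSet<>(List.of(3, 7, 11, 18, 25));
        Map<String, int[]> bounds = new LinkedHashMap<>(); // insertion order = manifest order
        bounds.put("/path1", new int[] {1, 3});
        bounds.put("/path2", new int[] {4, 6});
        bounds.put("/path3", new int[] {13, 17});
        bounds.put("/path4", new int[] {6, 12});
        System.out.println(survivingFiles(range, bounds)); // prints [/path1, /path4]
    }
}
```

/path2 is skipped because the next value after 4 is 7 (> 6), and /path3 because the next value after 13 is 18 (> 17); the ceiling lookup costs O(log n) per file.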
Remaining Work and Other Possible Future Enhancements
For example, suppose the RANGE-IN values are the tuples { (a = 1, b = 1), (a = 2, b = 2) } and the table's partition layout is:
a = 1 / b = 1
a = 1 / b = 2
a = 2 / b = 1
a = 2 / b = 2
Ideal pruning would read only 2 partitions (a = 1 / b = 1 and a = 2 / b = 2).
However, because the tuples are split into two independent filters, a RANGE-IN (1, 2) and b RANGE-IN (1, 2), every combination passes both filters and all 4 partitions are read.
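The gap between per-column and per-tuple filtering can be reproduced in a few lines. This is a sketch of the idea only, not how Spark or Iceberg evaluates these filters today; CompositePruning, Partition, perColumn, and perTuple are hypothetical names, with perTuple standing in for the "ideal" behavior described above.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Set;

public class CompositePruning {
    // A partition identified by its (a, b) partition values.
    record Partition(int a, int b) {}

    // Current behavior: two independent RANGE-IN filters, one per column.
    static List<Partition> perColumn(List<Partition> parts, Set<Integer> aValues, Set<Integer> bValues) {
        List<Partition> kept = new ArrayList<>();
        for (Partition p : parts) {
            if (aValues.contains(p.a()) && bValues.contains(p.b())) {
                kept.add(p);
            }
        }
        return kept;
    }

    // Hypothetical improvement: one filter on the (a, b) tuple itself.
    static List<Partition> perTuple(List<Partition> parts, Set<Partition> tuples) {
        List<Partition> kept = new ArrayList<>();
        for (Partition p : parts) {
            if (tuples.contains(p)) { // record equality compares both a and b
                kept.add(p);
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        List<Partition> parts = List.of(
            new Partition(1, 1), new Partition(1, 2), new Partition(2, 1), new Partition(2, 2));
        Set<Partition> tuples = Set.of(new Partition(1, 1), new Partition(2, 2));
        System.out.println(perColumn(parts, Set.of(1, 2), Set.of(1, 2)).size()); // prints 4
        System.out.println(perTuple(parts, tuples).size()); // prints 2
    }
}
```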
Other Possible Future Enhancements - Continued
THANK YOU