Spark

Performance Enhancement of Broadcast Hash Join Queries in Spark - Iceberg

Existing Spark Feature

  • Dynamic Partition Pruning (DPP)
    • Improves performance of Join Queries

    • Example Query where DPP helps

SELECT population, census_data.state_name
FROM census_data
JOIN state_economy ON census_data.state_name = state_economy.state_name
WHERE gdp > 50000000

Table census_data {
  population LongType,
  state_name StringType
}
partitioned by state_name

Table state_economy {
  gdp LongType,
  state_name StringType
}

© 2022 Cloudera, Inc. All rights reserved.

How DPP Works

Project (population, state_name)
  Join (census_data.state_name = state_economy.state_name)
    Table census_data
      Data Source V2 (Iceberg), partition column (state_name)
        /az/file1
        /or/file2
        /ca/file3
        /co/file4
    Filter (gdp > 50000000)
      Table state_economy


How DPP Works - Continued

  • Because the filter gdp > 50000000 is selective, Spark fires a DPP query before executing the join.

The DPP query looks roughly like:

select distinct state_name from state_economy where gdp > 50000000

  • Let’s say the distinct values of state_name are ['OR', 'CA'].

  • Since census_data.state_name = state_economy.state_name, a runtime filter of "IN" type can be pushed down to table census_data at the data source level, i.e. state_name IN ('OR', 'CA').

  • With the runtime filter state_name IN ('OR', 'CA'), Iceberg (DataSource V2) can skip reading partitions 'AZ', 'CO', …

NOTE: If the join is a Broadcast Hash Join in which the state_economy table is broadcast (after applying the filter), the broadcast variable’s data already is the DPP query result. In that case, Spark does not fire a separate DPP query.
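The pruning effect of the runtime IN filter can be sketched in a few lines of Java. This is only an illustration under stated assumptions: `InFilterPruning` and `filesToScan` are hypothetical names, not Spark or Iceberg APIs, and the partition-to-file map simply mirrors the four partitions shown on the previous slide.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.stream.Collectors;

// Hypothetical helper (not a Spark or Iceberg API): keeps only the files
// whose partition value appears in the runtime IN filter.
class InFilterPruning {
    static List<String> filesToScan(Map<String, String> fileByPartition,
                                    Set<String> inValues) {
        return fileByPartition.entrySet().stream()
                .filter(e -> inValues.contains(e.getKey()))
                .map(Map.Entry::getValue)
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        // Partition value of state_name -> data file, as on the slide.
        Map<String, String> censusData = new LinkedHashMap<>();
        censusData.put("AZ", "/az/file1");
        censusData.put("OR", "/or/file2");
        censusData.put("CA", "/ca/file3");
        censusData.put("CO", "/co/file4");

        // Result of the DPP query: distinct state_name values with gdp > 50000000.
        Set<String> runtimeFilter = Set.of("OR", "CA");

        // Only the OR and CA partitions remain to be scanned.
        System.out.println(filesToScan(censusData, runtimeFilter));
    }
}
```

Partitions AZ and CO are never opened, which is where the saving comes from.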


How DPP Works - Continued

Project (population, state_name)
  Join (census_data.state_name = state_economy.state_name)
    Table census_data
      Data Source V2 (Iceberg), partition column (state_name)
      Runtime Filter state_name IN ('OR', 'CA')
        /az/file1
        /or/file2
        /ca/file3
        /co/file4
    Filter (gdp > 50000000)
      Table state_economy


DPP : Summary

    • Improves performance of join queries when one of the join columns is a partitioning column and a non-trivial filter is available on the other table

    • It works by reducing the number of partitions that must be scanned to feed the join

    • If the DPP query is not selective enough, no performance benefit is seen (as it does not reduce the number of partitions to scan)


Can the DPP concept be taken further?

  • Can the concept be used for joins on non-partitioning columns, without incurring the cost of a DPP query? (Broadcast Hash Joins eliminate the need for a separate DPP query.)

  • In Iceberg, the pushed runtime IN filter stays on the driver. For partitioning strategies that use transforms, the full benefit of the IN filter is therefore not achieved; that requires sending the IN filter to the executors.

  • Can the min/max stats of a column, available in Iceberg manifest files and in Parquet row-group and chunk-level data structures, be leveraged?

  • For joins on non-partition columns, the IN filter should also be passed to the executors to prune at the row-group and chunk level, while minimizing the cost of serializing the IN values.


Spark's Broadcast Hash Joins tick all the boxes

  • Build-side keys are already available on the driver and executors, eliminating the need for a DPP query

  • Instead of an IN filter, create a new filter, RANGE-IN, which uses a SortedSet of keys instead of a Set

  • Using caching, weak references, etc., ensure a single SortedSet per JVM, built from the broadcast variable

  • The RANGE-IN filter, passed with a serialized iterator instance, does not have to serialize the keys

  • With the column's min/max stats and the SortedSet, prune at the manifest file, row group, chunk, etc. levels

For example: assume RANGE-IN with values (1, 5, 9, 15, 23) for column X,

and the (min, max) stats for column X in the manifest file for a data file are (10, 14).

No RANGE-IN value lies between 10 and 14 (9 is below the minimum and 15 is above the maximum), so the data file can be excluded.

  • Requires the column to be of a Comparable type (Integer, Long, String, etc.)

  • Requires changes in Spark and Iceberg (or any DataSource V2 implementing Spark's SupportsRuntimeFiltering interface)
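The min/max example above can be checked with a sorted set in a few lines. The sketch below is an assumption-laden illustration, not the actual Spark or Iceberg change: `RangeInCheck` and `canSkip` are hypothetical names, and it uses `NavigableSet.subSet` from the JDK to ask whether any key falls inside the closed interval [min, max].

```java
import java.util.List;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical helper (not part of Spark or Iceberg): decides whether a data
// file can be skipped given its (min, max) column stats and the sorted keys.
class RangeInCheck {
    // True when no key lies in the closed interval [min, max].
    static <T extends Comparable<T>> boolean canSkip(NavigableSet<T> keys, T min, T max) {
        return keys.subSet(min, true, max, true).isEmpty();
    }

    public static void main(String[] args) {
        NavigableSet<Integer> keys = new TreeSet<>(List.of(1, 5, 9, 15, 23));

        // Stats (10, 14): no key inside, so the file can be excluded.
        System.out.println(canSkip(keys, 10, 14));

        // Stats (5, 8): key 5 is inside, so the file must be read.
        System.out.println(canSkip(keys, 5, 8));
    }
}
```

Because the set is sorted, the check is a logarithmic lookup rather than a scan over every key, which is the reason for preferring a SortedSet over a plain Set here.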


Example query

SELECT name, class, course_desc
FROM students
JOIN courses ON students.courseId = courses.courseId
WHERE courses.instructLang = 'english'

Table students {
  name StringType,
  class IntegerType,
  courseId IntegerType
}

Table courses {
  courseId IntegerType,
  instructLang StringType,
  course_desc StringType
}

Project (name, class, course_desc)
  Join (students.courseId = courses.courseId)
    Table students
      Runtime Filter courseId RANGE-IN (3, 7, 11, 18, 25)
    Filter (instructLang = 'english')
      Table courses

Manifest file: students (bounds for column courseId)

Path      Lower Bound   Upper Bound
/path1    1             3
/path2    4             6
/path3    13            17
/path4    6             12

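NavigableSet<T> range;  // sorted RANGE-IN values, e.g. (3, 7, 11, 18, 25)

// Smallest RANGE-IN value >= the data file's lower bound (null if none)
T leastEleGELowerBnd = range.ceiling(lowerBnd);

if (leastEleGELowerBnd == null) {
    return DISCARD;  // every value lies below the lower bound
} else if (leastEleGELowerBnd.compareTo(lowerBnd) == 0) {
    return KEEP;     // a value equals the lower bound
} else if (leastEleGELowerBnd.compareTo(upperBnd) > 0) {
    return DISCARD;  // the next value lies above the upper bound
} else {
    return KEEP;     // a value lies within [lowerBnd, upperBnd]
}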
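To see the check applied end to end, here is a self-contained sketch that runs the same ceiling-based logic over the four manifest entries from this slide. `ManifestPruning`, `Decision`, `check`, and `filesToRead` are hypothetical names chosen for illustration; a real manifest reader would supply the bounds rather than a hand-built map.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableSet;
import java.util.TreeSet;

// Hypothetical illustration (not Iceberg code) of RANGE-IN pruning over
// per-file lower/upper bounds taken from a manifest.
class ManifestPruning {
    enum Decision { KEEP, DISCARD }

    static <T extends Comparable<T>> Decision check(NavigableSet<T> range, T lowerBnd, T upperBnd) {
        // Smallest RANGE-IN value >= lowerBnd, or null if there is none.
        T least = range.ceiling(lowerBnd);
        if (least == null) {
            return Decision.DISCARD;          // all values are below the file's range
        } else if (least.compareTo(upperBnd) > 0) {
            return Decision.DISCARD;          // next value is above the file's range
        } else {
            return Decision.KEEP;             // a value lies within [lowerBnd, upperBnd]
        }
    }

    // Returns the paths whose bounds may contain at least one RANGE-IN value.
    static List<String> filesToRead(NavigableSet<Integer> range, Map<String, Integer[]> bounds) {
        List<String> kept = new ArrayList<>();
        for (Map.Entry<String, Integer[]> e : bounds.entrySet()) {
            Integer[] b = e.getValue();
            if (check(range, b[0], b[1]) == Decision.KEEP) {
                kept.add(e.getKey());
            }
        }
        return kept;
    }

    public static void main(String[] args) {
        NavigableSet<Integer> range = new TreeSet<>(List.of(3, 7, 11, 18, 25));

        Map<String, Integer[]> bounds = new LinkedHashMap<>();
        bounds.put("/path1", new Integer[] {1, 3});
        bounds.put("/path2", new Integer[] {4, 6});
        bounds.put("/path3", new Integer[] {13, 17});
        bounds.put("/path4", new Integer[] {6, 12});

        // /path1 is kept (3 is in [1, 3]) and /path4 is kept (7 and 11 are in [6, 12]);
        // /path2 and /path3 contain no RANGE-IN value and are discarded.
        System.out.println(filesToRead(range, bounds));
    }
}
```

The sketch folds the slide's "equals the lower bound" branch into the final KEEP case, since a value equal to the lower bound is never greater than the upper bound.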


Remaining Work and Other Possible Future Enhancements

  • The range set cannot yet filter out individual rows, because Iceberg emits data as column batches; this still needs to be addressed.
  • For multi-column join queries (a = x and b = y), Spark pushes an IN filter for only one of the conditions.

  • Our work does push two RANGE-IN filters (one for each condition), but since the pairing information is lost, the pruning is sub-optimal.

For example, if the RANGE-IN values are { (a = 1, b = 1), (a = 2, b = 2) } and the partition layout is

a = 1 / b = 1
a = 1 / b = 2
a = 2 / b = 1
a = 2 / b = 2

Ideal pruning should result in only 2 partitions being read.

But because we have two separate filters, a RANGE-IN (1, 2) and b RANGE-IN (1, 2), all 4 partitions are read.
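The loss of pairing information can be reproduced with a small sketch. Everything here is hypothetical illustration code (`MultiColumnPruning`, `pruneSeparately`, `prunePaired`), with each partition and each join key modelled as a two-element list (a, b); it is not how Spark or Iceberg represent filters.

```java
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import java.util.stream.Collectors;

// Hypothetical illustration: per-column filters versus a filter on (a, b) pairs.
class MultiColumnPruning {
    // Two independent filters: a RANGE-IN (...) and b RANGE-IN (...).
    static List<List<Integer>> pruneSeparately(List<List<Integer>> partitions,
                                               Set<List<Integer>> keys) {
        Set<Integer> aValues = keys.stream().map(k -> k.get(0))
                .collect(Collectors.toCollection(TreeSet::new));
        Set<Integer> bValues = keys.stream().map(k -> k.get(1))
                .collect(Collectors.toCollection(TreeSet::new));
        return partitions.stream()
                .filter(p -> aValues.contains(p.get(0)) && bValues.contains(p.get(1)))
                .collect(Collectors.toList());
    }

    // Ideal case: the filter remembers which a goes with which b.
    static List<List<Integer>> prunePaired(List<List<Integer>> partitions,
                                           Set<List<Integer>> keys) {
        return partitions.stream().filter(keys::contains).collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<List<Integer>> partitions = List.of(
                List.of(1, 1), List.of(1, 2), List.of(2, 1), List.of(2, 2));
        Set<List<Integer>> keys = Set.of(List.of(1, 1), List.of(2, 2));

        // Separate filters keep all 4 partitions.
        System.out.println(pruneSeparately(partitions, keys).size());
        // Paired filter keeps only (1, 1) and (2, 2).
        System.out.println(prunePaired(partitions, keys));
    }
}
```

With separate filters the surviving set is the cross product of the per-column values, so the gap to ideal pruning grows as the number of distinct key pairs increases.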


Other Possible Future Enhancements - Continued

  • Our colleague, Rajesh Balamohan, found that with AQE (Adaptive Query Execution), reuse of exchange subplans is broken (reuse does not happen)

    • The cause is that runtime filters pushed by DPP break the equals / hashCode of BatchScanExec (which in turn depends on the underlying DataSource V2 Scan).

    • A ticket and proposed fix will be filed soon.

  • Extend the Bloom filters used for Sort Merge Join (SMJ) in Spark 3.3 to the DataSource level, to prune the files/data being scanned


THANK YOU