MULTIPLE ORDER SENSITIVE AGGREGATOR SUPPORT
MOTIVATION
This document proposes alternative designs for supporting multiple order-sensitive aggregators, along with their pros and cons. With feedback from community members, we want to converge on a road map for possible implementations.
DataFusion Ticket: https://github.com/apache/arrow-datafusion/issues/8582
Date: 2023-12-21
BACKGROUND
Traditional SQL aggregates such as SUM or COUNT do not depend on the order of their inputs. However, some aggregate functions such as ARRAY_AGG produce different results depending on the order of their inputs, and thus some SQL dialects, such as PostgreSQL[1], support specifying the order of the input to aggregate functions using an ORDER BY clause.
DataFusion currently has 3 aggregate operators that (optionally) support specifying an input ordering for their arguments: ARRAY_AGG, FIRST_VALUE, and LAST_VALUE. Over time, we wish to expand the list of aggregators that can take ordered input to NTH_VALUE, RANK, UDAFs, etc.
CURRENT SUPPORT
As of the current state of DataFusion (2023-12-19), we can support compatible order-sensitive aggregators. Compatible means their required input orderings can all be satisfied by sorting the data in a single order prior to computing the aggregates.
For example, assume table s has the following schema:
CREATE TABLE s (zip_code INT,
country VARCHAR(3),
sn INT,
ts TIMESTAMP,
currency VARCHAR(3),
amount FLOAT)
Consider the following query:
SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
FROM s
GROUP BY country
For the query above, we generate the following plan, which uses a SortExec to sort the input stream by amount ASC NULLS LAST before it reaches the AggregateExec.
ProjectionExec: expr=[country, ARRAY_AGG(s.amount) ORDER BY [s.amount ASC] as amounts]
--AggregateExec: mode=Single, gby=[country], aggr=[ARRAY_AGG(s.amount)]
----SortExec: expr=[amount@1 ASC NULLS LAST]
------MemoryExec: partitions=1, partition_sizes=[1]
This design ensures that the aggregators (e.g. ARRAY_AGG) receive a stream of RecordBatches whose values are ordered according to the ORDER BY clause of the aggregator.
For example, ARRAY_AGG may receive the following batches:
amount
=======
1
2
5
6
amount
=======
7
8
10
12
With this design, the aggregator implementations are relatively straightforward. ARRAY_AGG can simply concatenate the data it receives as each new RecordBatch arrives. Similarly, FIRST_VALUE can store the first row it encounters and ignore any subsequent RecordBatches it receives.
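To make the contrast with later designs concrete, accumulators under this guarantee can be sketched as follows. This is an illustrative Rust sketch, not DataFusion's actual Accumulator API: the type and method names are assumptions, and an input column is modeled as a plain f64 slice.

```rust
// Sketch of accumulators that rely on the current design's guarantee
// that RecordBatches arrive sorted end-to-end (illustrative names, not
// DataFusion's real API).
struct ArrayAggAccumulator {
    values: Vec<f64>,
}

impl ArrayAggAccumulator {
    // Batches are sorted across batch boundaries, so updating the state
    // is a simple concatenation.
    fn update_batch(&mut self, batch: &[f64]) {
        self.values.extend_from_slice(batch);
    }
}

struct FirstValueAccumulator {
    first: Option<f64>,
}

impl FirstValueAccumulator {
    // The first row of the first non-empty batch is the answer; all
    // later batches can be ignored.
    fn update_batch(&mut self, batch: &[f64]) {
        if self.first.is_none() {
            self.first = batch.first().copied();
        }
    }
}
```

Both update paths are O(batch size) with no comparisons across batches, which is exactly what the global-sort guarantee buys.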
With this design we can support multiple order-sensitive aggregators if their ordering requirements are compatible. Consider the following query:
SELECT
country,
ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
FIRST_VALUE(amount ORDER BY amount, ts ASC) AS first_amount
FROM s
GROUP BY country
Since the requirement [amount ASC, ts ASC] is a finer version of the requirement [amount ASC], the query above can be executed with the existing design by sorting the input to the AggregateExec by amount ASC, ts ASC, using the following plan.
ProjectionExec: expr=[country, ARRAY_AGG(s.amount) ORDER BY [s.amount ASC] as amounts, FIRST_VALUE(s.amount) ORDER BY [s.amount ASC, s.ts ASC] as first_amount]
--AggregateExec: mode=Single, gby=[country], aggr=[ARRAY_AGG(s.amount), FIRST_VALUE(s.amount)]
----SortExec: expr=[amount ASC, ts ASC]
------MemoryExec: partitions=1, partition_sizes=[1]
This plan exploits the property that the ordering [amount ASC, ts ASC] also satisfies [amount ASC]. Hence the requirements of all aggregators are satisfied before data is fed to them.
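The "finer ordering" relationship can be sketched as a simple prefix test: an input sorted by one lexicographic ordering also satisfies any prefix of that ordering. This is a deliberate simplification (DataFusion's real ordering analysis also tracks equivalence classes and constant expressions), and sort expressions are modeled here as plain strings.

```rust
// Simplified sketch of the compatibility check: an input sorted by
// `provided` also satisfies `required` when `required` is a prefix of
// `provided`. Sort expressions are modeled as strings for illustration.
fn ordering_satisfies(provided: &[&str], required: &[&str]) -> bool {
    required.len() <= provided.len()
        && provided.iter().zip(required.iter()).all(|(p, r)| p == r)
}
```

Under this rule, a single sort by [amount ASC, ts ASC] serves both aggregators in the query above, while no single sort can serve both [amount ASC] and [ts ASC].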
PROBLEM STATEMENT
However, the current design cannot support multiple aggregates with ordered inputs whose requirements are incompatible, as in the following query.
SELECT
country,
ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
FIRST_VALUE(amount ORDER BY ts ASC) AS first_amount
FROM s
GROUP BY country
Since there is no single ordering that satisfies both [ts ASC] and [amount ASC], we cannot resolve the requirements of all aggregators with a single sort.
ALTERNATIVE DESIGNS
The following sections propose 3 designs to support this feature.
DESIGN 1: SORT AGGREGATOR INPUTS IN AGGREGATEEXEC
This is the design implemented in PR8558. The idea is to extend the AggregateExec operator to first sort the input batches for each group of ordered aggregate expressions according to that group's requirement.
For the query above we end up with 2 groups of aggregates:
1st group: ARRAY_AGG(amount ORDER BY amount ASC) AS amounts
2nd group: FIRST_VALUE(amount ORDER BY ts ASC) AS first_amount
Before data is fed to the aggregators in a group, each batch is first sorted according to the group's requirement (batches are passed through unchanged if the group has no requirement). For example, if the batches received by the AggregateExec are
amount ts
======= =======
5 3
4 2
2 5
1 1
amount ts
======= =======
7 3
4 4
2 5
3 8
1st group sorts the original batches according to requirement [amount ASC]
amount ts
======= =======
1 1
2 5
4 2
5 3
amount ts
======= =======
2 5
3 8
4 4
7 3
Similarly, the 2nd group sorts the original batches according to requirement [ts ASC].
amount ts
======= =======
1 1
4 2
5 3
2 5
amount ts
======= =======
7 3
4 4
2 5
3 8
Note that with this approach, the aggregator implementations become more complex. They can no longer assume that the stream of RecordBatches they receive satisfies their requirement across different batches, only that each RecordBatch itself satisfies the requirement of the aggregator.
Under this weaker guarantee, ARRAY_AGG must merge its state with each RecordBatch it receives (simple concatenation is no longer valid). Similarly, the FIRST_VALUE aggregator must compare the first value in its state with the first value of each RecordBatch it receives to find the correct first value across batches.
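A sketch of such merge-based accumulators is shown below, under the assumption that each incoming batch is individually sorted but batches are unordered relative to each other. The types are illustrative, not DataFusion's actual API; the FIRST_VALUE state is modeled as a (ts, amount) pair.

```rust
// Sketch of Design 1 accumulators: each batch is sorted on its own, so
// state must be merged rather than concatenated (illustrative types).
struct ArrayAggAccumulator {
    values: Vec<f64>, // kept sorted at all times
}

impl ArrayAggAccumulator {
    // Merge two sorted runs (the current state and the new batch) into a
    // new sorted state; plain concatenation would break the ordering.
    fn update_batch(&mut self, batch: &[f64]) {
        let mut merged = Vec::with_capacity(self.values.len() + batch.len());
        let (mut i, mut j) = (0, 0);
        while i < self.values.len() && j < batch.len() {
            if self.values[i] <= batch[j] {
                merged.push(self.values[i]);
                i += 1;
            } else {
                merged.push(batch[j]);
                j += 1;
            }
        }
        merged.extend_from_slice(&self.values[i..]);
        merged.extend_from_slice(&batch[j..]);
        self.values = merged;
    }
}

struct FirstValueAccumulator {
    // (ts, amount) of the best candidate so far, e.g. for
    // FIRST_VALUE(amount ORDER BY ts ASC).
    first: Option<(i64, f64)>,
}

impl FirstValueAccumulator {
    // Each batch is sorted by ts, so only its first row is a candidate,
    // but it must still be compared against the state across batches.
    fn update_batch(&mut self, batch: &[(i64, f64)]) {
        if let Some(&(ts, amount)) = batch.first() {
            match self.first {
                Some((cur_ts, _)) if cur_ts <= ts => {}
                _ => self.first = Some((ts, amount)),
            }
        }
    }
}
```

Compared to the current design, ARRAY_AGG's update goes from O(batch) concatenation to an O(state + batch) merge, which is the main source of the added complexity.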
DESIGN 2: USE COMMON TABLE EXPRESSIONS
Consider the following query:
SELECT
country,
ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
FIRST_VALUE(amount ORDER BY ts ASC) AS first_amount
FROM sales_global
GROUP BY country
We can rewrite the query above to calculate the different order-sensitive aggregators in separate subqueries, and then join these subqueries on the GROUP BY expressions. The rewritten query can be found below.
SELECT * FROM
(SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts1
FROM sales_global
GROUP BY country) as t1,
(SELECT country, ARRAY_AGG(amount ORDER BY ts ASC) AS amounts2
FROM sales_global
GROUP BY country) as t2
WHERE t1.country=t2.country
For the query above we generate the following physical plan:
CoalesceBatchesExec: target_batch_size=2
--SortMergeJoinExec: join_type=Inner, on=[(country@0, country@0)]
----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts1]
------AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]
--------SortExec: expr=[amount@1 ASC NULLS LAST]
----------MemoryExec: partitions=1, partition_sizes=[1]
----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as amounts2]
------AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]
--------SortExec: expr=[ts@1 ASC NULLS LAST]
----------MemoryExec: partitions=1, partition_sizes=[1]
As can be seen from the plan above, the requirement of each aggregate group is satisfied by the corresponding SortExec below it in the subqueries. With a rewrite rule, we could transform the original query into the subquery version automatically. With this approach, aggregators can still assume that the RecordBatches they receive are globally ordered across batches, just as in the existing implementation.
However, the physical plan above duplicates substantial work, as each subquery independently runs the MemoryExec, which could be an arbitrarily expensive subplan in a real query. The duplication can be avoided using a physical plan such as the following, which computes the shared input of the aggregates only once:
SortMergeJoinExec: join_type=Inner, on=[(country@0, country@0)]
↑ ↑
AggregateExec(gby=country, aggr=[ARRAY_AGG]) AggregateExec(gby=country, aggr=[ARRAY_AGG])
↑ ↑
SortExec(ts ASC) SortExec(amount ASC)
↑ ↑
Splitter (Generates multiple parents)
↑
MemoryExec
With the plan above, the shared input is computed only once.
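The hypothetical Splitter amounts to broadcasting each input batch to several parent operators. The sketch below models this with standard channels and generic items standing in for RecordBatches; the function name and shape are assumptions, since no such operator exists in DataFusion today.

```rust
use std::sync::mpsc;

// Hypothetical "Splitter": broadcasts each input item (standing in for a
// RecordBatch) to several parent operators, so the shared input plan is
// executed only once. Not an existing DataFusion operator.
fn split_stream<T: Clone>(input: Vec<T>, n_parents: usize) -> Vec<mpsc::Receiver<T>> {
    let (senders, receivers): (Vec<_>, Vec<_>) =
        (0..n_parents).map(|_| mpsc::channel()).unzip();
    for item in input {
        for tx in &senders {
            // Each parent receives its own copy of every batch.
            tx.send(item.clone()).unwrap();
        }
    }
    // Dropping the senders here closes the channels, so each parent's
    // stream terminates after the last batch.
    receivers
}
```

A real implementation would need to handle backpressure (a slow parent must not buffer the whole input), which is the main engineering cost of this design.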
DESIGN 3: MOVE SORTING ENTIRELY INSIDE EACH AGGREGATOR IMPLEMENTATION
In this design, we would avoid sorting the input to the AggregateExec and instead rely on the Aggregator implementations themselves to handle any sorting and comparing internally. For example, given the query:
SELECT
country,
ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,
FIRST_VALUE(amount ORDER BY ts ASC) AS first_amount
FROM sales_global
GROUP BY country
The plan would look like the following (note there is no SortExec):
ProjectionExec: expr=[...]
--AggregateExec: mode=Single, gby=[country], aggr=[ARRAY_AGG(s.amount ORDER BY amount), FIRST_VALUE(s.amount ORDER BY ts)]
----MemoryExec: partitions=1, partition_sizes=[1]
The ARRAY_AGG aggregator would receive the amount column unsorted:
amount
=======
5
4
2
1
amount
=======
7
4
2
3
Internally, the ARRAY_AGG aggregator would store the values and sort them before producing the final output. This is similar to the existing logic[2] required to merge partial results.
Similarly, the FIRST_VALUE aggregator would receive both the amount and ts columns as arguments and would track the current lowest ts value as well as the corresponding amount[3]. This is similar to existing logic[4] to merge partial results.
amount ts
======= =======
5 3
4 2
2 5
1 1
amount ts
======= =======
7 3
4 4
2 5
3 8
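Under this design, the accumulators might look like the following sketch, which buffers unsorted input and defers all ordering work to the accumulator itself. Again, the names and types are illustrative, not DataFusion's actual API.

```rust
// Sketch of Design 3 accumulators: input arrives unsorted and ordering
// is handled entirely inside each accumulator (illustrative types).
struct ArrayAggAccumulator {
    // Values accumulated in arrival order; nothing is sorted yet.
    values: Vec<f64>,
}

impl ArrayAggAccumulator {
    fn update_batch(&mut self, batch: &[f64]) {
        self.values.extend_from_slice(batch);
    }

    // All sorting is deferred until the final output is produced,
    // similar to how partial results are merged today.
    fn evaluate(mut self) -> Vec<f64> {
        self.values.sort_by(|a, b| a.partial_cmp(b).unwrap());
        self.values
    }
}

struct FirstValueAccumulator {
    // (ts, amount) pair with the lowest ts observed so far.
    first: Option<(i64, f64)>,
}

impl FirstValueAccumulator {
    // The input is unsorted, so every row must be examined.
    fn update_batch(&mut self, ts: &[i64], amounts: &[f64]) {
        for (&t, &a) in ts.iter().zip(amounts.iter()) {
            if self.first.map_or(true, |(cur_ts, _)| t < cur_ts) {
                self.first = Some((t, a));
            }
        }
    }
}
```

Note the asymmetry: FIRST_VALUE stays O(1) in state and never needs a sort, while ARRAY_AGG must buffer and sort all input per group, so per-aggregator optimization potential varies widely in this design.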
DESIGN COMPARISON
As in most engineering decisions, there are tradeoffs between the different designs across several dimensions, as outlined below:
Complexity of Aggregator implementation compared to main
Complexity of HashAggregateExec Implementation compared to main
Complexity of other operators / areas of the code compared to main
Clarity of EXPLAIN PLAN compared to main
Performance: How much sorting is required?
Potential for optimizations within each Aggregator
Additional Cost of Implementation
[3] Perhaps interestingly, we actually have a special user defined aggregate in IOx that does exactly this operation today.