
MULTIPLE ORDER SENSITIVE AGGREGATOR SUPPORT

MOTIVATION

The aim of this document is to propose alternative designs, with their pros and cons, for supporting multiple order-sensitive aggregators. With feedback from community members, we want to reach a roadmap for possible implementations.

DataFusion Ticket: https://github.com/apache/arrow-datafusion/issues/8582 

Date: 2023-12-21

BACKGROUND

Traditional SQL aggregates such as SUM or COUNT do not depend on the order of their inputs. However, some aggregate functions such as ARRAY_AGG produce different results depending on the order of their inputs, and thus some SQL dialects, such as PostgreSQL[1], support specifying the order of the input to aggregate functions using an ORDER BY clause.

DataFusion currently has 3 aggregate functions that (optionally) support specifying an input ordering for their arguments: ARRAY_AGG, FIRST_VALUE, and LAST_VALUE. Over time, we wish to expand the list of aggregators that can take ordered input to NTH_VALUE, RANK, UDAFs, etc.

CURRENT SUPPORT

In the existing state of DataFusion (as of 2023-12-19), we can support compatible order-sensitive aggregators. Compatible means their required input orderings can all be satisfied by sorting the data in a single order prior to computing the aggregates.

For example, assume table s has the following schema:

CREATE TABLE s (
  zip_code INT,
  country VARCHAR(3),
  sn INT,
  ts TIMESTAMP,
  currency VARCHAR(3),
  amount FLOAT
)

Consider the following query:

SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts

FROM s

GROUP BY country

For the query above, we generate the following plan, which uses a SortExec to sort the input stream by amount ASC NULLS LAST prior to the AggregateExec:

ProjectionExec: expr=[country, ARRAY_AGG(s.amount) ORDER BY [s.amount ASC] as amounts]

--AggregateExec: mode=Single, gby=[country], aggr=[ARRAY_AGG(s.amount)]

----SortExec: expr=[amount@1 ASC NULLS LAST]

------MemoryExec: partitions=1, partition_sizes=[1]

This design ensures the aggregators (e.g. ARRAY_AGG) receive a stream of RecordBatches with values ordered according to the ORDER BY clause of the aggregator.

For example, ARRAY_AGG may receive the following batches:

amount
=======
1

2

5

6

amount
=======
7

8

10

12

With this design, the aggregator implementations are relatively straightforward. ARRAY_AGG can simply concatenate the data it receives when a new RecordBatch arrives. Similarly, FIRST_VALUE can store the first row it encounters and then ignore any subsequent RecordBatches it receives.
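
As a rough illustration, the per-aggregator updates under this assumption could look like the following sketch (the struct and method names are made up for illustration; this is not DataFusion's actual Accumulator implementation, and grouping, null handling, and the Accumulator trait wiring are omitted):

use arrow::array::{Array, Float64Array};

// Sketch of per-group state updates when the input stream is already globally
// sorted by the aggregator's ORDER BY clause.
#[derive(Default)]
struct ArrayAggState {
    values: Vec<f64>, // already in output order
}

impl ArrayAggState {
    fn update_batch(&mut self, amount: &Float64Array) {
        // Sorted input: simply append rows in arrival order.
        self.values.extend(amount.iter().flatten());
    }
}

#[derive(Default)]
struct FirstValueState {
    first: Option<f64>,
}

impl FirstValueState {
    fn update_batch(&mut self, amount: &Float64Array) {
        // Sorted input: the first row of the first non-empty batch is the
        // answer; all later batches can be ignored.
        if self.first.is_none() && !amount.is_empty() {
            self.first = Some(amount.value(0));
        }
    }
}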

With this design we can also support multiple order-sensitive aggregators, as long as their ordering requirements are compatible. Consider the following query:

SELECT

  country,

  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,

  FIRST_VALUE(amount ORDER BY amount ASC, ts ASC) AS first_amount

FROM s

GROUP BY country

Since requirement [amount ASC, ts ASC] is a finer version of the requirement [amount ASC], the query above can be executed with the existing design by sorting the input to AggregateExec by amount ASC, ts ASC using the following plan.

ProjectionExec: expr=[country, ARRAY_AGG(s.amount) ORDER BY [s.amount ASC] as amounts, FIRST_VALUE(s.amount) ORDER BY [s.amount ASC, s.ts ASC] as first_amount]

--AggregateExec: mode=Single, gby=[country], aggr=[ARRAY_AGG(s.amount), FIRST_VALUE(s.amount)]

----SortExec: expr=[amount ASC, ts ASC]

------MemoryExec: partitions=1, partition_sizes=[1]

This plan exploits the property that the ordering [amount ASC, ts ASC] also satisfies [amount ASC]. Hence, all of the aggregators' requirements are satisfied before data is fed to them.
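
The "finer ordering" reasoning above can be pictured with a simplistic prefix test. The helper below is hypothetical and for illustration only; DataFusion's actual check goes through its ordering-equivalence machinery and also accounts for equivalent expressions and constants:

// Hypothetical, simplified sort-key representation for illustration.
#[derive(Clone, Debug, PartialEq)]
struct SortKey {
    column: String,
    descending: bool,
    nulls_first: bool,
}

// An ordering `finer` satisfies a requirement `req` if `req` is a prefix of
// `finer`, e.g. [amount ASC, ts ASC] satisfies [amount ASC].
fn satisfies(finer: &[SortKey], req: &[SortKey]) -> bool {
    req.len() <= finer.len() && finer[..req.len()] == *req
}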

PROBLEM STATEMENT

However, the current design cannot support multiple aggregates with ordered inputs that have incompatible requirements, such as the following query.

SELECT

  country,

  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,

  FIRST_VALUE(amount ORDER BY ts ASC) AS first_amount

FROM s

GROUP BY country

Since there is no single ordering that satisfies both [ts ASC] and [amount ASC], we cannot satisfy the requirements of all aggregators with a single sort.

ALTERNATIVE DESIGNS

The following sections propose 3 designs to support this feature.

DESIGN 1: SORT AGGREGATOR INPUTS IN AGGREGATEEXEC

This is the design implemented in PR8558. The idea is to extend the AggregateExec operator so that, before feeding input batches to order-sensitive aggregate expressions, it first sorts the batches according to each expression's requirement.

For the query above, we end up with 2 groups of aggregates:

1st group: ARRAY_AGG(amount ORDER BY amount ASC) AS amounts

2nd group: FIRST_VALUE(amount ORDER BY ts ASC) AS first_amount

Before feeding data to the aggregators in a group, each batch is first sorted according to the group's requirement (if a group has no requirement, its batches are passed through as is). For example, if the batches received by AggregateExec are

amount   ts
=======  =======

5        3

4        2

2        5

1        1

amount   ts
=======  =======

7        3

4        4

2        5

3        8

The 1st group sorts the original batches according to requirement [amount ASC]:

amount   ts
=======  =======

1        1

2        5

4        2

5        3

amount   ts
=======  =======

2        5

3        8

4        4

7        3

Similarly, the 2nd group sorts the original batches according to requirement [ts ASC]:

amount   ts
=======  =======

1        1

4        2

5        3

2        5

amount   ts
=======  =======

7        3

4        4

2        5

3        8
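
The per-group sort itself could be done with the Arrow sort kernels, roughly as sketched below. The function name is made up, all keys are assumed ASC NULLS LAST, and error handling is simplified; this is not the code from PR8558:

use arrow::array::{ArrayRef, UInt32Array};
use arrow::compute::{lexsort_to_indices, take, SortColumn, SortOptions};
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

// Sort every column of `batch` by the key columns of a group's requirement,
// e.g. [amount] for the 1st group or [ts] for the 2nd group.
fn sort_batch_by_requirement(
    batch: &RecordBatch,
    keys: &[ArrayRef],
) -> Result<RecordBatch, ArrowError> {
    // Build the lexicographic sort key (ASC NULLS LAST for brevity).
    let sort_columns: Vec<SortColumn> = keys
        .iter()
        .map(|values| SortColumn {
            values: values.clone(),
            options: Some(SortOptions { descending: false, nulls_first: false }),
        })
        .collect();
    let indices: UInt32Array = lexsort_to_indices(&sort_columns, None)?;
    // Apply the permutation to every column of the batch.
    let sorted_columns = batch
        .columns()
        .iter()
        .map(|c| take(c.as_ref(), &indices, None))
        .collect::<Result<Vec<_>, _>>()?;
    RecordBatch::try_new(batch.schema(), sorted_columns)
}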

Note that with this approach, the aggregator implementations become more complex. They can no longer assume that the stream of RecordBatches they receive satisfies their requirement across different batches, only that each RecordBatch itself satisfies the requirement of the aggregator.

With this weaker assumption, ARRAY_AGG must merge its current state with each RecordBatch it receives to update its state (simple concatenation is no longer valid). Similarly, the FIRST_VALUE aggregator must compare the first value in its state with the first value of each RecordBatch it receives to find the correct first value across batches.
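
Under this weaker guarantee, the updates look roughly like the following sketch (made-up names for illustration; grouping, nulls, and the real Accumulator trait are omitted, and the ts column is simplified to i64):

use arrow::array::{Array, Float64Array, Int64Array};

#[derive(Default)]
struct MergingArrayAgg {
    values: Vec<f64>, // kept sorted ascending across batches
}

impl MergingArrayAgg {
    fn update_batch(&mut self, amount_sorted: &Float64Array) {
        // Each batch is sorted on its own, so merge two sorted runs instead of
        // concatenating.
        let batch: Vec<f64> = amount_sorted.iter().flatten().collect();
        let mut merged = Vec::with_capacity(self.values.len() + batch.len());
        let (mut i, mut j) = (0, 0);
        while i < self.values.len() && j < batch.len() {
            if self.values[i] <= batch[j] {
                merged.push(self.values[i]);
                i += 1;
            } else {
                merged.push(batch[j]);
                j += 1;
            }
        }
        merged.extend_from_slice(&self.values[i..]);
        merged.extend_from_slice(&batch[j..]);
        self.values = merged;
    }
}

#[derive(Default)]
struct ComparingFirstValue {
    best: Option<(i64, f64)>, // (smallest ts seen so far, its amount)
}

impl ComparingFirstValue {
    fn update_batch(&mut self, amount: &Float64Array, ts: &Int64Array) {
        if amount.is_empty() {
            return;
        }
        // The batch is sorted by ts, so row 0 is this batch's candidate; keep
        // whichever candidate has the smaller ts across batches.
        let candidate = (ts.value(0), amount.value(0));
        match self.best {
            Some(best) if best.0 <= candidate.0 => {}
            _ => self.best = Some(candidate),
        }
    }
}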

DESIGN 2: USE COMMON TABLE EXPRESSIONS

Consider the following query:

SELECT

  country,

  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,

  FIRST_VALUE(amount ORDER BY ts ASC) AS first_amount

FROM sales_global

GROUP BY country

We can rewrite the query above to calculate the different order-sensitive aggregators in two separate subqueries, and then join these subqueries on equality of the GROUP BY columns. The rewritten query can be found below.

 SELECT * FROM

  (SELECT country, ARRAY_AGG(amount ORDER BY amount ASC) AS amounts1

    FROM sales_global

    GROUP BY country) as t1,

  (SELECT country, ARRAY_AGG(amount ORDER BY ts ASC) AS amounts2

   FROM sales_global

   GROUP BY country) as t2

  WHERE t1.country=t2.country

For the query above, we generate the following physical plan:

CoalesceBatchesExec: target_batch_size=2

--SortMergeJoinExec: join_type=Inner, on=[(country@0, country@0)]

----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.amount ASC NULLS LAST]@1 as amounts1]

------AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]

--------SortExec: expr=[amount@1 ASC NULLS LAST]

----------MemoryExec: partitions=1, partition_sizes=[1]

----ProjectionExec: expr=[country@0 as country, ARRAY_AGG(sales_global.amount) ORDER BY [sales_global.ts ASC NULLS LAST]@1 as amounts2]

------AggregateExec: mode=Single, gby=[country@0 as country], aggr=[ARRAY_AGG(sales_global.amount)]

--------SortExec: expr=[ts@1 ASC NULLS LAST]

----------MemoryExec: partitions=1, partition_sizes=[1]

As can be seen from the plan above, the requirements of the different aggregate groups are satisfied by the corresponding SortExecs below them in the subqueries. With an optimizer rule, we can rewrite the original query into this subquery form. With this approach, aggregators can still assume that the RecordBatches they receive are globally ordered across batches, just as in the existing implementation.

However, the physical plan above duplicates substantial work, as each subquery independently runs the MemoryExec, which could itself be an arbitrarily expensive subplan in a real query. The duplication can be avoided with a physical plan such as the following, which reduces the duplicated computation at the input of the aggregates:

        SortMergeJoinExec: join_type=Inner, on=[(country@0, country@0)]
                  ↑                                       ↑
AggregateExec(gby=country, aggr=[ARRAY_AGG])   AggregateExec(gby=country, aggr=[ARRAY_AGG])
                  ↑                                       ↑
          SortExec(ts ASC)                         SortExec(amount ASC)
                  ↑                                       ↑
                  Splitter (generates multiple parents)
                                  ↑
                              MemoryExec

With the plan above we can reduce redundant computation.
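
Conceptually, the proposed Splitter broadcasts each input batch to all of its parents, so the shared input is computed only once. A toy sketch of the idea follows; it is not an actual DataFusion ExecutionPlan (which would be async and stream based), and the function name is made up:

use std::sync::mpsc;

use arrow::record_batch::RecordBatch;

// Hand every batch from the shared input to each parent. RecordBatch clones
// are cheap because the underlying column buffers are reference counted.
fn split_input(
    input: impl Iterator<Item = RecordBatch>,
    parents: &[mpsc::Sender<RecordBatch>],
) {
    for batch in input {
        for parent in parents {
            // Ignore send errors from parents that have already finished.
            let _ = parent.send(batch.clone());
        }
    }
}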

DESIGN 3: MOVE SORTING ENTIRELY INSIDE EACH AGGREGATOR IMPLEMENTATION

In this design, we would avoid sorting the input to the AggregateExec and instead rely on the Aggregator implementations themselves to handle any sorting and comparing internally. For example, given the query:

SELECT

  country,

  ARRAY_AGG(amount ORDER BY amount ASC) AS amounts,

  FIRST_VALUE(amount ORDER BY ts ASC) AS first_amount

FROM sales_global

GROUP BY country

The plan would look like the following (note that there is no SortExec):

ProjectionExec: expr=[...]

--AggregateExec: mode=Single, gby=[country], aggr=[ARRAY_AGG(s.amount ORDER BY amount), FIRST_VALUE(s.amount ORDER BY ts)]

----MemoryExec: partitions=1, partition_sizes=[1]

The ARRAY_AGG aggregator would receive the amount column unsorted:

amount  
=======

5        

4        

2        

1        

amount  
=======  

7      

4        

2        

3        

Internally, the ARRAY_AGG aggregator would store and sort the values before producing the final output. This is similar to the existing logic[2] required to merge partial results.
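
A rough sketch of such an accumulator (illustrative names only; the real implementation works on ArrayRefs and must also account for grouping, nulls, and memory use):

use arrow::array::Float64Array;

#[derive(Default)]
struct SortingArrayAgg {
    values: Vec<f64>, // buffered in arrival order
}

impl SortingArrayAgg {
    fn update_batch(&mut self, amount: &Float64Array) {
        // No ordering assumption: just buffer the incoming rows.
        self.values.extend(amount.iter().flatten());
    }

    fn evaluate(mut self) -> Vec<f64> {
        // Sort once, only when the final output is produced (amount ASC).
        self.values.sort_by(f64::total_cmp);
        self.values
    }
}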

Similarly, the FIRST_VALUE aggregator would receive both the amount and ts columns as arguments and would track the current lowest ts value as well as the corresponding amount[3]. This is similar to the existing logic[4] used to merge partial results. For example, it might receive the following batches:

amount   ts
=======  =======

5        3

4        2

2        5

1        1

amount   ts
=======  =======

7        3

4        4

2        5

3        8
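
A corresponding FIRST_VALUE sketch under this design scans every batch for the row with the smallest ts (again illustrative only; ts is simplified to i64, and nulls and grouping are ignored):

use arrow::array::{Array, Float64Array, Int64Array};

#[derive(Default)]
struct TrackingFirstValue {
    best: Option<(i64, f64)>, // (smallest ts seen so far, its amount)
}

impl TrackingFirstValue {
    fn update_batch(&mut self, amount: &Float64Array, ts: &Int64Array) {
        // No ordering assumption: every row is a candidate.
        for row in 0..ts.len() {
            let candidate = (ts.value(row), amount.value(row));
            match self.best {
                Some(best) if best.0 <= candidate.0 => {}
                _ => self.best = Some(candidate),
            }
        }
    }

    fn evaluate(&self) -> Option<f64> {
        self.best.map(|(_, amount)| amount)
    }
}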

DESIGN COMPARISON

As in most engineering decisions, there are tradeoffs between the different designs across several dimensions, as outlined below:

Complexity of Aggregator implementation compared to main

Complexity of HashAggregateExec implementation compared to main

Complexity of other operators / areas of the code compared to main

Clarity of EXPLAIN PLAN compared to main

Performance: How much sorting is required?

Potential for optimizations within each Aggregator

Additional Cost of Implementation


[1] https://www.postgresql.org/docs/current/sql-expressions.html#SYNTAX-AGGREGATES 

[2] Source code of array_agg_ordered.rs

[3] Perhaps interestingly, we actually have a special user defined aggregate in IOx that does exactly this operation today.

[4] Source code of first_last.rs