Add retract_batch method for median accumulator

Open mustafasrepo opened this issue 2 years ago • 2 comments

Is your feature request related to a problem or challenge?

The median accumulator can be computed with a sliding-window algorithm, but the current implementation does not provide retract_batch, so median cannot be used as a sliding accumulator.
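A minimal sketch of what retraction means for a median, written against plain slices rather than Arrow arrays. The `SlidingMedian` type and its method signatures are hypothetical stand-ins chosen for illustration; they are not DataFusion's actual accumulator type or trait signatures. The sketch assumes an even-sized window averages its two middle values.

```rust
/// Hypothetical stand-in for a median accumulator (not DataFusion's type).
#[derive(Default)]
struct SlidingMedian {
    // Values currently inside the window frame; order does not matter.
    values: Vec<f64>,
}

impl SlidingMedian {
    /// Rows entering the window frame.
    fn update_batch(&mut self, batch: &[f64]) {
        self.values.extend_from_slice(batch);
    }

    /// Rows leaving the window frame: drop one stored occurrence per value.
    /// NaN never compares equal, so this sketch cannot retract NaN.
    fn retract_batch(&mut self, batch: &[f64]) {
        for v in batch {
            if let Some(pos) = self.values.iter().position(|x| x == v) {
                // swap_remove is O(1); evaluate() sorts a copy anyway.
                self.values.swap_remove(pos);
            }
        }
    }

    /// Median of the values currently in the frame, or None if empty.
    fn evaluate(&self) -> Option<f64> {
        if self.values.is_empty() {
            return None;
        }
        let mut sorted = self.values.clone();
        sorted.sort_by(|a, b| a.total_cmp(b));
        let mid = sorted.len() / 2;
        if sorted.len() % 2 == 1 {
            Some(sorted[mid])
        } else {
            // Assumption: even-sized windows average the two middle values.
            Some((sorted[mid - 1] + sorted[mid]) / 2.0)
        }
    }
}
```

Sorting a copy on every evaluate is acceptable for the small frames discussed in this issue; a larger frame would likely want an ordered structure instead.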

Describe the solution you'd like

No response

Describe alternatives you've considered

No response

Additional context

No response

mustafasrepo, Sep 27 '23 09:09

DataDog is interested in this feature. Our focus is on small sliding windows of 3, 5, 7, and 9 rows, which I believe are reasonable to support because each needs only a small buffer to compute the median. Steps to reproduce:

-- create a table with 3 columns: timestamp, tags, value
CREATE TABLE test_table (
    timestamp INT,
    tags VARCHAR(255),
    value DOUBLE
);

-- insert some data into the table
INSERT INTO test_table (timestamp, tags, value) VALUES
(1, 'tag1', 10),
(2, 'tag1', 20),
(3, 'tag1', 30),
(4, 'tag1', 40),
(5, 'tag1', 50),
(1, 'tag2', 60),
(2, 'tag2', 70),
(3, 'tag2', 80),
(4, 'tag2', 90),
(5, 'tag2', 100);

-- Not supported yet
SELECT
    timestamp,
    tags,
    median(value) OVER (PARTITION BY tags ORDER BY timestamp ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS value_median_3
FROM test_table
ORDER BY tags, timestamp;
-- This feature is not implemented: Aggregate can not be used as a sliding accumulator because `retract_batch` is not implemented: median(test_table.value) PARTITION BY [test_table.tags] ORDER BY [test_table.timestamp ASC NULLS LAST] ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING

-- Other streamable aggregations such as max are supported
SELECT
    timestamp,
    tags,
    value,
    max(value) OVER (PARTITION BY tags ORDER BY timestamp ROWS BETWEEN 1 PRECEDING AND 1 FOLLOWING) AS value_max_3
FROM test_table
ORDER BY tags, timestamp;   
+-----------+------+-------+-------------+
| timestamp | tags | value | value_max_3 |
+-----------+------+-------+-------------+
| 1         | tag1 | 10.0  | 20.0        |
| 2         | tag1 | 20.0  | 30.0        |
| 3         | tag1 | 30.0  | 40.0        |
| 4         | tag1 | 40.0  | 50.0        |
| 5         | tag1 | 50.0  | 50.0        |
| 1         | tag2 | 60.0  | 70.0        |
| 2         | tag2 | 70.0  | 80.0        |
| 3         | tag2 | 80.0  | 90.0        |
| 4         | tag2 | 90.0  | 100.0       |
| 5         | tag2 | 100.0 | 100.0       |
+-----------+------+-------+-------------+
10 row(s) fetched.
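For anyone turning the unsupported query above into a test, the following sketch computes the values a 3-row median frame would produce for one partition. The helper names `median_of` and `sliding_medians` are hypothetical, and the sketch assumes that frames are clipped at partition boundaries and that an even-sized frame averages its two middle values; the actual output should be checked against DataFusion once retract_batch exists.

```rust
/// Median of a slice, or None if it is empty.
/// Assumption: an even-length slice averages its two middle values.
fn median_of(window: &[f64]) -> Option<f64> {
    if window.is_empty() {
        return None;
    }
    let mut sorted = window.to_vec();
    sorted.sort_by(|a, b| a.total_cmp(b));
    let mid = sorted.len() / 2;
    if sorted.len() % 2 == 1 {
        Some(sorted[mid])
    } else {
        Some((sorted[mid - 1] + sorted[mid]) / 2.0)
    }
}

/// Emulates ROWS BETWEEN `preceding` PRECEDING AND `following` FOLLOWING
/// over a single partition whose rows are already in ORDER BY order.
fn sliding_medians(values: &[f64], preceding: usize, following: usize) -> Vec<Option<f64>> {
    (0..values.len())
        .map(|i| {
            // Clip the frame to the partition boundaries.
            let start = i.saturating_sub(preceding);
            let end = (i + following + 1).min(values.len());
            median_of(&values[start..end])
        })
        .collect()
}
```

Under those assumptions, `sliding_medians(&[10.0, 20.0, 30.0, 40.0, 50.0], 1, 1)` gives 15, 20, 30, 40, 45 for tag1, and the tag2 values 60 through 100 give 65, 70, 80, 90, 95.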

NGA-TRAN, Apr 21 '25 13:04

Some pointers.

Where to implement it:

https://github.com/apache/datafusion/blob/2a08013af3ccf703bee202c959b40bb0d35bdea1/datafusion/functions-aggregate/src/median.rs

Methods to implement from the Accumulator trait:

https://github.com/apache/datafusion/blob/2a08013af3ccf703bee202c959b40bb0d35bdea1/datafusion/expr-common/src/accumulator.rs#L298-L313

Example retract_batch from another aggregate (average):

https://github.com/apache/datafusion/blob/79a2f5e110f2a3ddbe6d943f951bd6f518549b03/datafusion/functions-aggregate/src/average.rs#L554-L565

Add tests here:

https://github.com/apache/datafusion/blob/79a2f5e110f2a3ddbe6d943f951bd6f518549b03/datafusion/sqllogictest/test_files/aggregate.slt

  • See the suggested tests in the comment above

Jefffrey, Dec 10 '25 13:12