Window function support
Is your feature request related to a problem? Please describe.
Window functions: functions that are applied over windows of data. The DuckDB documentation (linked under "See Also" below) has a great illustration of this.
Valid expressions that can be run over windows are:
- Aggregation expressions (e.g. sum(), mean())
- Window expressions (e.g. first(), last())
In this proposal, the Window API is similar to the PySpark API.
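For reference, this is roughly what the equivalent looks like with PySpark's existing `pyspark.sql.Window` API (the `region`, `time`, and `amount` column names are just the ones used in the sales examples below):

```python
from pyspark.sql import Window
from pyspark.sql import functions as F

# Window expression: row numbers per region, ordered by time
w_ordered = Window.partitionBy("region").orderBy("time")
df = df.withColumn("row_number", F.row_number().over(w_ordered))

# Aggregation expression broadcast over a window: total amount per region
w_region = Window.partitionBy("region")
df = df.withColumn("region_sum", F.sum("amount").over(w_region))
```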
- We will create a new group of Expressions called `WindowExpressions` (similar to how we have "aggregation expressions")
- Unlike aggregation expressions however, `WindowExpressions` do not change the cardinality of the dataframe/partitions
- `WindowExpressions` also have attached context of the `WindowSpec` that they should run over
- A `WindowSpec` consists of:
  a. (Non-optional, for now) `partition_by`: a column to partition the overall dataframe by - windows do not cross partition boundaries
  b. (Optional) `order_by`: an expression to order each partition by
  c. (Optional) `rows_between` OR `range_between`: the granularity of each frame within each partition. Defaults to `(START_PARTITION, END_PARTITION)`. `rows_between` defines each frame by how many rows before/after the current row to consider as part of the frame (e.g. +1 row, -1 row). `range_between` defines a range of values that defines each frame (e.g. `(-1 minute, +1 minute)`).
- Aggregation Expressions can also be run over a `WindowSpec`. In this case, they would not change the cardinality of the data, and will automatically be "broadcasted" over all data in the window.
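Putting those pieces together, constructing and using a `WindowSpec` under this proposal could look like the sketch below (hypothetical API; the `Window` constructor and keyword names follow the examples later in this issue):

```python
from daft import col
from daft.exprs import Function as F, Window  # hypothetical module path per this proposal

# Default frame is the whole partition: (START_PARTITION, END_PARTITION)
spec = Window(partition_by=["region"], sort_by=["time"])

# Row-based frame: one row before through one row after the current row
spec_rows = Window(partition_by=["region"], sort_by=["time"], rows_between=(-1, 1))

# A window expression and an aggregation expression run over the same spec;
# neither changes the cardinality of the dataframe
df = df.with_column("row_number", F.row_number().over(spec)) \
       .with_column("region_total", col("amount").sum().over(spec))
```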
As an initial implementation, Daft will not support:
- `WindowSpec` without a `partition_by` ("global window specs")
- `WindowExpressions` without a `WindowSpec` ("window expressions over a global window spec")
Proposed API:
To mirror the DuckDB simple examples:
-- generate a "row_number" column containing incremental identifiers for each row
SELECT row_number() OVER () FROM sales;
-- generate a "row_number" column, by order of time
SELECT row_number() OVER (ORDER BY time) FROM sales;
-- generate a "row_number" column, by order of time partitioned by region
SELECT row_number() OVER (PARTITION BY region ORDER BY time) FROM sales;
-- compute the difference between the current amount, and the previous amount,
-- by order of time
SELECT amount - lag(amount) OVER (ORDER BY time) FROM sales;
-- compute the percentage of the total amount of sales per region for each row
SELECT amount / sum(amount) OVER (PARTITION BY region) FROM sales;
from daft import col
from daft.exprs import Function as F, Window

# generate a "row_number" column containing incremental identifiers for each row
df.with_column("row_number", F.row_number()) # ERROR: Daft will not support this global WindowSpec
# generate a "row_number" column, by order of time
df.with_column("row_number", F.row_number().over(Window(sort_by=["time"]))) # ERROR: Daft will not support this global WindowSpec
# generate a "row_number" column, by order of time partitioned by region
df.with_column("row_number", F.row_number().over(Window(partition_by=["region"], sort_by=["time"])))
# compute the difference between the current amount, and the previous amount, by order of time
df.with_column("prev_amount", col("amount").lag(1)) \
.with_column("amount_diff", col("amount") - col("prev_amount"))
# compute the percentage of the total amount of sales per region for each row
df \
    .with_column("region_sum", df["amount"].sum().over(Window(partition_by=["region"]))) \
    .with_column("pct_sales_per_region", col("amount") / col("region_sum"))
Here are the examples with frames:
DuckDB:
-- Row-based ranges
SELECT points,
    sum(points) OVER (
        ROWS BETWEEN 1 PRECEDING
        AND 1 FOLLOWING) we
FROM results;
-- Value-based ranges
SELECT "Plant", "Date",
avg("MWh") OVER (
PARTITION BY "Plant"
ORDER BY "Date" ASC
RANGE BETWEEN INTERVAL 3 DAYS PRECEDING
AND INTERVAL 3 DAYS FOLLOWING)
AS "MWh 7-day Moving Average"
FROM "Generation History"
ORDER BY 1, 2;
import datetime

# ERROR: Daft won't support global window specs
df.select("points", col("points").sum().over(Window(rows_between=(-1, 1))))
# ERROR: Daft won't support global window specs
df.select("points", col("points").sum().over(Window(range_between=(datetime.timedelta(days=-3), datetime.timedelta(days=3)))))
See Also:
- DuckDB Windowing: https://duckdb.org/docs/sql/window_functions.html#aggregate-window-functions
- PySpark Windowing: https://spark.apache.org/docs/latest/api/python/reference/pyspark.sql/api/pyspark.sql.Window.html
These kinds of operations are essential.
Polars' rolling functions are a good reference too: https://docs.pola.rs/api/python/stable/search.html?q=rolling
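For example, the row-based `rows_between=(-1, 1)` frame above corresponds roughly to a centered rolling aggregation in Polars (note that Polars emits nulls at the edges of the column unless the minimum window size is relaxed, whereas a SQL frame simply shrinks at partition boundaries):

```python
import polars as pl

df = pl.DataFrame({"points": [1, 2, 3, 4, 5]})

# Centered 3-row window: previous row, current row, next row
df = df.with_columns(
    pl.col("points").rolling_sum(window_size=3, center=True).alias("points_window_sum")
)
print(df)
```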