Unexpectedly High RAM Usage with Window Function and Union Operations
Describe the bug
When executing a SQL query with a LAG window function over a partition, Daft consumes far more RAM than the input data size would suggest. Memory usage grows disproportionately when the input DataFrame is enlarged using union_all_by_name, leading to out-of-memory errors.
To Reproduce
- Data: Load a Parquet file (~600 MB) into a Daft DataFrame. The schema contains the columns id (string), model (string), and event_ts (timestamp).
- Scenario 1: Window Function on the Original DataFrame
  Execute a SQL query with a LAG window function on the initial DataFrame. Both count_rows() and write_parquet() trigger the high memory usage.
import daft
df = daft.read_parquet("...")
result_df = daft.sql("""
SELECT
id,
LAG(model, 1, 0) OVER(PARTITION BY id ORDER BY event_ts) AS model_lag
FROM df
""")
# Both count and write consume a large amount of RAM.
result_df.count_rows()
result_df.write_parquet("...")
- Observed Behavior: The process consumes approximately 66 GB of RAM.
- Scenario 2: Window Function after Union Operations
  Create a larger DataFrame (4x the original size) by unioning the data with itself three times, each copy shifted by a different timestamp offset. Then run the same SQL query.
from datetime import timedelta

def shifted(df, days):
    # Shift event_ts forward by the given number of days, expressed as a duration in ms.
    offset_ms = int(timedelta(days=days).total_seconds() * 1000)
    return df.select(
        "id",
        "model",  # keep model so the same SQL query works on the unioned DataFrame
        (daft.col("event_ts") + daft.lit(offset_ms).cast(daft.DataType.duration("ms"))).alias("event_ts"),
    )

df_3d = shifted(df, 3)
df_10d = shifted(df, 10)
df_30d = shifted(df, 30)

new_df = (
    df
    .union_all_by_name(df_3d)
    .union_all_by_name(df_10d)
    .union_all_by_name(df_30d)
)
- Observed Behavior: The process consumes all available RAM (over 112 GB) and fails.
Expected behavior
Memory consumption should be roughly proportional to the size of the data being partitioned and processed. For a ~600 MB input file, RAM usage should be far lower than 66 GB.
The same operation on the 4x larger DataFrame (~2.4 GB) should complete without exhausting 112 GB of system memory.
Component(s)
Parquet, SQL, Native Runner
Additional context
No response
I encountered the same problem when executing SQL queries involving the window function and union operations, and I got the same result after converting the SQL to the Expression API. Is there any progress on this bug?
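(For reference, a rough sketch of what the Expression-API version of the same query could look like; the Window, partition_by, order_by, lag, and over names below follow Daft's window-function documentation as I understand it, so treat the exact signatures as assumptions:)

import daft
from daft import col, Window

# Hypothetical Expression-API equivalent of the SQL query above.
# Window() / partition_by / order_by / lag / over are assumed from
# Daft's documented window-function API and may differ in detail.
window = Window().partition_by("id").order_by("event_ts")
result_df = df.select(
    "id",
    col("model").lag(1, default=0).over(window).alias("model_lag"),
)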
Hey @jzz0930, unfortunately this issue is on our backlog, and I'm not able to give an estimate on when we'll be able to tackle it.
However, if you are curious, I can point you to where in the code I believe the issue is: https://github.com/Eventual-Inc/Daft/blob/main/src/daft-local-execution/src/sinks/window_partition_and_order_by.rs.
The way the window function works is by materializing all the data, partitioning it, sorting each partition, and then performing the window function. Even though the input data is small, these steps require a lot of intermediate allocations. To reduce the peak memory usage we can probably reuse some buffers, or perform these operations in a streaming fashion.
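For illustration, here is a minimal Python sketch of the materialize -> partition -> sort -> window pattern described above. This is purely conceptual, not Daft's actual Rust implementation; the point is that each stage buffers its own copy of the rows, which is where the peak memory goes:

import collections

def lag_over_partitions(rows, partition_key, order_key, value_key, default=None):
    # 1. Materialize + partition: every input row is buffered into a
    #    per-key list before any output can be produced.
    partitions = collections.defaultdict(list)
    for row in rows:
        partitions[row[partition_key]].append(row)

    out = []
    for part in partitions.values():
        # 2. Sort each partition by the ORDER BY key.
        part.sort(key=lambda r: r[order_key])
        # 3. Window: LAG(value, 1, default) is the previous row's value
        #    within the sorted partition.
        prev = default
        for row in part:
            out.append({**row, "lag": prev})
            prev = row[value_key]
    return out

# Example mirroring LAG(model, 1, 0) OVER (PARTITION BY id ORDER BY event_ts):
rows = [
    {"id": "a", "event_ts": 2, "model": "m2"},
    {"id": "a", "event_ts": 1, "model": "m1"},
    {"id": "b", "event_ts": 1, "model": "m3"},
]
print(lag_over_partitions(rows, "id", "event_ts", "model", default=0))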