
Unexpectedly High RAM Usage with Window Function and Union Operations

Open gabrielteotonio opened this issue 7 months ago • 2 comments

Describe the bug

When executing a SQL query with a LAG window function over a partition, Daft consumes a significantly larger amount of RAM than expected relative to the input data size. The memory usage appears to scale dramatically when the input DataFrame is enlarged using union_all_by_name, leading to out-of-memory errors.

To Reproduce

  1. Data: Load a Parquet file (~600 MB) into a Daft DataFrame. The schema contains the columns id (string), model (string), and event_ts (timestamp).
  • Input Data Schema: (screenshot omitted)

  • Number of Rows: (screenshot omitted)
  2. Scenario 1: Window Function on the Original DataFrame. Execute a SQL query with a LAG window function on the initial DataFrame. Both count_rows() and write_parquet() trigger the high memory usage.
import daft

df = daft.read_parquet("...")

result_df = daft.sql("""
    SELECT
        id,
        LAG(model, 1, 0) OVER(PARTITION BY id ORDER BY event_ts) AS model_lag
    FROM df
""")

# Both count and write consume a large amount of RAM.
result_df.count_rows()
result_df.write_parquet("...")
  • Observed Behavior: The process consumes approximately 66 GB of RAM. (screenshot omitted)
  3. Scenario 2: Window Function after Union Operations. Create a larger DataFrame (4x the original size) by unioning the data with itself three times, each copy with a different timestamp offset, then run the same SQL query.
from datetime import timedelta

df_3d = (
    df
    .select(
        "id",
        (daft.col("event_ts") + daft.lit(int(timedelta(days=3).total_seconds() * 1000)).cast(daft.DataType.duration("ms"))).alias("event_ts")
    )
)
df_10d = (
    df
    .select(
        "id",
        (daft.col("event_ts") + daft.lit(int(timedelta(days=10).total_seconds() * 1000)).cast(daft.DataType.duration("ms"))).alias("event_ts")
    )
)
df_30d = (
    df
    .select(
        "id",
        (daft.col("event_ts") + daft.lit(int(timedelta(days=30).total_seconds() * 1000)).cast(daft.DataType.duration("ms"))).alias("event_ts")
    )
)

new_df = (
    df
    .union_all_by_name(df_3d)
    .union_all_by_name(df_10d)
    .union_all_by_name(df_30d)
)
  • Observed Behavior: The process consumes all available RAM (over 112 GB) and fails. (screenshot omitted)
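For reference, the day-to-millisecond offsets used in the three shifted copies above can be checked with plain stdlib arithmetic (independent of Daft):

```python
from datetime import timedelta

# Millisecond offsets applied to event_ts for the 3-, 10-, and 30-day copies.
offsets_ms = {days: int(timedelta(days=days).total_seconds() * 1000) for days in (3, 10, 30)}
# 3 days = 259_200_000 ms, 10 days = 864_000_000 ms, 30 days = 2_592_000_000 ms
```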

Expected behavior

Memory consumption should be more proportional to the size of the data being partitioned and processed. For a ~600MB input file, RAM usage should be significantly lower than 66 GB.

The operation on the 4x larger DataFrame (~2.4 GB) should ideally complete without exhausting 112 GB of system memory.

Component(s)

Parquet, SQL, Native Runner

Additional context

No response

gabrielteotonio · Oct 01 '25 22:10

I ran into the same problem when executing SQL queries that combine a window function with a union operation, and I got the same result when converting the SQL to the Expression API. Is there any progress on this bug?

jzz0930 · Nov 06 '25 12:11

Hey @jzz0930, unfortunately this issue is on our backlog, and I'm not able to give an estimate on when we'll be able to tackle it.

However, if you are curious, I can point you to where in the code I believe the issue is: https://github.com/Eventual-Inc/Daft/blob/main/src/daft-local-execution/src/sinks/window_partition_and_order_by.rs

The way the window function works is by materializing all the data, partitioning it, sorting each partition, and then performing the window function. Even though the input data is small, these steps require a lot of intermediate allocations. To reduce the peak memory usage, we could probably reuse some buffers or perform these operations in a streaming fashion.
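The materialize-partition-sort-window flow described above can be sketched in Python (this is a simplified, hypothetical illustration, not Daft's actual Rust implementation; `lag_over_partitions` and its parameters are made up for this example). It shows why the whole input, the per-partition copies, and the output all coexist in memory at the peak:

```python
from collections import defaultdict

def lag_over_partitions(rows, partition_key, order_key, value_key, default=None):
    # 1. Materialize: the entire input is already resident in `rows`.
    # 2. Partition: every row is copied into a per-key bucket (a second full copy).
    partitions = defaultdict(list)
    for row in rows:
        partitions[row[partition_key]].append(row)
    # 3. Sort each partition by the ordering column.
    out = []
    for part in partitions.values():
        part.sort(key=lambda r: r[order_key])
        # 4. Window: LAG(value, 1, default) within the sorted partition.
        prev = default
        for row in part:
            out.append({**row, "lag": prev})  # third copy: the output rows
            prev = row[value_key]
    return out

rows = [
    {"id": "a", "ts": 2, "model": "m2"},
    {"id": "a", "ts": 1, "model": "m1"},
    {"id": "b", "ts": 1, "model": "m3"},
]
result = lag_over_partitions(rows, "id", "ts", "model", default="0")
```

A streaming variant would instead emit each partition's output as soon as that partition is sorted, bounding peak memory by the largest partition rather than the whole dataset.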

colin-ho avatar Nov 09 '25 06:11 colin-ho