The new streaming `.rolling()` does not stream incrementally
Checks
- [x] I have checked that this issue has not already been reported.
- [x] I have confirmed this bug exists on the latest version of Polars.
Reproducible example
import datetime as dt
import polars as pl
from polars.io.plugins import register_io_source
from typing import Iterator
from time import sleep
def slow_streaming_source() -> pl.LazyFrame:
    """Creates a slow streaming source that emits 1 row every 200ms."""
    schema = {"timestamp": pl.Datetime, "value": pl.Float64}

    def source_generator(
        with_columns: list[str] | None,
        predicate: pl.Expr | None,
        n_rows: int | None,
        batch_size: int | None,
    ) -> Iterator[pl.DataFrame]:
        batch_size = 1  # Emit one row at a time
        rows_emitted = 0
        max_rows = n_rows if n_rows is not None else 100
        while rows_emitted < max_rows:
            # Simulate slow data arrival
            sleep(0.2)
            df = pl.DataFrame({
                "timestamp": [dt.datetime.now()],
                "value": [float(rows_emitted)],
            })
            rows_emitted += 1
            if with_columns is not None:
                df = df.select(with_columns)
            if predicate is not None:
                df = df.filter(predicate)
            yield df

    return register_io_source(io_source=source_generator, schema=schema)


def on_batch(df: pl.DataFrame) -> None:
    """Callback that prints when a batch is received."""
    print(f"[{dt.datetime.now().strftime('%H:%M:%S.%f')}] Received batch: {len(df)} rows")


if __name__ == "__main__":
    print("=" * 60)
    print("Test 1: head() only - WORKS (streams incrementally)")
    print("=" * 60)
    slow_streaming_source().head(5).sink_batches(
        on_batch,
        chunk_size=1,
        engine="streaming"
    )

    print()
    print("=" * 60)
    print("Test 2: rolling().mean() - BUG (buffers everything)")
    print("=" * 60)
    print("Expected: Messages every ~200ms")
    print("Actual: All messages at the end after ~10 seconds")
    print("-" * 60)
    slow_streaming_source().head(50).rolling(
        index_column="timestamp",
        period="1s"
    ).agg(
        pl.col("value").mean()
    ).sink_batches(
        on_batch,
        chunk_size=1,
        engine="streaming"
    )
Log output
polars-stream: updating graph state
async thread count: 4
polars-stream: running sink_batches in subgraph
polars-stream: running multi-scan[io_plugin] in subgraph
[MultiScanTaskInit]: 1 sources, reader name: io_plugin, ReaderCapabilities(0x0), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[AttachReaderToBridge]: received reader (n_readers_received: 1)
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
[BatchFnReader]: name: io_plugin
run sink_mem
run ProjectionExec
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
[ReaderStarter]: Stopping (no more readers)
run sink_mem
run ProjectionExec
run sink_mem
run ProjectionExec
run sink_mem
run ProjectionExec
run sink_mem
run ProjectionExec
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
polars-stream: updating graph state
polars-stream: running sink_batches in subgraph
polars-stream: running multi-scan[io_plugin] in subgraph
polars-stream: running rolling-group-by in subgraph
[MultiScanTaskInit]: 1 sources, reader name: io_plugin, ReaderCapabilities(0x0), n_readers_pre_init: 1, max_concurrent_scans: 1
[MultiScanTaskInit]: predicate: None, skip files mask: None, predicate to reader: None
[MultiScanTaskInit]: scan_source_idx: 0, extra_ops: ExtraOperations { row_index: None, row_index_col_idx: 18446744073709551615, pre_slice: None, include_file_paths: None, file_path_col_idx: 18446744073709551615, predicate: None }
[MultiScanTaskInit]: Readers init: 1 / (1 total) (range: 0..1, filtered out: 0)
[MultiScan]: Initialize source 0
[ReaderStarter]: scan_source_idx: 0
[ReaderStarter]: max_concurrent_scans is 1, waiting..
[ReaderStarter]: scan_source_idx: 0: pre_slice_to_reader: None, external_filter_mask: None, file_iceberg_schema: None
[BatchFnReader]: name: io_plugin
[AttachReaderToBridge]: received reader (n_readers_received: 1)
run sink_mem
run ProjectionExec
run sink_mem
run ProjectionExec
start_reader_impl: scan_source_idx: 0, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
start_reader_impl: scan_source_idx: 0, ApplyExtraOps::Noop, first_morsel_position: RowCounter { physical_rows: 0, deleted_rows: 0 }
[ReaderStarter]: Stopping (no more readers)
run sink_mem
run ProjectionExec
run sink_mem
run ProjectionExec
run sink_mem
run ProjectionExec
run sink_mem
run ProjectionExec
... (run sink_mem / run ProjectionExec repeated many more times) ...
[MultiScanState]: Readers disconnected
polars-stream: done running graph phase
polars-stream: updating graph state
polars-stream: running rolling-group-by in subgraph
polars-stream: running sink_batches in subgraph
polars-stream: done running graph phase
polars-stream: updating graph state
Issue description
The recent implementation of `rolling` in the new streaming engine (#25058) does not appear to stream incrementally.
Expected behavior
Output from the second streaming query should appear incrementally, like the first.
Installed versions
--------Version info---------
Polars: 1.36.0
Index type: UInt32
Platform: Linux-5.15.150.1-microsoft-standard-WSL2-x86_64-with-glibc2.39
Python: 3.12.3 (main, Nov 6 2025, 13:44:16) [GCC 13.3.0]
Runtime: rt32
----Optional dependencies----
Azure CLI <not installed>
adbc_driver_manager 1.9.0
altair 6.0.0
azure.identity 1.25.1
boto3 1.41.5
cloudpickle 3.1.2
connectorx 0.4.4
deltalake 1.2.1
fastexcel 0.18.0
fsspec 2025.12.0
gevent 25.9.1
google.auth 2.43.0
great_tables 0.20.0
matplotlib 3.10.7
numpy 2.3.5
openpyxl 3.1.5
pandas 2.3.3
polars_cloud 0.4.1
pyarrow 22.0.0
pydantic 2.12.5
pyiceberg 0.10.0
sqlalchemy 2.0.44
torch <not installed>
xlsx2csv 0.8.4
xlsxwriter 3.2.5
This is not something we guarantee. There could be arbitrary buffering in between.
Doesn't that kind of defeat one of the main use cases of streaming? You'll never be able to use Polars in a live / near real-time context, because you might not even get the first result until the stream is closed. FWIW, the AI slop answer is:
Final Fix Summary
Problem: Streaming .rolling() buffered all data instead of streaming incrementally.
Root Cause: The RollingGroupBy node used send.parallel(), which creates a linearizer that waits for ALL parallel workers to produce at least one output before forwarding anything. With many threads (e.g., 56) but few morsels, most workers never receive work, causing the linearizer to block until all channels close (illustrated below).
Fix: Changed to send.serial() output, avoiding the linearizer entirely and simplifying the code to process morsels directly in a single task.
File Changed: crates/polars-stream/src/nodes/rolling_group_by.rs (16 lines added, 39 removed)
Behavior:
- Streaming now works incrementally with a one-row delay for the first window (due to the original RollingWindower emission logic)
- All correctness tests pass for various window configurations
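A simplified, hypothetical sketch of the blocking behavior that summary describes, using plain Python queues in place of the Rust linearizer in polars-stream (none of the names below are Polars APIs):

import queue
import threading
import time

# Hypothetical illustration only: a merge point that waits for output from
# *every* worker channel before forwarding anything stalls whenever most
# workers never receive work. This is not Polars code.
N_WORKERS = 8    # many worker channels (think: many threads)
N_MORSELS = 2    # few input morsels, so most workers stay idle

channels = [queue.Queue() for _ in range(N_WORKERS)]
SENTINEL = object()  # marks a worker channel closing at end-of-stream


def worker(i: int) -> None:
    if i < N_MORSELS:
        time.sleep(0.2)                      # simulate slow upstream data
        channels[i].put(f"result from worker {i}")
    time.sleep(2.0)                          # idle workers only close when the stream ends
    channels[i].put(SENTINEL)


for i in range(N_WORKERS):
    threading.Thread(target=worker, args=(i,), daemon=True).start()

start = time.monotonic()
# Naive merge: block until every channel has produced at least one item.
first_round = [ch.get() for ch in channels]
print(f"first output forwarded after {time.monotonic() - start:.1f}s")
# Workers 0 and 1 produced after ~0.2s, but their results sat buffered until
# the idle channels finally closed, so downstream saw nothing for ~2s.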
I'm guessing this could potentially hurt performance elsewhere. Would you consider having a near real-time mode, maybe as an option? I think Polars could have amazing potential in a bunch of use cases where nothing good really exists in the Python ecosystem.
@orlp I didn't mean to close this before you commented.
Doesn't that kind of defeat one of the main use cases of streaming?
What we call the streaming engine uses batch-based efficient processing of complete queries, without having to keep the full data in memory. It doesn't provide any guarantees about latency, internal batch size, or anything of the sort. That would massively complicate things.
FWIW, the AI slop answer is:
The AI slop is slop. I regret spending the minute of my life reading it.
What you're thinking of is more along the lines of Apache Flink. It's not currently in scope.
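Not a fix, but a hedged user-side sketch under those constraints: since the plain scan already streams incrementally through sink_batches (Test 1 above), a near real-time rolling mean can be maintained manually inside the callback. The deque-based window below is an illustrative assumption and only approximates rolling(index_column="timestamp", period="1s").agg(...) semantics; on_batch_rolling is a hypothetical name.

import datetime as dt
from collections import deque

import polars as pl

# Keep the rolling computation in the sink_batches callback, so latency
# depends only on the (incrementally streaming) scan. Window handling is
# simplified and may not match rolling().agg() exactly.
window = dt.timedelta(seconds=1)
buffer: deque[tuple[dt.datetime, float]] = deque()


def on_batch_rolling(df: pl.DataFrame) -> None:
    for ts, value in df.select("timestamp", "value").iter_rows():
        buffer.append((ts, value))
        # Drop rows that fall outside the 1s window ending at `ts`.
        while buffer and buffer[0][0] < ts - window:
            buffer.popleft()
        mean = sum(v for _, v in buffer) / len(buffer)
        print(f"[{ts:%H:%M:%S.%f}] rolling mean over last 1s: {mean:.3f}")


# Usage, reusing the source from the reproducible example above:
# slow_streaming_source().head(50).sink_batches(
#     on_batch_rolling, chunk_size=1, engine="streaming"
# )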