Bug: NATS PullSub batch=True does not work reliably
Describe the bug
When passing, for example, PullSub(batch_size=10, timeout=60, batch=True) to the subscriber, I expect messages to be passed to the handler only when either
- 10 messages have been consumed
- or 60 seconds have passed
However, it seems that individual messages get passed to the handler.
How to reproduce
Consumer:
from faststream import FastStream, Logger
from faststream.nats import NatsBroker, PullSub

broker = NatsBroker()
app = FastStream(broker)


@broker.subscriber(
    subject="test",
    stream="test",
    durable="test",
    pull_sub=PullSub(batch_size=10, timeout=60, batch=True),
)
async def handle(msg, logger: Logger):
    logger.info(msg)
Producer:
import asyncio
import random
import time

from nats.aio.client import Client as NATS


async def main():
    nc = NATS()
    await nc.connect("nats://localhost:4222")
    subject = "test"
    message = b"Hello"
    try:
        while True:
            await nc.publish(subject, message)
            await asyncio.sleep(random.uniform(0.1, 0.5))
    except asyncio.CancelledError:
        pass
    finally:
        await nc.drain()


if __name__ == "__main__":
    try:
        asyncio.run(main())
    except KeyboardInterrupt:
        pass
Expected behavior
I expect only a batch of messages to be passed to the handler, following these rules:
- a batch is flushed once 10 messages have accumulated
- or whatever has accumulated is flushed after 60 seconds
Observed behavior
Messages get passed individually and not as a batch
faststream run example:app
2025-05-20 09:24:22,287 INFO - FastStream app starting...
2025-05-20 09:24:22,296 INFO - test | - `Handle` waiting for messages
2025-05-20 09:24:22,297 INFO - FastStream app started successfully! To exit, press CTRL+C
2025-05-20 09:24:29,380 INFO - test | cda1192d-a - Received
2025-05-20 09:24:29,381 INFO - test | cda1192d-a - [b'Hello']
2025-05-20 09:24:29,381 INFO - test | cda1192d-a - Processed
2025-05-20 09:24:29,522 INFO - test | 373dbadf-4 - Received
2025-05-20 09:24:29,523 INFO - test | 373dbadf-4 - [b'Hello']
2025-05-20 09:24:29,523 INFO - test | 373dbadf-4 - Processed
2025-05-20 09:24:29,795 INFO - test | b8822bda-8 - Received
2025-05-20 09:24:29,796 INFO - test | b8822bda-8 - [b'Hello']
2025-05-20 09:24:29,796 INFO - test | b8822bda-8 - Processed
2025-05-20 09:24:30,015 INFO - test | c537afd8-7 - Received
2025-05-20 09:24:30,015 INFO - test | c537afd8-7 - [b'Hello']
2025-05-20 09:24:30,016 INFO - test | c537afd8-7 - Processed
2025-05-20 09:24:30,280 INFO - test | 38cadeec-3 - Received
Environment
Running FastStream 0.5.40 with CPython 3.13.2 on Darwin
NATS delivers messages individually rather than in batches because the server doesn't accumulate enough messages before the consumer requests them.
NATS JetStream PullSub does not guarantee that the batch size will always be equal to N. It guarantees that no more than timeout seconds will pass and the batch size will not exceed N.
Publish messages in batches so they can accumulate on the server:
# Producer
tasks = []
for i in range(1, 101):
    tasks.append(nc.publish(subject, message))
await asyncio.gather(*tasks)
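To illustrate the point above, here is a toy model in plain asyncio. It is not nats-py or FastStream code, and `fetch_up_to` is a hypothetical helper. It assumes a fetch returns as soon as at least one message is available, taking at most `batch_size` messages and waiting at most `timeout` seconds. Under that assumption, a burst of messages fills the batch, while a slow trickle yields single-message batches.

```python
import asyncio


async def fetch_up_to(queue: asyncio.Queue, batch_size: int, timeout: float) -> list:
    """Toy fetch: wait up to `timeout` for the first message, then take
    whatever else is already queued, never more than `batch_size`."""
    try:
        first = await asyncio.wait_for(queue.get(), timeout)
    except asyncio.TimeoutError:
        return []  # nothing arrived before the timeout
    batch = [first]
    while len(batch) < batch_size and not queue.empty():
        batch.append(queue.get_nowait())
    return batch


async def demo() -> tuple[int, int]:
    queue: asyncio.Queue = asyncio.Queue()

    # Burst: 25 messages are already waiting when the fetch is issued.
    for i in range(25):
        queue.put_nowait(i)
    burst = await fetch_up_to(queue, batch_size=10, timeout=1.0)

    # Trickle: drop the leftovers, then publish a single message shortly
    # after the fetch has started waiting.
    while not queue.empty():
        queue.get_nowait()
    loop = asyncio.get_running_loop()
    loop.call_later(0.05, queue.put_nowait, "late")
    trickle = await fetch_up_to(queue, batch_size=10, timeout=1.0)

    return len(burst), len(trickle)


if __name__ == "__main__":
    print(asyncio.run(demo()))  # prints (10, 1)
```

In this model the batch size is only an upper bound, which is consistent with the statement that the batch will not exceed N but is not guaranteed to equal N.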
Regardless of how NATS handles this internally, the current behavior is inconsistent with what the documentation states at https://faststream.ag2.ai/0.6/nats/jetstream/pull/#faststream-details and so I do believe this should be considered a valid issue:
"However, if your subject has fewer than 10 messages, your request to NATS will be blocked for timeout (5 seconds by default) while trying to collect the required number of messages."
Rather than actually blocking for up to timeout seconds to deliver either:
- batch_size messages as soon as that many are available; or
- however many messages are available (amounting to fewer messages than the configured batch size) once timeout has been reached
I instead see FastStream deliver messages as soon as they arrive in NATS. This can be seen in the log below (as well as in the original issue report). The consumer is created with deliver_policy=DeliverPolicy.ALL, so it first receives historical events (processed in batches of up to batch_size=32) and then receives events as they come in in real time (processed individually and immediately instead of being batched up):
INFO: Started server process [248699]
INFO: Waiting for application startup.
2025-11-20 12:56:38,511 INFO Starting broker
2025-11-20 12:56:38,513 INFO - events.> | - `ExampleHandler` waiting for messages
2025-11-20 12:56:38,519 INFO - events.> | f6b65d91-3 - Received
2025-11-20 12:56:38,523 INFO Received batch of length: 32
2025-11-20 12:56:38,524 INFO - events.> | f6b65d91-3 - Processed
2025-11-20 12:56:38,525 INFO - events.> | 40ee9757-1 - Received
2025-11-20 12:56:38,526 INFO Received batch of length: 32
2025-11-20 12:56:38,527 INFO - events.> | 40ee9757-1 - Processed
2025-11-20 12:56:38,528 INFO - events.> | 02551fcb-a - Received
2025-11-20 12:56:38,529 INFO Received batch of length: 32
2025-11-20 12:56:38,529 INFO - events.> | 02551fcb-a - Processed
--- >8 many similar log lines snipped ----
2025-11-20 12:56:40,194 INFO - events.> | fe29b9ed-9 - Processed
2025-11-20 12:56:40,195 INFO - events.> | 24960a43-c - Received
2025-11-20 12:56:40,196 INFO Received batch of length: 32
2025-11-20 12:56:40,196 INFO - events.> | 24960a43-c - Processed
2025-11-20 12:56:40,197 INFO - events.> | e3e66ce9-5 - Received
2025-11-20 12:56:40,198 INFO Received batch of length: 32
2025-11-20 12:56:40,198 INFO - events.> | e3e66ce9-5 - Processed
2025-11-20 12:56:40,199 INFO - events.> | dbc73b90-2 - Received
2025-11-20 12:56:40,200 INFO Received batch of length: 32
2025-11-20 12:56:40,200 INFO - events.> | dbc73b90-2 - Processed
2025-11-20 12:56:40,202 INFO - events.> | 95e998a1-5 - Received
2025-11-20 12:56:40,202 INFO Received batch of length: 32
2025-11-20 12:56:40,203 INFO - events.> | 95e998a1-5 - Processed
2025-11-20 12:56:40,204 INFO - events.> | 5080fe57-8 - Received
2025-11-20 12:56:40,204 INFO Received batch of length: 32
2025-11-20 12:56:40,205 INFO - events.> | 5080fe57-8 - Processed
2025-11-20 12:56:40,206 INFO - events.> | 7d49ed12-7 - Received
2025-11-20 12:56:40,206 INFO Received batch of length: 31
2025-11-20 12:56:40,207 INFO - events.> | 7d49ed12-7 - Processed
2025-11-20 12:56:40,288 INFO - events.> | 15083f42-3 - Received
2025-11-20 12:56:40,288 INFO Received batch of length: 1
2025-11-20 12:56:40,288 INFO - events.> | 15083f42-3 - Processed
2025-11-20 12:56:40,392 INFO - events.> | 187cdbea-8 - Received
2025-11-20 12:56:40,393 INFO Received batch of length: 1
2025-11-20 12:56:40,393 INFO - events.> | 187cdbea-8 - Processed
2025-11-20 12:56:40,497 INFO - events.> | 5ce90c16-6 - Received
2025-11-20 12:56:40,498 INFO Received batch of length: 1
2025-11-20 12:56:40,498 INFO - events.> | 5ce90c16-6 - Processed
2025-11-20 12:56:40,601 INFO - events.> | 13128569-8 - Received
2025-11-20 12:56:40,601 INFO Received batch of length: 1
2025-11-20 12:56:40,601 INFO - events.> | 13128569-8 - Processed
2025-11-20 12:56:40,716 INFO - events.> | cb847981-8 - Received
2025-11-20 12:56:40,717 INFO Received batch of length: 1
2025-11-20 12:56:40,718 INFO - events.> | cb847981-8 - Processed
2025-11-20 12:56:40,819 INFO - events.> | 7f225c7b-1 - Received
2025-11-20 12:56:40,820 INFO Received batch of length: 1
2025-11-20 12:56:40,820 INFO - events.> | 7f225c7b-1 - Processed
2025-11-20 12:56:40,934 INFO - events.> | 788aee3b-7 - Received
2025-11-20 12:56:40,936 INFO Received batch of length: 1
2025-11-20 12:56:40,937 INFO - events.> | 788aee3b-7 - Processed
2025-11-20 12:56:41,038 INFO - events.> | fcf16623-6 - Received
2025-11-20 12:56:41,039 INFO Received batch of length: 1
2025-11-20 12:56:41,039 INFO - events.> | fcf16623-6 - Processed
2025-11-20 12:56:41,154 INFO - events.> | ecb329bb-1 - Received
2025-11-20 12:56:41,155 INFO Received batch of length: 1
2025-11-20 12:56:41,155 INFO - events.> | ecb329bb-1 - Processed
2025-11-20 12:56:41,269 INFO - events.> | d7cf02e0-c - Received
2025-11-20 12:56:41,270 INFO Received batch of length: 1
2025-11-20 12:56:41,271 INFO - events.> | d7cf02e0-c - Processed
2025-11-20 12:56:41,374 INFO - events.> | 0f95ae53-2 - Received
2025-11-20 12:56:41,375 INFO Received batch of length: 1
2025-11-20 12:56:41,375 INFO - events.> | 0f95ae53-2 - Processed
2025-11-20 12:56:41,488 INFO - events.> | 9ebef903-2 - Received
2025-11-20 12:56:41,490 INFO Received batch of length: 1
2025-11-20 12:56:41,490 INFO - events.> | 9ebef903-2 - Processed
2025-11-20 12:56:41,598 INFO - events.> | c8262e9e-5 - Received
2025-11-20 12:56:41,599 INFO Received batch of length: 1
2025-11-20 12:56:41,599 INFO - events.> | c8262e9e-5 - Processed
2025-11-20 12:56:41,713 INFO - events.> | 57e467cb-2 - Received
2025-11-20 12:56:41,715 INFO Received batch of length: 1
2025-11-20 12:56:41,715 INFO - events.> | 57e467cb-2 - Processed
^C2025-11-20 12:56:41,861 INFO - events.> | 12deb8eb-a - Received
2025-11-20 12:56:41,862 INFO Received batch of length: 1
2025-11-20 12:56:41,862 INFO - events.> | 12deb8eb-a - Processed
INFO: Shutting down
2025-11-20 12:56:41,935 INFO - events.> | a41027f4-9 - Received
2025-11-20 12:56:41,935 INFO Received batch of length: 1
2025-11-20 12:56:41,935 INFO - events.> | a41027f4-9 - Processed
INFO: Waiting for application shutdown.
2025-11-20 12:56:42,022 INFO - | - callback for Task-7 is being executed...
INFO: Application shutdown complete.
INFO: Finished server process [248699]
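For comparison, the semantics the documentation seems to describe (deliver once batch_size messages have accumulated, or deliver whatever has arrived once timeout elapses) can be sketched in plain asyncio. This is an illustrative model only, not FastStream's or nats-py's implementation; `collect_batch` is a hypothetical name, and an `asyncio.Queue` stands in for the stream.

```python
import asyncio
import time


async def collect_batch(queue: asyncio.Queue, batch_size: int, timeout: float) -> list:
    """Accumulate messages until `batch_size` is reached or `timeout`
    seconds have elapsed, whichever comes first."""
    loop = asyncio.get_running_loop()
    deadline = loop.time() + timeout
    batch: list = []
    while len(batch) < batch_size:
        remaining = deadline - loop.time()
        if remaining <= 0:
            break
        try:
            batch.append(await asyncio.wait_for(queue.get(), remaining))
        except asyncio.TimeoutError:
            break  # deadline hit: flush the partial batch
    return batch


async def demo() -> tuple[int, int, float]:
    queue: asyncio.Queue = asyncio.Queue()

    # Enough messages queued: the full batch is returned without waiting.
    for i in range(5):
        queue.put_nowait(i)
    full = await collect_batch(queue, batch_size=5, timeout=1.0)

    # Only three messages trickle in: the call blocks until the timeout
    # and then returns the partial batch.
    loop = asyncio.get_running_loop()
    for delay in (0.02, 0.04, 0.06):
        loop.call_later(delay, queue.put_nowait, "msg")
    started = time.monotonic()
    partial = await collect_batch(queue, batch_size=5, timeout=0.3)
    waited = time.monotonic() - started

    return len(full), len(partial), waited


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

With a trickle of messages, this model returns one partial batch per timeout window instead of one batch per message, which is the behavior the log above does not show.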
What I have found is that manually sleeping for timeout seconds when given fewer than batch_size items results in the expected/desired behavior. That is, with:
@router.subscriber(
    # ...bunch of other settings here
    pull_sub=PullSub(
        batch_size=BATCH_SIZE,
        timeout=BATCH_TIMEOUT,
        batch=True,
    )
)
async def example_handler(batch: list[Any], nats_msg: NatsMessage) -> None:
    # Log messages for demonstration purposes:
    logging.info("Received batch of length: %s", len(batch))
    # Explicitly acknowledge so NATS knows it's been processed successfully prior to us entering sleep
    await nats_msg.ack()
    with contextlib.suppress(asyncio.CancelledError):
        if len(batch) < BATCH_SIZE:
            await asyncio.sleep(BATCH_TIMEOUT)
Using BATCH_SIZE=32 and BATCH_TIMEOUT=15, you will get the following behavior:
INFO: Started server process [280573]
INFO: Waiting for application startup.
2025-11-20 13:06:16,453 DEBUG Creating ClickHouse client instance with connection to localhost:8123, database: events
2025-11-20 13:06:16,470 INFO Starting broker
2025-11-20 13:06:16,472 INFO - events.> | - `ExampleHandler` waiting for messages
INFO: Application startup complete.
INFO: Uvicorn running on http://0.0.0.0:8081 (Press CTRL+C to quit)
2025-11-20 13:06:16,483 INFO - events.> | 67c67906-8 - Received
2025-11-20 13:06:16,488 INFO Received batch of length: 32
2025-11-20 13:06:16,488 INFO - events.> | 67c67906-8 - Processed
2025-11-20 13:06:16,490 INFO - events.> | a760d12c-8 - Received
2025-11-20 13:06:16,491 INFO Received batch of length: 32
2025-11-20 13:06:16,491 INFO - events.> | a760d12c-8 - Processed
2025-11-20 13:06:16,493 INFO - events.> | 59145042-b - Received
2025-11-20 13:06:16,493 INFO Received batch of length: 32
2025-11-20 13:06:16,494 INFO - events.> | 59145042-b - Processed
2025-11-20 13:06:16,495 INFO - events.> | 0e3bae4d-e - Received
2025-11-20 13:06:16,496 INFO Received batch of length: 32
2025-11-20 13:06:16,496 INFO - events.> | 0e3bae4d-e - Processed
2025-11-20 13:06:16,497 INFO - events.> | f0a10a8e-4 - Received
2025-11-20 13:06:16,498 INFO Received batch of length: 32
2025-11-20 13:06:16,499 INFO - events.> | f0a10a8e-4 - Processed
2025-11-20 13:06:16,500 INFO - events.> | c8e5b623-6 - Received
2025-11-20 13:06:16,501 INFO Received batch of length: 32
2025-11-20 13:06:16,501 INFO - events.> | c8e5b623-6 - Processed
--- >8 many similar log lines snipped ----
2025-11-20 13:06:18,582 INFO - events.> | 4c5761b9-7 - Received
2025-11-20 13:06:18,583 INFO Received batch of length: 32
2025-11-20 13:06:18,583 INFO - events.> | 4c5761b9-7 - Processed
2025-11-20 13:06:18,585 INFO - events.> | 4c59ebbe-c - Received
2025-11-20 13:06:18,585 INFO Received batch of length: 32
2025-11-20 13:06:18,586 INFO - events.> | 4c59ebbe-c - Processed
2025-11-20 13:06:18,587 INFO - events.> | 29426fbc-5 - Received
2025-11-20 13:06:18,588 INFO Received batch of length: 32
2025-11-20 13:06:18,588 INFO - events.> | 29426fbc-5 - Processed
2025-11-20 13:06:18,589 INFO - events.> | 26e4c591-c - Received
2025-11-20 13:06:18,590 INFO Received batch of length: 23
// Note the timestamps, 15 seconds pass between these two entries //
2025-11-20 13:06:33,590 INFO - events.> | 26e4c591-c - Processed
2025-11-20 13:06:33,597 INFO - events.> | b6cd0172-3 - Received
2025-11-20 13:06:33,600 INFO Received batch of length: 32
2025-11-20 13:06:33,600 INFO - events.> | b6cd0172-3 - Processed
2025-11-20 13:06:33,607 INFO - events.> | f8056f1f-0 - Received
2025-11-20 13:06:33,610 INFO Received batch of length: 32
2025-11-20 13:06:33,610 INFO - events.> | f8056f1f-0 - Processed
2025-11-20 13:06:33,616 INFO - events.> | f650b173-b - Received
2025-11-20 13:06:33,619 INFO Received batch of length: 32
2025-11-20 13:06:33,620 INFO - events.> | f650b173-b - Processed
2025-11-20 13:06:33,625 INFO - events.> | 37aa96be-6 - Received
2025-11-20 13:06:33,627 INFO Received batch of length: 32
2025-11-20 13:06:33,628 INFO - events.> | 37aa96be-6 - Processed
2025-11-20 13:06:33,630 INFO - events.> | df825216-e - Received
2025-11-20 13:06:33,631 INFO Received batch of length: 9
// Note the timestamps, 15 seconds pass between these two entries //
2025-11-20 13:06:48,631 INFO - events.> | df825216-e - Processed
2025-11-20 13:06:48,634 INFO - events.> | d044408d-d - Received
2025-11-20 13:06:48,635 INFO Received batch of length: 32
2025-11-20 13:06:48,635 INFO - events.> | d044408d-d - Processed
2025-11-20 13:06:48,638 INFO - events.> | 321e71c8-8 - Received
2025-11-20 13:06:48,638 INFO Received batch of length: 32
2025-11-20 13:06:48,639 INFO - events.> | 321e71c8-8 - Processed
2025-11-20 13:06:48,641 INFO - events.> | ddc42e11-6 - Received
2025-11-20 13:06:48,642 INFO Received batch of length: 32
2025-11-20 13:06:48,642 INFO - events.> | ddc42e11-6 - Processed
2025-11-20 13:06:48,643 INFO - events.> | 96cf4e5e-1 - Received
2025-11-20 13:06:48,644 INFO Received batch of length: 32
2025-11-20 13:06:48,644 INFO - events.> | 96cf4e5e-1 - Processed
2025-11-20 13:06:48,645 INFO - events.> | a3ecc9cf-9 - Received
2025-11-20 13:06:48,646 INFO Received batch of length: 8
// Note the timestamps, 15 seconds pass between these two entries //
2025-11-20 13:07:03,646 INFO - events.> | a3ecc9cf-9 - Processed
2025-11-20 13:07:03,652 INFO - events.> | 7beb29b8-c - Received
2025-11-20 13:07:03,655 INFO Received batch of length: 32
2025-11-20 13:07:03,656 INFO - events.> | 7beb29b8-c - Processed
2025-11-20 13:07:03,663 INFO - events.> | f89f5f4a-4 - Received
2025-11-20 13:07:03,665 INFO Received batch of length: 32
2025-11-20 13:07:03,666 INFO - events.> | f89f5f4a-4 - Processed
2025-11-20 13:07:03,672 INFO - events.> | 7d1a8276-c - Received
2025-11-20 13:07:03,675 INFO Received batch of length: 32
2025-11-20 13:07:03,676 INFO - events.> | 7d1a8276-c - Processed
2025-11-20 13:07:03,682 INFO - events.> | 1b9c4e3e-9 - Received
2025-11-20 13:07:03,684 INFO Received batch of length: 32
2025-11-20 13:07:03,685 INFO - events.> | 1b9c4e3e-9 - Processed
2025-11-20 13:07:03,687 INFO - events.> | 08a8d13e-3 - Received
2025-11-20 13:07:03,688 INFO Received batch of length: 8
^CINFO: Shutting down
INFO: Waiting for application shutdown.
2025-11-20 13:07:05,898 INFO - events.> | 08a8d13e-3 - Processed
2025-11-20 13:07:05,899 INFO - | - callback for Task-7 is being executed...
INFO: Application shutdown complete.
INFO: Finished server process [280573]
...which matches the semantics described for pull_sub as written at https://faststream.ag2.ai/0.6/nats/jetstream/pull/#faststream-details