faststream icon indicating copy to clipboard operation
faststream copied to clipboard

Bug: KafkaMessage annotation is not designed for broker.subscriber(batch=True)

Open dolfinus opened this issue 10 months ago • 0 comments

Describe the bug How KafkaMessage annotation should be used with batch=True?

  • using batch=True with messages: list[KafkaMessage] fails.
  • using batch=True with message: KafkaMessage got body=[b"...", ...], but expected body: bytes.

How to reproduce, which can be accessed like this:

from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaMessage

broker = KafkaBroker()
app = FastStream(broker=broker)

@broker.subscriber("topic")
def my_subscriber_single(message: KafkaMessage):
  assert isinstance(message.body, bytes), repr(message.body)

@broker.subscriber("topic", batch=True)
def my_subscriber_batch_list(messages: list[KafkaMessage]):
  assert isinstance(messages, list), repr(messages)

@broker.subscriber("topic", batch=True)
def my_subscriber_batch_one(message: KafkaMessage):
  assert isinstance(message.body, bytes), repr(message.body)

Expected behavior

my_subscriber_batch_list receives list of KafkaMessage. my_subscriber_batch_one cannot be created or used as batch=True expects list[...], and type annotation does not match it.

Observed behavior

my_subscriber_batch_list fails with:

pydantic.errors.PydanticUserError: `my_subscriber_batch_list` is not fully defined; you should define `AnyDict`, then call `my_subscriber_batch_list.model_rebuild()`

my_subscriber_batch_one fails with:

AssertionError: [b"...", b"..."]

Screenshots

Environment

Running FastStream 0.5.35 with CPython 3.12.2 on Linux

Additional context

dolfinus avatar Mar 10 '25 14:03 dolfinus