faststream
Bug: KafkaMessage annotation is not designed for broker.subscriber(batch=True)
Describe the bug
How should the KafkaMessage annotation be used with batch=True?
- Using batch=True with messages: list[KafkaMessage] fails.
- Using batch=True with message: KafkaMessage gets body=[b"...", ...], but body: bytes is expected.
How to reproduce
from faststream import FastStream
from faststream.kafka import KafkaBroker, KafkaMessage

broker = KafkaBroker()
app = FastStream(broker=broker)

@broker.subscriber("topic")
def my_subscriber_single(message: KafkaMessage):
    assert isinstance(message.body, bytes), repr(message.body)

@broker.subscriber("topic", batch=True)
def my_subscriber_batch_list(messages: list[KafkaMessage]):
    assert isinstance(messages, list), repr(messages)

@broker.subscriber("topic", batch=True)
def my_subscriber_batch_one(message: KafkaMessage):
    assert isinstance(message.body, bytes), repr(message.body)
Expected behavior
my_subscriber_batch_list receives a list of KafkaMessage objects.
my_subscriber_batch_one cannot be created or used, because batch=True expects list[...] and the type annotation does not match it.
Observed behavior
my_subscriber_batch_list fails with:
pydantic.errors.PydanticUserError: `my_subscriber_batch_list` is not fully defined; you should define `AnyDict`, then call `my_subscriber_batch_list.model_rebuild()`
my_subscriber_batch_one fails with:
AssertionError: [b"...", b"..."]
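Until the intended usage is clarified, a handler could normalize the body itself. This is only a minimal sketch based on the behavior observed above (body is bytes for a single message and a list of bytes when batch=True); iter_bodies is a hypothetical helper, not part of FastStream:

```python
from typing import Union


def iter_bodies(body: Union[bytes, list[bytes]]) -> list[bytes]:
    """Return payloads as a list, whether body is one payload or a batch.

    Assumption: with batch=True, message.body is a list of bytes,
    as seen in the AssertionError output above.
    """
    if isinstance(body, (bytes, bytearray)):
        # Single-message case: wrap the one payload in a list.
        return [bytes(body)]
    # Batch case: copy the payloads into a new list.
    return list(body)
```

Inside a batch=True subscriber annotated with message: KafkaMessage, this would be called as iter_bodies(message.body). It works around the mismatch but does not give access to per-message metadata.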
Environment
Running FastStream 0.5.35 with CPython 3.12.2 on Linux