I keep getting MessageSizeTooLargeError; the error message reports a size much bigger than the actual message given to the producer.
I'm using Avro serialization, and nevertheless I'm receiving errors like "The message is 1699136 bytes when serialized which is larger than the maximum request size you have configured with the max_request_size configuration", even though the original message size was around 800 KB, and after serialization it should still be under 1 MB (the default max_request_size). Does the producer try to combine several messages into a batch and exceed max_request_size?
How do you serialize with Avro? Avro is a format where a schema is required for the producer to serialize and for the consumer to deserialize; otherwise it's just gibberish bytes. The usual strategy with Kafka is to store the schema in some sort of central registry, then put a reference to the schema used to produce the message in a Kafka header. This is what Confluent does with their Schema Registry.
As far as I know, aiokafka doesn't provide anything Avro-related, so the serializer/deserializer must come from your own implementation. Depending on what you are doing, it might be that your serialized message contains both the schema and the data.
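For illustration, here's a minimal sketch, assuming the fastavro package, of why that matters for size: an Avro Object Container File embeds the full schema in every message, while the registry-style wire format only prepends a 5-byte reference (the schema id 42 below is hypothetical).

```python
import io
import struct

import fastavro

schema = fastavro.parse_schema({
    "type": "record",
    "name": "Event",
    "fields": [{"name": "payload", "type": "string"}],
})
record = {"payload": "x" * 100}

# Container-file encoding: the full schema travels with every message.
buf = io.BytesIO()
fastavro.writer(buf, schema, [record])
print("container file:", len(buf.getvalue()), "bytes")

# Registry-style encoding: 0x00 magic byte + 4-byte schema id + record body.
buf = io.BytesIO()
buf.write(struct.pack(">bI", 0, 42))  # 42 is a hypothetical schema id
fastavro.schemaless_writer(buf, schema, record)
print("wire format:", len(buf.getvalue()), "bytes")
```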
@vmaurin I'm using the kafkit library for serialization and communication with the schema registry.
Maybe try to dump the message you serialized before passing it to aiokafka? Otherwise, as far as I can see, the size is checked per message: https://github.com/aio-libs/aiokafka/blob/master/aiokafka/producer/producer.py#L411 (even if messages might be batched afterwards).
The formula seems to be: overhead + len(key) + len(value).
Headers seem to be ignored.
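In other words, something like this rough sketch (not aiokafka's exact code; the overhead constant is an assumption here, aiokafka derives it from the record batch format):

```python
from typing import Optional

def estimated_message_size(key: Optional[bytes], value: Optional[bytes],
                           record_overhead: int = 70) -> int:
    # Mirrors the per-message check: overhead + len(key) + len(value).
    # Headers are not counted, and compression has not been applied yet.
    size = record_overhead
    if key is not None:
        size += len(key)
    if value is not None:
        size += len(value)
    return size

MAX_REQUEST_SIZE = 1048576  # aiokafka's default max_request_size (1 MiB)
print(estimated_message_size(None, b"x" * 1_699_000) > MAX_REQUEST_SIZE)  # True
```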
@vmaurin Can I specify a max_request_size for the producer bigger than the broker's corresponding value if I have compression enabled?
You mean max.message.bytes on the broker/topic? It might work, as that limit seems to be applied after compression, but it is also applied to a whole batch of messages, while the check in aiokafka is just for a single message.
@vmaurin Yes, but I use send_and_wait() to send immediately, so I hope the batch will not exceed max.message.bytes either.
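For reference, a minimal sketch of that pattern (broker address and topic name are assumptions): each send_and_wait() awaits delivery before the next send, so batches rarely accumulate in a single task.

```python
import asyncio

from aiokafka import AIOKafkaProducer

async def main() -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",  # assumed broker address
        compression_type="gzip",
    )
    await producer.start()
    try:
        # Waits for the broker to acknowledge before returning.
        await producer.send_and_wait("my-topic", b"some large payload")
    finally:
        await producer.stop()

asyncio.run(main())
```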
@ant0nk @vmaurin did you figure this out? I'm having a similar issue:
- enabled zstd compression
- sending a 1.5 MB message
- aiokafka responds with "The message is ... bytes when serialized which is larger than the maximum request size ... 1048576"
- aiokafka seems to check the message size before compression, as the manually compressed message is only about 700 KB (see the sketch below)
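A sketch of that manual check, assuming the zstandard package: compress the serialized payload yourself to compare what the broker would receive against what aiokafka validates.

```python
import zstandard

serialized = b"..." * 500_000  # stand-in for the ~1.5 MB serialized message
compressed = zstandard.ZstdCompressor().compress(serialized)
print(f"before compression: {len(serialized)} bytes")
print(f"after compression:  {len(compressed)} bytes")
```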
Got around the problem by disabling aiokafka's message size validation.

aiokafka validates the message size before compression, rejecting otherwise valid messages. Setting max_request_size to a huge value disables aiokafka's validation. Validation is still performed by Kafka, so if the compressed message is too big, kafka.errors.MessageSizeTooLargeError is raised.
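A minimal sketch of that workaround (broker address is an assumption; 100 MB is just an arbitrarily large value):

```python
from aiokafka import AIOKafkaProducer

producer = AIOKafkaProducer(
    bootstrap_servers="localhost:9092",  # assumed broker address
    compression_type="zstd",
    # Effectively disables the client-side, pre-compression size check;
    # the broker's max.message.bytes (applied after compression) still holds.
    max_request_size=104857600,
)
```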
@Symas1 Your approach may not work if you send messages quickly enough: aiokafka combines multiple messages into batches, and raising this setting may lead to huge requests being rejected by the broker.
> […] This setting will limit the number of record batches the producer will send in a single request to avoid sending huge requests.
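A sketch of the failure mode this warns about (broker address, topic, and sizes are assumptions): send() calls issued without awaiting delivery can be packed into one batch, which an inflated max_request_size will happily ship to the broker.

```python
import asyncio

from aiokafka import AIOKafkaProducer

async def main() -> None:
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",  # assumed broker address
        max_request_size=104857600,
        linger_ms=50,  # gives the sender time to accumulate a batch
    )
    await producer.start()
    try:
        # send() only enqueues; these may all land in one record batch
        # for the same partition and be rejected by the broker.
        futures = [await producer.send("my-topic", b"x" * 900_000)
                   for _ in range(20)]
        await asyncio.gather(*futures)
    finally:
        await producer.stop()

asyncio.run(main())
```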