sarama
Possible Message Batching Bug?
Description
I'm seeing batched message submissions rejected by the Kafka broker because the message is too large. Initially I thought this was due to the post-compression message size being too large; however, logs added to the producer's Errors() feedback channel (as shown below) show messages as small as 900B being rejected.
Logs
2024/02/11 05:32:57 Message Too Large(497922): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(2394856): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(919): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(188553): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(3805): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(1742883): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(6544): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(1216): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(1164809): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(6546): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(903): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(4911): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 05:32:57 Message Too Large(4911): kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
Logging Code
case kafkaErr := <-asyncProducer.Errors():
	switch unwrappedErr := kafkaErr.Err.(type) {
	case sarama.KError:
		switch unwrappedErr {
		case sarama.ErrMessageSizeTooLarge:
			// Log the size of the rejected message's value alongside the error.
			log.Printf("Message Too Large(%d) %s\n", kafkaErr.Msg.Value.Length(), kafkaErr.Error())
			metrics.KafkaErrorOtel.Add(ctx, 1, attribute.String("error", "message_size_too_large"))
		}
	}
Looking closer, these errors are bursty and occur all at once. I know that one Kafka client takes batch errors, breaks the batch down, and resubmits it, but looking at the code in this repo, that is not the case here. Looking through the code and docs, I saw that most of the configs are described as 'best-effort', with the docs stating 'By default, messages are sent as fast as possible, and all messages received while the current batch is in-flight are placed into the subsequent batch.' Is it possible that, while an existing publish is in flight, the buffers fill quickly past the limit, so the next batch ends up over the size limit?
Versions
Note: I'm unable to reproduce this on my laptop. I can only reproduce it in a production environment where the throughput is 350GB/min, with 13M messages consumed/min and 13M messages published/min. Adding more hosts to the cluster does alleviate the issue, even though CPU and memory usage are very low.
Sarama | Kafka | Go |
---|---|---|
v1.38.1 | TODO | 1.21.3 |
Configuration
// This is done due to the bug checking pre/post compressed incoming message sizes.
saramaConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)
saramaConfig.Producer.Compression = sarama.CompressionGZIP
saramaConfig.Producer.Return.Successes = true
Logs
Additional Context
@road-cycling thanks for reporting this. Would it be possible for you to upgrade to a newer version of Sarama? At least v1.41.1, but ideally the latest, v1.42.2. In particular, https://github.com/IBM/sarama/pull/2628 disambiguated between messages failing the local client-side Producer.MaxMessageBytes check and those genuinely rejected by the remote cluster, which would help narrow down where you're seeing this occur.
Using IBM:sarama:v1.42.2
Logs
2024/02/11 22:34:04 Message Too Large(909) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1212828) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(804014) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1733361) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1739543) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1000719) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(465610) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1750667) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1206) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(1171) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:34:04 Message Too Large(6544) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(1790812) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(6544) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(6745518) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(6546) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
2024/02/11 22:35:56 Message Too Large(721256) kafka: Failed to produce message to topic metrics-debug: kafka server: Message was too large, server rejected it to avoid allocation error
Same Switch Case
case sarama.ErrMessageSizeTooLarge:
log.Printf("Message Too Large(%d) %s\n", kafkaErr.Msg.Value.Length(), kafkaErr.Error())
metrics.KafkaErrorOtel.Add(ctx, 1, attribute.String("error", "message_size_too_large"))
Note:
Adding the config saramaConfig.Producer.Flush.MaxMessages = 1 fixes this issue and leads to 0 "Message Too Large" errors, but the Sarama log is flooded with the lines below, which is in no way efficient.
[sarama] 2024/02/11 22:58:16.075040 producer/broker/97623 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.115777 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.124684 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.131789 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.133214 producer/broker/97623 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.136310 producer/broker/116321 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.136723 producer/broker/123253 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.136814 producer/broker/122428 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.141094 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.141183 producer/broker/123160 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.149647 producer/broker/112508 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.161138 producer/broker/100837 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.161767 producer/broker/112120 maximum request accumulated, waiting for space
[sarama] 2024/02/11 22:58:16.165873 producer/broker/123448 maximum request accumulated, waiting for space
I was looking at this today. We were also playing with producer vs broker compression and testing sending batches of data, some all zeroes, some random (which won't compress).
We quickly found that we (like @road-cycling above) had to set:
saramaConfig.Producer.MaxMessageBytes = int(sarama.MaxRequestSize)
(presumably following the same advice we found in https://github.com/IBM/sarama/issues/2142#issuecomment-1124331302)
And while that happily fixed our ability to send large compressible messages (e.g. 2MB of all zeroes), it then broke our ability to send other messages, such as 400KB of random data, or 400KB of zeroes to a topic with no producer compression.
For example, when we tried to send 10 x 400KB messages in quick succession, the first message was sent in a batch on its own and succeeded, but the following 9 messages were batched together (making a total size of approx 3.2MB), which then resulted in the error: Message was too large, server rejected it to avoid allocation error
Output from our test app:
Compression: none Size: 1000000 Source: random Quantity: 10
0: success
1: kafka server: Message was too large, server rejected it to avoid allocation error
...
9: kafka server: Message was too large, server rejected it to avoid allocation error
And this is because an unintended side-effect of maxing out MaxMessageBytes is that it is used in 2 places, the most important actually being here:
https://github.com/IBM/sarama/blob/0ab2bb77aeca321f41a0953a8c6f52472607a59e/produce_set.go#L253-L255
which, as I understand it, controls the batching logic. With that set to MaxRequestSize, the batching logic doesn't know the overall batch is too big, and the server rejects it.
If the messages are sent by themselves (low traffic) then it works, but when batched up, it fails miserably.
So I do think we need to keep MaxMessageBytes set to a sane default, but instead we should remove the following check, or have a flag to disable it:
https://github.com/IBM/sarama/blob/0ab2bb77aeca321f41a0953a8c6f52472607a59e/async_producer.go#L453-L457
That would also help clear up #2851 and all the associated issues there, as we'd likely opt to ignore that check.
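To illustrate the batching effect described in this comment, here is a minimal stdlib-only sketch. It is a simplified greedy model, not Sarama's actual produce_set.go logic: `batchTotals` is a hypothetical helper, the 1,000,000-byte cap stands in for a "sane" MaxMessageBytes, and the 100 MiB cap is an assumed stand-in for MaxRequestSize.

```go
package main

import "fmt"

// batchTotals greedily packs message sizes into batches, closing the current
// batch whenever adding the next message would reach or exceed maxBatchBytes.
// It returns the total byte size of each batch. Hypothetical helper: a
// simplified model of a per-batch size cap, not Sarama's real implementation.
func batchTotals(sizes []int, maxBatchBytes int) []int {
	var totals []int
	current := 0
	for _, s := range sizes {
		if current > 0 && current+s >= maxBatchBytes {
			totals = append(totals, current)
			current = 0
		}
		current += s
	}
	if current > 0 {
		totals = append(totals, current)
	}
	return totals
}

func main() {
	// Nine 400,000-byte messages that queued up while an earlier
	// request was still in flight.
	pending := make([]int, 9)
	for i := range pending {
		pending[i] = 400_000
	}

	// Cap near a typical broker-side message limit (assumed value).
	fmt.Println("sane cap:  ", batchTotals(pending, 1_000_000))

	// Cap raised to an assumed 100 MiB request-size limit.
	fmt.Println("maxed cap: ", batchTotals(pending, 100*1024*1024))
}
```

Under the smaller cap the pending messages are split across several batches that each stay below the cap; with the cap maxed out they all land in a single batch, which is the situation where the broker can reject the whole batch even though each message is small on its own.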
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.