AsyncProducer produces messages out of order when retries happen
Versions
| Sarama | Kafka | Go |
|---|---|---|
| >= v1.31.0 | v3.5.0 | 1.20.7 |
Configuration
What configuration values are you using for Sarama and Kafka?
```go
config.Version = sarama.V2_7_0_0
// Read successes and errors
config.Producer.Return.Successes = true
config.Producer.Return.Errors = true
// Wait for ack from all replicas
config.Producer.RequiredAcks = sarama.WaitForAll
// Reduce internal buffer size to simplify the reproduction
config.ChannelBufferSize = 10
// Allow only one open request at a time
config.Net.MaxOpenRequests = 1
// Retry almost indefinitely
config.Producer.Retry.Max = 1_000_000_000
// Reduce the max size, to flush more often and reproduce easily
config.Producer.MaxMessageBytes = 100
// We are only going to produce to one partition to simplify the reproduction
config.Producer.Partitioner = sarama.NewManualPartitioner
```
Logs & Reproduction Code
I have created a Gist with all the information required to reproduce (code, configuration, etc), including some example logs: https://gist.github.com/dethi/406fb5562030d8d0fa76db18d95bbbbe
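For context, here is an abbreviated sketch of the producer side of such a reproduction; the gist above remains the authoritative version. The broker address, message count, and the `github.com/IBM/sarama` import path are assumptions on my part:

```go
package main

import (
	"fmt"
	"log"

	"github.com/IBM/sarama" // older releases use github.com/Shopify/sarama
)

func main() {
	config := sarama.NewConfig()
	config.Version = sarama.V2_7_0_0
	config.Producer.Return.Successes = true
	config.Producer.Return.Errors = true
	config.Producer.RequiredAcks = sarama.WaitForAll
	config.ChannelBufferSize = 10
	config.Net.MaxOpenRequests = 1
	config.Producer.Retry.Max = 1_000_000_000
	config.Producer.MaxMessageBytes = 100
	config.Producer.Partitioner = sarama.NewManualPartitioner

	producer, err := sarama.NewAsyncProducer([]string{"localhost:9092"}, config)
	if err != nil {
		log.Fatal(err)
	}
	defer producer.Close()

	// Drain successes and errors so the producer never blocks on them.
	go func() {
		for range producer.Successes() {
		}
	}()
	go func() {
		for err := range producer.Errors() {
			log.Println("produce error:", err)
		}
	}()

	// Send sequentially numbered messages to a single partition; consuming that
	// partition after forcing a broker disconnect mid-run shows the reordering.
	for i := 0; i < 10_000; i++ {
		producer.Input() <- &sarama.ProducerMessage{
			Topic:     "sarama-outoforder-bug",
			Partition: 0,
			Value:     sarama.StringEncoder(fmt.Sprintf("%06d", i)),
		}
	}
}
```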
Problem Description
Expectation
When producing messages to the same partition, the AsyncProducer guarantees the ordering of the produced messages even when retries are required, as long as `config.Net.MaxOpenRequests = 1`.
Idempotency shouldn't matter here.
Reality
Up until v1.31.0, the expectation holds. But the behaviour changed when request pipelining was introduced to the AsyncProducer. Now, retries cause messages to be published out of order.
An easy way to see this is by enabling `config.Producer.Idempotent`. It will result in the AsyncProducer returning an error to the caller when retries happen (e.g. when the partition leader disappears because a broker disconnects):
```
kafka: Failed to produce message to topic sarama-outoforder-bug: kafka server: The broker received an out of order sequence number
```
When idempotency is not enabled, Sarama will publish the messages successfully, but out of order.
Code Analysis / Flow of produced messages
(follow the links as you read each paragraph)
- Code link: We think the issue is coming from here. It used to be that we would not send another message/batch to the Kafka broker before we got back the answer from that broker and forwarded that answer to the goroutine that processes it. One of the key points here is that the goroutine that writes into the `bridge` channel is also the goroutine that reads from the responses channel, as we can see in `func (bp *brokerProducer) waitForSpace` or `func (bp *brokerProducer) run`, which means that we wouldn't send another message to Kafka before we had received AND processed the answer for the previous message.
- Code link: Now we use the new `AsyncProduce` function to send messages to Kafka brokers. The key point here is that it used to be that we could not call `AsyncProduce` (or `Produce`, to be exact) before the previous call to `AsyncProduce`/`Produce` returned (which would also give us the response to the request). Now the responses are processed asynchronously and sent back to us via the `sendResponse` callback. We will see in part 3 that once a message is sent to the broker and the goroutine that processes the response is scheduled, `AsyncProduce` returns and another call will be made even though we potentially have not received the answer to the previous request yet.
- Code link: `broker.AsyncProduce()` uses `sendInternal` to send a batch/message to a broker. `b.responses` is a buffered channel that is used to control how many "in flight" requests there currently are to that broker, so that a goroutine can't call `AsyncProduce` before we were able to schedule a run of the goroutine that will process the response for that request (see `func (b *Broker) responseReceiver()`). One of the issues here is that if we set `b.conf.Net.MaxOpenRequests = 1`, so that we force `b.responses` to have a buffer of size 0, then it seems we can still have `b.conf.Net.MaxOpenRequests + 1` in-flight requests to the Kafka broker. Assuming `MaxOpenRequests` is equal to one, the 2nd call to `AsyncProduce` will block on `b.responses <- promise`, but there will still be 2 in-flight requests (see the sketch after this list).
- Code link: Once a response is received by the broker from Kafka, it gets forwarded to the `pending` channel, which we read from the same goroutine. The responses are then forwarded back to the `func (bp *brokerProducer) run()` goroutine, which is also the one sending messages to the `bridge` channel.
- Code link: Once a response is received in the `func (bp *brokerProducer) run()` goroutine, we end up calling this `retryMessages` function if, for instance, the broker we tried to send the batch/message to crashed or was unavailable for other reasons. In that case we retry the messages that triggered the error by calling `retryMessages` on line 1146 for `pSet.msgs`, which sends all the elements of the `pSet.msgs` batch back into the input channel of the asyncProducer manager. We then also send back all messages that were buffered in the brokerProducer (these messages would have been batched and sent to Kafka at some point via `AsyncProduce`), after the elements of `pSet.msgs`, because they were produced after the messages in `pSet.msgs`. The problem here is that this retry sequence does not take into account the messages that are currently in flight and for which we did not receive a response yet. Because of that, the ordering of the messages changes whenever we have to retry messages that were still in flight when the first error triggered the retry.
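To make the third bullet concrete, here is a deliberately simplified Go model (not Sarama's actual code; every name in it is illustrative) of why a `b.responses` buffer of size `MaxOpenRequests - 1` still allows `MaxOpenRequests + 1` requests on the wire: the request is written to the socket before its promise is enqueued, so the second call only blocks after its own request has already left.

```go
package main

import (
	"fmt"
	"sync/atomic"
	"time"
)

func main() {
	var inFlight atomic.Int32

	// Mirrors b.responses with Net.MaxOpenRequests = 1: a buffer of size
	// MaxOpenRequests-1, i.e. an unbuffered channel.
	promises := make(chan int)

	// Stand-in for the responseReceiver goroutine: it takes a promise and
	// only then waits for (and processes) the broker's response.
	go func() {
		for id := range promises {
			time.Sleep(100 * time.Millisecond) // simulated broker round trip
			inFlight.Add(-1)
			fmt.Printf("response for request %d processed\n", id)
		}
	}()

	// Stand-in for AsyncProduce/sendInternal: the request is written to the
	// socket *before* its promise is handed over, so it is already in flight
	// by the time we may block on the promises channel.
	asyncProduce := func(id int) {
		inFlight.Add(1)
		fmt.Printf("request %d on the wire, in flight = %d\n", id, inFlight.Load())
		promises <- id // blocks here, not before the request went out
	}

	asyncProduce(1) // returns as soon as the receiver picks up promise 1
	asyncProduce(2) // request 2 is on the wire while request 1 is still unanswered
	time.Sleep(300 * time.Millisecond)
}
```

Running it prints `request 2 on the wire, in flight = 2` before the response to request 1 has been processed, which is the `MaxOpenRequests + 1` situation described above.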
Surprisingly, part of the issue was already noted in the latest comment on the same PR:
> This means setting Net.MaxOpenRequests to 1 and using the AsyncProducer might result in up to 2 in-flight requests, leading to possible ordering issues or maybe an error with idempotency turned on against Kafka 0.11.0.x.
Thanks to @T30rix for helping me with the debugging and writing the detailed flow.
@dethi As far as I know, if you set `max.in.flight.requests.per.connection` to 1 and don't enable the idempotent producer (`enable.idempotence=false`), the Kafka producer will only have one unacknowledged request in flight at a time, which effectively reduces the likelihood of message reordering during produce retries.
In this scenario, the producer will wait for an acknowledgment (ACK) from the broker before sending the next message. This means that if a message fails to be sent (due to a transient error or a network issue), the producer will retry sending that message before sending any new messages. This mechanism helps maintain a closer order of messages as they were originally sent by the producer.
However, please note that there is still no strict guarantee of message order in this case, as certain scenarios like broker failures or network partitions can introduce potential complexities. Messages might still be reordered if a network issue causes a message to be delayed and another message is sent before the delayed message is successfully delivered.
So in summary, while setting `max.in.flight.requests.per.connection` to 1 and not enabling idempotence can help reduce the chances of message reordering, it's not a foolproof guarantee, and enabling idempotence is the recommended approach if strict message order is a critical requirement for your use case.
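For anyone who wants to follow that recommendation with Sarama, a minimal sketch of the idempotent-producer settings is below. The protocol version and retry count are placeholders; to the best of my knowledge Sarama's config validation requires the settings shown to go together when `Producer.Idempotent` is enabled:

```go
// Sketch (not from the original reproduction) of enabling the idempotent
// producer in Sarama. Idempotence needs Kafka >= 0.11.
config := sarama.NewConfig()
config.Version = sarama.V2_7_0_0                 // any version >= V0_11_0_0
config.Producer.Idempotent = true                // broker enforces per-partition sequence order
config.Producer.RequiredAcks = sarama.WaitForAll // required when Idempotent is true
config.Producer.Retry.Max = 5                    // must be >= 1 when Idempotent is true
config.Net.MaxOpenRequests = 1                   // required when Idempotent is true
```

As described earlier in this issue, the current pipelining bug surfaces with these settings as "out of order sequence number" errors rather than as silent reordering.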
And I'm very curious why the previous versions of Sarama work well only with `config.Net.MaxOpenRequests = 1`.
Please CMIIW @dnwe
@napallday the Kafka documentation seems to indicate that if `max.in.flight.requests.per.connection` is set to 1 then there is no risk of message reordering. If `config.Net.MaxOpenRequests` is the Sarama equivalent of `max.in.flight.requests.per.connection`, then this behavior has been broken since #2094 was merged, because we can now have `config.Net.MaxOpenRequests + 1` requests in flight.
> And I'm very curious why the previous versions of Sarama work well only with `config.Net.MaxOpenRequests = 1`

This is explained in @slaunay's post here:
> I believe the brokerProducer goroutine was previously limited to a single in flight request, being synchronous (even though calling broker.Produce concurrently from multiple goroutines could already result in up to 2 in flight requests when Net.MaxOpenRequests = 1).
@T30rix I see. It seems apache/kafka and Confluent Kafka have different descriptions here.
In apache/kafka:
> Allowing retries while setting enable.idempotence to false and max.in.flight.requests.per.connection to greater than 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.

In Confluent Kafka:
> Allowing retries while setting enable.idempotence to false and max.in.flight.requests.per.connection to 1 will potentially change the ordering of records because if two batches are sent to a single partition, and the first fails and is retried but the second succeeds, then the records in the second batch may appear first.
So what's the conclusion here? Is this issue considered a bug or not?
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
Not stale
Thanks for finally marking this as a bug 👍
Is there anyone working on this bug?
Any movement on this? This is impacting us significantly.
It has been investigated on-and-off, using @dethi's sample as a functional producer test, but I don't yet have good news of a resolution to the issue.
Ok, thanks for the update @dnwe
Should be fixed by #2943 if someone can review