sarama
The Sarama consumer group API is not idiot-proof enough.
It's particularly tricky to get a consumer group working correctly when a rebalance happens. The sample code was wrong; it was corrected in https://github.com/Shopify/sarama/pull/2240/commits/bddf37e2c3f7241c1398478c73205094507dac90, but it is still wrong:
https://github.com/Shopify/sarama/blob/d0a00ae5507996d3b7c06aa9e8d1fa58f8013c6b/examples/consumergroup/main.go#L184
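The remaining mistake is that the closing of the channel might be observed before the cancellation of the context. A receive on a closed channel returns the zero value (a nil `*sarama.ConsumerMessage`), so the loop must also detect the closed channel. The correct code should look like this: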
```go
for {
	select {
	case message, ok := <-claim.Messages():
		if !ok {
			return nil
		}
		log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
		session.MarkMessage(message, "")
	…
```
By the way, the example code at https://pkg.go.dev/github.com/Shopify/sarama?utm_source=godoc#example-ConsumerGroup probably deserves to be fixed as well.
I was bitten by the first version of the example (like many others, I presume). I had a slow consumer, so `claim.Messages()`, a buffered Go channel, was saturated with 256 elements when the rebalance happened. Naively iterating until the channel was closed then triggered various I/O timeouts.
The big issue is that the naive code works well until a rebalance occurs while consumers are slow, which typically happens in production and not during tests. I would say that the API does not respect the principle of least astonishment (POLA), and something should be done about it. Here are some suggestions:
Add a helper function/method
```go
var message *sarama.ConsumerMessage
for claim.NextMessage(&message) {
	log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
	session.MarkMessage(message, "")
}
```
I agree that the channel-based API has some elegance and fits the business logic nicely. The naive code is pretty (but wrong). But given how hard it is to get right, let's drop the channel-based API in favor of an iterator-based one.
Make the naive code correct
That can be done very easily with a helper like this (totally untested):
```go
import "context"

// forward copies everything from in to out until in is closed or ctx is cancelled.
func forward[A any](ctx context.Context, in <-chan A, out chan<- A) {
	defer close(out)
	for {
		// select picks at random among ready cases, so check cancellation
		// explicitly; at most one message racing with it can still go out.
		if ctx.Err() != nil {
			return
		}
		var pending A
		select {
		case a, ok := <-in:
			if !ok {
				return
			}
			pending = a
		case <-ctx.Done():
			return
		}
		select {
		case out <- pending:
		case <-ctx.Done():
			return
		}
	}
}

// Unbuffered returns a new unbuffered read channel with the same content as `in`.
// It is closed as soon as the forwarding goroutine observes that ctx is cancelled.
func Unbuffered[A any](ctx context.Context, in <-chan A) <-chan A {
	ret := make(chan A) // unbuffered
	go forward(ctx, in, ret)
	return ret
}
```
`claim.Messages()` should return such an unbuffered channel, and all the naive consumer code on earth would suddenly become correct.
The runtime cost is one additional goroutine per claim and no per-message allocation. I doubt this has any measurable performance impact.
I don't see how it could break existing clients (clients that assume the channel is buffered and call `len`/`cap` on it look very dirty; maybe they deserve to be broken).
Warn in the docs
At the very least, you should warn people more clearly than the current docs do. I had read the docs at https://github.com/Shopify/sarama/blob/d0a00ae5507996d3b7c06aa9e8d1fa58f8013c6b/consumer_group.go#L46 and at https://github.com/Shopify/sarama/blob/d0a00ae5507996d3b7c06aa9e8d1fa58f8013c6b/consumer_group.go#L987, but I misinterpreted them.
Both places should warn that the channel is buffered and that the handler must watch for the cancellation of the context to exit early.
Thanks! Probably just saved me an hour or two.
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.