Rebalancing issues with slow consumers
Versions
| Sarama | Kafka | Go |
|---|---|---|
| a20d26723487482b4f1af8b8537657a5ba72ae37 (1.25.0) | * | 1.12 |
Problem Description
We've identified three issues that affect rebalancing when the consumer takes tens of seconds or minutes to process an event.
1: Consumers must deplete Messages() before they can be shut down
Sarama asks for the following:
// Please note, that once a rebalance is triggered, sessions must be completed within
// Config.Consumer.Group.Rebalance.Timeout. This means that ConsumeClaim() functions must exit
// as quickly as possible to allow time for Cleanup() and the final offset commit. If the timeout
// is exceeded, the consumer will be removed from the group by Kafka, which will cause offset
// commit failures.
This is a reasonable request, since Sarama waits for all ConsumeClaim()s to return before re-establishing a session and joining the new generation of the consumer group. However, it communicates this by close()ing the buffered Messages channel - which is not something the consumer can act upon immediately, and worse, not without first reading all the messages remaining in the channel.
If it takes 3 minutes to handle a single message (which is a viable scenario), consumers have to set their ChannelBufferSize to 1 and their Rebalance.Timeout to 6 minutes plus some slack (since at worst the consumer has to finish the message it is currently handling plus the one waiting in the channel).
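As a concrete illustration of that arithmetic, a minimal sketch of the workaround configuration (the 3-minute figure is taken from the example above):
```go
// Workaround sizing for ~3 minutes per message: one message in flight plus one
// already buffered in Messages() means up to ~6 minutes before ConsumeClaim can
// return, so the rebalance timeout must cover that.
conf := sarama.NewConfig()
conf.ChannelBufferSize = 1                              // at most one message buffered per Messages() channel
conf.Consumer.Group.Rebalance.Timeout = 7 * time.Minute // 2 x 3 min + slack
```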
Proposed solution
Add:
```go
// StopConsuming forces an immediate stop of consumption, in addition to closing the Messages() channel
StopConsuming() <-chan struct{}
```
to ConsumerGroupClaim. A consumer can then run its message handling in a background goroutine and wait for either the handling to complete or StopConsuming to fire. It can then decide whether to terminate processing immediately, wait a grace period, or finish processing the current event - without having to deplete the Messages channel just to learn that it should exit.
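A minimal sketch of the intended usage, assuming the proposed StopConsuming() existed on ConsumerGroupClaim (process() is a hypothetical handler function):
```go
// Sketch only: StopConsuming() is the proposed addition, not an existing Sarama API.
func (h *handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		done := make(chan error, 1)
		go func(m *sarama.ConsumerMessage) { done <- process(m) }(msg) // process() is hypothetical
		select {
		case err := <-done:
			if err != nil {
				return err
			}
			sess.MarkMessage(msg, "")
		case <-claim.StopConsuming():
			// Rebalance in progress: abort (or wait a grace period) without
			// having to drain the rest of the Messages channel first.
			return nil
		}
	}
	return nil
}
```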
2: The first few fetches from Kafka may only fetch data from one or two partitions, starving the rest for a very long time (depending on message size / processing time)
Once a member joins the consumer group and receives its partitions, they are fed into the "subscription manager" from different goroutines. The subscription manager then performs batching and executes a fetch for all the partitions. However, the batching logic in subscriptionManager appears to be faulty: it seems to assume that case order prioritizes which case is handled when all of them are ready, which is not how select works according to the Go spec (https://golang.org/ref/spec#Select_statements):
If one or more of the communications can proceed, a single one that can proceed is chosen via a uniform pseudo-random selection. Otherwise, if there is a default case, that case is chosen. If there is no default case, the "select" statement blocks until at least one of the communications can proceed.
For example, if you receive 64 partitions, each is handled in its own goroutine which sends the partition information to the bc.input channel. After an iteration there is a race between case event, ok := <-bc.input, which adds to the batch, and case bc.newSubscriptions <- buffer, which triggers an immediate fetch of only the 1 or 2 partitions that made it into the batch.
This issue only really affects slow consumers with short messages. If the race is lost with only 1 partition in the batch (even though 63 more partitions have been claimed but didn't make it in), the fetch will ask for 1MB (by default) of messages from that single partition. If the messages are only a few bytes long and processing time is minutes, you will not perform another fetch for hours.
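The uniform pseudo-random selection is easy to demonstrate in isolation; a standalone sketch, unrelated to Sarama's types, showing that listing a case first gives it no priority:
```go
package main

import "fmt"

func main() {
	a := make(chan int, 1)
	b := make(chan int, 1)
	counts := map[string]int{}
	for i := 0; i < 1000; i++ {
		a <- 1
		b <- 1
		// Both cases are ready, so the runtime picks one uniformly at random.
		select {
		case <-a:
			counts["a"]++
			<-b // drain the other channel for the next iteration
		case <-b:
			counts["b"]++
			<-a
		}
	}
	fmt.Println(counts) // roughly 50/50, e.g. map[a:507 b:493]
}
```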
Proposed solution
Replace the subscriptionManager logic with a delay-based one, as done here:
https://github.com/nuclio/nuclio/blob/1.1.x/vendor/github.com/Shopify/sarama/consumer.go#L741
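A simplified sketch of that idea - accumulate newly claimed partitions for a short window before triggering a fetch, instead of racing the send against further receives. The channel and type names mirror the ones mentioned above; the window length and overall structure are illustrative, not the linked code verbatim:
```go
func (bc *brokerConsumer) subscriptionManager() {
	const batchWindow = 250 * time.Millisecond // illustrative; tune as needed
	for {
		pc, ok := <-bc.input
		if !ok {
			close(bc.newSubscriptions)
			return
		}
		batch := []*partitionConsumer{pc}
		// Keep accumulating until the window expires, so all partitions claimed
		// in the same rebalance end up in the same fetch.
		deadline := time.After(batchWindow)
	drain:
		for {
			select {
			case pc, ok := <-bc.input:
				if !ok {
					break drain
				}
				batch = append(batch, pc)
			case <-deadline:
				break drain
			}
		}
		bc.newSubscriptions <- batch
	}
}
```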
3: Heartbeat timer is stopped once rebalance is detected, causing session to time out if tear down is long
The heartbeat loop exits immediately when a rebalance error is returned: https://github.com/Shopify/sarama/blob/master/consumer_group.go#L760
If the consumer takes a long time to tear down, the member will be ejected from the group due to session timeout.
Proposed solution
Stop sending heartbeats only AFTER the session has completed tearing down (I believe this was the original intent).
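A rough sketch of the intended shape - the names loosely mirror Sarama's heartbeat loop but should be read as placeholders rather than the actual internals:
```go
// On ErrRebalanceInProgress we trigger teardown but keep heartbeating, and only
// exit once the session signals that teardown has finished, so the member is not
// ejected for missing heartbeats while ConsumeClaim handlers and Cleanup run.
func (s *consumerGroupSession) heartbeatLoop() {
	defer close(s.hbDead)
	ticker := time.NewTicker(s.parent.config.Consumer.Group.Heartbeat.Interval)
	defer ticker.Stop()
	for {
		select {
		case <-ticker.C:
		case <-s.hbDying: // closed only after Cleanup and the final offset commit
			return
		}
		resp, err := s.heartbeat() // placeholder for the Heartbeat request to the coordinator
		if err != nil {
			continue // transient error: retry on the next tick
		}
		switch resp.Err {
		case ErrNoError:
			// keep going
		case ErrRebalanceInProgress:
			s.cancel() // start tearing the session down, but do NOT stop heartbeating yet
		default:
			s.parent.handleError(resp.Err, "", -1)
			return
		}
	}
}
```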
Summary
We can contribute fixes to the issues above if the solutions described here are agreed upon (we're currently running them through tests).
Regarding 1, we've encountered a similar problem in a fairly common use case: a consumer does some processing, then writes the results to a remote DB (AWS DynamoDB).
While most of the consumed items should be processed really fast, it is fair to assume that some of them will fail due to throttling or network issues on the remote DB side. In such a case, handling those failures with an exponential backoff retry is not possible, as it might take more than Rebalance.Timeout to achieve a successful write.
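A sketch of what such a retry could look like if it had a cancellation signal to honour (writeItem and the timings are placeholders; the signal could be the session context or the proposed StopConsuming() channel):
```go
// Exponential backoff for a remote write (e.g. DynamoDB) that aborts as soon as
// ctx is cancelled, so a rebalance doesn't have to wait out the full retry budget.
func writeWithBackoff(ctx context.Context, writeItem func(context.Context) error) error {
	backoff := 100 * time.Millisecond
	const maxBackoff = 30 * time.Second
	for {
		if err := writeItem(ctx); err == nil {
			return nil
		}
		select {
		case <-time.After(backoff):
			if backoff *= 2; backoff > maxBackoff {
				backoff = maxBackoff
			}
		case <-ctx.Done():
			return ctx.Err() // rebalance / shutdown: stop retrying immediately
		}
	}
}
```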
I've come across this exact same problem. I'm building a consumer that writes records from a topic to a database, so the order in which messages are processed is really important. Since it depends on an external service, there is a function that retries failures with an exponential backoff.
But I was able to create a scenario where, because of timeouts, the ordering guarantees were lost and the final state ended up inconsistent:
Scenario:
- The topic has two partitions [0,1], with the messages [0a, 0b, 0c] and [1a, 1b, 1c], respectively.
- Worker W1 starts consuming, gets both partitions assigned, but for network reasons processing takes longer than Session.Timeout
- Worker W2 joins the group
- Worker W1 receives ErrRebalanceInProgress, heartbeat stops
- After waiting Session.Timeout, the broker assumes W1 is dead and removes it from the group
- W2 receives both partitions and is able to process all the messages quickly
- Database current state: [0 -> 0c]; [1 -> 1c] (desired final state)
- W1 recovers and writes old data to the database
- Database current state: [0 -> 0a]; [1 -> 1a] (invalid state)
W1 eventually stops, since it lost its partitions, but because the heartbeat stopped, W2 processed the same data as W1, which led to an inconsistent state. If the heartbeat hadn't stopped, W2 wouldn't have been able to join until Rebalance.Timeout, which would have avoided this problem.
Of course, the first issue (consumers must deplete Messages() before they can be shut down) is still problematic. Since there is no way of knowing when the consumer lost its partitions, it is impossible to know when it should stop trying to insert data into the database.
If anyone is in a dire state, we maintain a fork containing a branch with fixes to all the above here: https://github.com/iguazio/sarama/tree/slow-consumer (it's synced to 1.25.0 - fix commit is https://github.com/iguazio/sarama/commit/c85539a23b90465c6f4008bb0cbf5acb7ce0b1b5). It has been running in production for a while.
As the original issue states, we're still waiting for a dev to comment/approve before we invest the effort in a PR.
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the master branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
We've come across this exact same problem in sarama@1.27.0, and your fork (iguazio/sarama@1-27-0-slow-consumer), @pavius, helps solve the problem for now.
The <-claim.StopConsuming() read really helps stop processing in the consumer handler's ConsumeClaim method.
In our scenario, messages take about 1 minute to be processed and committed. Without the StopConsuming chan we were getting many messages processed more than once after the rebalance caused by scaling the service from 1 instance to 10, for example.
The exact steps to reproduce our case were:
- we have a topic called foo with 10 partitions;
- start the service with replicas=1 and consumer group = foo;
- everything is processing ok;
- we scale the service to 10 replicas for the same consumer group = foo;
- the rebalance starts on the server;
- the first replica is still processing some messages;
- after committing and finishing the processing in ConsumeClaim (range over claim.Messages()), the consumer does not stop, because there is one goroutine for each partition;
- the new service replicas start to process and some messages are duplicated (sometimes other rebalances happen and increase the number of duplicated messages).

Now, with the @pavius fork, we are able to use the StopConsuming chan to process or stop early if a rebalance occurs.
Example using claim.StopConsuming chan:
```go
func (c *consumerHandler) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	fmt.Println("starting a consumeClaim for ", session.MemberID())
	defer fmt.Println("consumeClaim finish for ", session.MemberID())
	for {
		select {
		case msg, ok := <-claim.Messages():
			if !ok {
				return nil
			}
			wait := make(chan struct{}, 1)
			m := Message{
				session:     session,
				consumerMsg: msg,
				wait:        wait,
			}
			fmt.Println("received message", session.MemberID(), string(m.Key))
			select {
			case c.messages <- m: // we are processing outside, using a non buffered chan
				fmt.Println("processing...", session.MemberID(), string(m.Key))
				<-wait
				fmt.Println("finish msg", session.MemberID(), string(m.Key))
			case <-claim.StopConsuming():
				fmt.Println("stopping by stopConsuming - inside", session.MemberID(), string(m.Key))
				return nil
			}
			fmt.Println("end of process message", session.MemberID(), string(m.Key))
		case <-claim.StopConsuming():
			fmt.Println("stopping by stopConsuming - outside", session.MemberID())
			return nil
		}
	}
}
```
Note: since our messages take up to 1 minute to process, we need to increase some timeouts so everything works. I'll list them here to help anyone with similar issues:
```go
func ConfigsByProcessingTime(maxProcessingTime time.Duration) *sarama.Config {
	conf := sarama.NewConfig()
	conf.Version = sarama.V2_6_0_0
	conf.Net.MaxOpenRequests = 5
	conf.Consumer.Return.Errors = true
	conf.Consumer.Offsets.Initial = sarama.OffsetOldest
	const partitions = 10
	conf.ChannelBufferSize = 0
	conf.Consumer.MaxProcessingTime = maxProcessingTime * partitions
	conf.Consumer.Group.Rebalance.Timeout = (maxProcessingTime * partitions) + 5*time.Second
	conf.Net.ReadTimeout = (maxProcessingTime * partitions) + 5*time.Second
	conf.Net.WriteTimeout = (maxProcessingTime * partitions) + 5*time.Second
	conf.Net.DialTimeout = (maxProcessingTime * partitions) + 5*time.Second
	return conf
}
```
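For context, this is roughly how a config like that gets wired into a consumer group (a sketch; broker addresses, the group name, the topic and the empty consumerHandler literal are placeholders):
```go
func runConsumer(ctx context.Context) error {
	conf := ConfigsByProcessingTime(1 * time.Minute)
	group, err := sarama.NewConsumerGroup([]string{"localhost:9092"}, "foo", conf)
	if err != nil {
		return err
	}
	defer group.Close()
	handler := &consumerHandler{ /* ... */ }
	for {
		// Consume returns when a rebalance happens; loop to rejoin the group.
		if err := group.Consume(ctx, []string{"my-topic"}, handler); err != nil {
			return err
		}
		if ctx.Err() != nil {
			return ctx.Err()
		}
	}
}
```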
Hey guys, I just ran into the same problem and I thought you could check sess.Context().Done() to know if you should stop the consumer:
```go
func (h *consumerGroupHandler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	wait := make(chan error)
	for msg := range claim.Messages() {
		var m broker.Message
		if err := h.kopts.Codec.Unmarshal(msg.Value, &m); err != nil {
			continue
		}
		go func() {
			err := h.handler(&publication{
				m:    &m,
				t:    msg.Topic,
				km:   msg,
				cg:   h.cg,
				sess: sess,
			})
			select {
			case <-sess.Context().Done():
			case wait <- err:
			}
		}()
		select {
		case <-sess.Context().Done():
			// a bare break here would only exit the select, not the loop;
			// return to stop consuming once the session is over
			return nil
		case err := <-wait:
			if err == nil && h.subopts.AutoAck {
				sess.MarkMessage(msg, "")
			}
		}
	}
	return nil
}
```
https://github.com/Shopify/sarama/issues/1897#issuecomment-922495227
@pavius thanks for this excellent write up. For some reason I hadn't been aware of this issue until I stumbled upon it just now. I've been browsing over your changes via the old nuclio/nuclio vendor dir and your iguazio/sarama fork and I think they look reasonable. If you'd be willing to put together separate PRs for the three cases you identified that would be great. Particularly point 3. seems a quick and easy bugfix that we should get merged sooner rather than later, and your proposed change for 2. looks reasonable to me, although I'd like to exercise it with some unittesting and sustained load somewhere myself to be sure.
Is there any new update for issue 1?
Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.
The suggested changes were merged into Sarama. Closing this issue as resolved.