Kafka Consumer Offset not committed, only committed after service restarts
Discussed in https://github.com/BrighterCommand/Brighter/discussions/2263
Originally posted by honkuan86 September 9, 2022 Hi all,
We have noticed an issue when consuming messages from Kafka using Brighter, where the offsets are not committed until the service restarts.
I'm using the settings below:
new KafkaSubscription<UserSessionEvent>(
    new SubscriptionName("user.session.subscription"),
    channelName: new ChannelName("user.session.channel"),
    routingKey: new RoutingKey(messageBusConfig.SubscriptionTopics.UserSessionEvents),
    groupId: "task-service.group",
    timeoutInMilliseconds: 100,
    offsetDefault: AutoOffsetReset.Earliest,
    isAsync: true,
    commitBatchSize: 15,
    noOfPerformers: 3,
    sweepUncommittedOffsetsIntervalMs: 5000)
Note: The service is hosted in AWS EKS and consumes from an AWS MSK cluster.
The image below shows the logs after the service restarts. It shows that all uncommitted offsets are committed at the same time; after a while the committing slows down again, until a second restart, at which point all uncommitted offsets are committed again.
Hi @iancooper, do you know about this behavior? Or have I got the configuration wrong?
Hi @honkuan86. That is a little disturbing. The offsets should be committed when:
- You hit the batch size
- A timeout triggers a "no offsets committed, let's commit what we have"
- Partition changes
- Shutdown
So, a failure to commit them prior to shutdown is worrying.
You could try lowering the batch size to see what happens, i.e. lower it to about 5. (Lowering it to 1 may be counter-productive.)
A couple of things to try (individually) to see their impact:
- Batch Size: Lower this to 5 for a while and see if a reduced batch size triggers the commit earlier
- NoOfPerformers: Drop this to 1. I need to think about what increasing the noOfPerformers means for Brighter with Kafka; we would have to allocate each performer a different partition, and have them be part of the same consumer group (a sketch of these two tweaks follows)
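Purely as an illustration, not something I have tested against your environment, those two tweaks applied to the subscription you posted would look something like this:

new KafkaSubscription<UserSessionEvent>(
    new SubscriptionName("user.session.subscription"),
    channelName: new ChannelName("user.session.channel"),
    routingKey: new RoutingKey(messageBusConfig.SubscriptionTopics.UserSessionEvents),
    groupId: "task-service.group",
    timeoutInMilliseconds: 100,
    offsetDefault: AutoOffsetReset.Earliest,
    isAsync: true,
    commitBatchSize: 5,      // lowered from 15 to see if commits trigger sooner
    noOfPerformers: 1,       // dropped from 3 while we reason about partition allocation
    sweepUncommittedOffsetsIntervalMs: 5000)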
BTW
- Async: this will de-order your stream, as you cannot guarantee messages will be processed in order; unless you know you need the throughput, consider Sync
I may have time later to check our demo to make sure nothing has broken.
BTW, @honkuan86, if you are running under EKS I would always tend to set NoOfPerformers to 1 and increase the number of pods that I run, with multiple pods in the consumer group if you have multiple partitions. It is far easier to manage a pod than a thread - scale out over scale up - our scale-out option is mostly for VMs rather than for running in a container.
Two other things @honkuan86:
- We made an update in 9.1.2 to the way that we initialize logging. Can you make sure that you are on 9.1.2 and turn your logs up to Debug? That should mean we know for sure what is happening.
- Use KafkaCat to check your offsets. Just in case the output is fooling us.
Hi @iancooper, some updates from me after a round of testing. This behavior only seems to happen when a huge number of messages is involved. We are currently running a load test that sends a few hundred thousand messages to a topic in Kafka, and we want to test the consumers.
So I've tried the settings below:
new KafkaSubscription<UserSessionEvent>(
    new SubscriptionName("user.session.subscription"),
    channelName: new ChannelName("user.session.channel"),
    routingKey: new RoutingKey(messageBusConfig.SubscriptionTopics.UserSessionEvents),
    groupId: "task-service.group",
    timeoutInMilliseconds: 100,
    offsetDefault: AutoOffsetReset.Earliest,
    isAsync: true,
    commitBatchSize: 5,
    noOfPerformers: 1,
    sweepUncommittedOffsetsIntervalMs: 5000)
As you can see, I've set noOfPerformers to 1 and commitBatchSize to 5. It still behaves the same (offsets not committed). I then tried to add more pods; I tested with 7 pods and the result is still the same.
From what I've observed, the offsets will be committed when a service restarts or when a new pod/consumer is added to the consumer group, which triggers a rebalance of the consumers.
I monitor the Kafka messages using kafka-ui and the results show as below:
In the image, the task-service.group consumers are built using Brighter, whereas the other 2 services are built using Java. The 3 consumer groups are consuming from the same topic, however there is a huge gap in the number of messages behind.
Note: I'm using v9.1.19 of Brighter.
Hi @honkuan86 -
Right. So it may be worth making sure you have reviewed this: https://brightercommand.gitbook.io/paramore-brighter-documentation/brighter-configuration/kafkaconfiguration#offset-management
It is possible at high volume that because we only run one commit thread at a time, you will fall behind on exceptionally high throughput streams. For those, you need to increase the batch size.
Think of it this way: in the time taken to commit a batch, more messages arrive, and their offsets will be uncommitted. As we serialize commits, if each batch does not commit enough, more offsets can accumulate whilst you are committing this batch than you will commit in the next batch. So if it takes 100ms to commit, and you commit 10, but 100 arrive in that same 100ms, you won't keep pace.
To fix that, you may find you need to set a much higher batch size. Try upping the batch size so that it can keep pace with the rate of arrival. It is a bit of a trade-off - a longer commit for a larger batch - but it clears more of the backlog.
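To make that arithmetic concrete, here is a minimal sketch using the illustrative numbers above (100ms per commit, a batch of 10, 100 arrivals per 100ms); these are hypothetical figures, not measurements from your system:

using System;

double commitDurationMs = 100;    // time taken to commit one batch
int commitBatchSize = 10;         // offsets committed per batch
double arrivalsPerSecond = 1000;  // 100 messages per 100ms

double arrivalsDuringCommit = arrivalsPerSecond * (commitDurationMs / 1000.0); // 100
double backlogGrowthPerBatch = arrivalsDuringCommit - commitBatchSize;         // +90 uncommitted offsets per cycle

Console.WriteLine($"Backlog grows by ~{backlogGrowthPerBatch} offsets per commit cycle.");
Console.WriteLine($"To keep pace, the batch size needs to be at least {arrivalsDuringCommit}.");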
It's possible that in a bad case the commit thread is never being scheduled, because the OS doesn't schedule its thread due to the constant pressure on the main thread. We'd have to ponder that, if that were the case.
I may have some time in the next couple of days to run a load test like that, but it is unlikely today. So you may have more information before I do.
But if it is able to read a topic under a low load, and then stops reading under a high load, I would look to the serialization of batch commits as being the problem.
@honkuan86 Any more clarity?
Hi @iancooper, the load test we are running has a throughput of 500 req/sec (API requests that publish messages to Kafka). For this reason I tried setting commitBatchSize to 500 with the following settings:
new KafkaSubscription<UserSessionEvent>(
    new SubscriptionName("user.session.subscription"),
    channelName: new ChannelName("user.session.channel"),
    routingKey: new RoutingKey(messageBusConfig.SubscriptionTopics.UserSessionEvents),
    groupId: "task-service.group",
    timeoutInMilliseconds: 1000,
    offsetDefault: AutoOffsetReset.Earliest,
    isAsync: true,
    commitBatchSize: 500,
    noOfPerformers: 1)
However, still no luck; the offsets are still not getting committed fast enough, and are only cleared when the service restarts. I suspect the thread pool is too busy handling the messages and many background threads are waiting to be executed.
@honkuan86 When we scale a stream we do so via partitioning, not competing consumers. Each partition has a single thread reading it, usually in a single process. Kafka polls, so you get backpressure; the rate you read from your partition should be a rate your thread can handle.
Crude calculation: if you have 500 req/sec, and it takes you 250ms to process a request (4 requests a second), you would need on the order of 125 partitions. However, arrival rates are not uniform and often form a Poisson distribution, so you might need closer to 150 or so.
Then you would need a consumer for each partition, so 150 consumers in the consumer group reading those partitions. That gives you some headroom, so you can probably avoid worrying about a consumer in the group having to read two partitions for a period whilst another consumer fails and restarts.
So you need to know two key things:
- How long, on average, does it take you to process a request
- How many requests do you get per second
That will tell you how many partitions, and by implication how many consumers, you need in order to keep pace.
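A minimal sketch of that sizing calculation, using the illustrative 500 req/sec and 250ms figures above (the 20% headroom factor is just one way to express the "closer to 150" allowance for bursty arrivals; swap in your own measurements):

using System;

double requestsPerSecond = 500;   // arrival rate
double avgProcessingMs = 250;     // average time to handle one request

double perConsumerThroughput = 1000.0 / avgProcessingMs;              // 4 req/sec per consumer
double partitionsNeeded = requestsPerSecond / perConsumerThroughput;  // 125
double withBurstHeadroom = Math.Ceiling(partitionsNeeded * 1.2);      // ~150 for non-uniform (Poisson-like) arrivals

Console.WriteLine($"Partitions (and consumers) needed: ~{partitionsNeeded}, with headroom: ~{withBurstHeadroom}");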
Make sense?
PS
If you have 500 req/sec and 7 consumers (as above), and we therefore assume 7 partitions, you would need to process each request in 14ms to keep pace with your arrival rate.
Just so you can see the issue
Hi @iancooper
We are facing the same issue. Here is our setting (using Brighter v9.1.19):
new KafkaSubscription<NotificationCommand>(
new SubscriptionName("icsa.notification.subscription"),
channelName: new ChannelName("icsa.notification.channel"),
routingKey: new RoutingKey("icsa.notification.command"),
groupId: AppDomain.CurrentDomain.FriendlyName,
timeoutInMilliseconds: 1000,
offsetDefault: AutoOffsetReset.Earliest,
commitBatchSize: 5,
isAsync: true)
We have a total of 160 messages in a topic and commitBatchSize set to 5. Although we have processed all the messages, the consumer group still shows 4 messages behind in Kafka UI.
After a few rounds of testing, here is the log result from the console. The offsets already hit the batch size (5) but it didn't commit up to 166.
May I know if there is any problem with the KafkaSubscription configuration?
@honkuan86 Let me know if the partitions change helped you.
@phuaxinyuin If the number of records remaining is below the batch size, then the 'at the batch size' trigger for committing offsets won't run. However, you can stop those records from sitting there by setting the SweepUncommittedOffsetsIntervalMs value on the subscription. That will run if no batch commit is in progress and store any outstanding offsets.
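For illustration, on the subscription you posted above that would look something like this (the 10000ms value is just an example; pick an interval that reflects how long you can tolerate offsets sitting uncommitted):

new KafkaSubscription<NotificationCommand>(
    new SubscriptionName("icsa.notification.subscription"),
    channelName: new ChannelName("icsa.notification.channel"),
    routingKey: new RoutingKey("icsa.notification.command"),
    groupId: AppDomain.CurrentDomain.FriendlyName,
    timeoutInMilliseconds: 1000,
    offsetDefault: AutoOffsetReset.Earliest,
    commitBatchSize: 5,
    sweepUncommittedOffsetsIntervalMs: 10000, // example value: sweep offsets left below the batch size
    isAsync: true)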
Both, I probably need to set up a load test for our Kafka transport and ensure that nothing goes wrong (or at least document any scale limits). That will take me a little bit of time.
So I have been running the Brighter Kafka sample, which spits messages out continuously.
- The offsets seem to be committed.
- The messages "behind" figure will rise up to the batch size, but this would be expected as we don't commit until the batch interval, and it then falls down again after the offsets commit
So far this is running on a single partition, with a new message arriving every 2s. I'll look to increase the frequency first, in jumps. Later I'll play with partitions.


OK. I do note that if I shut down the generator with a partial batch, it remains uncommitted prior to exit. I'll check what the defaults for the sweeper are as well.
@honkuan86 @phuaxinyuin OK, I think we have a repro and a fix incoming. It is a threading issue with the usage of a Monitor that is being exited on a different thread to the one on which it was entered. We are switching to a SemaphoreSlim, which is what we use elsewhere for that.
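For anyone following along, this is the general shape of the problem (an illustration only, not Brighter's actual code): Monitor is thread-affine, so exiting it from a thread other than the one that entered throws, whereas SemaphoreSlim can be released from any thread.

using System;
using System.Threading;
using System.Threading.Tasks;

object gate = new object();
Monitor.Enter(gate); // taken on the main thread

await Task.Run(() =>
{
    // Runs on a thread-pool thread, which does not own the lock.
    try { Monitor.Exit(gate); }
    catch (SynchronizationLockException ex) { Console.WriteLine($"Monitor: {ex.Message}"); }
});
// We deliberately never release the Monitor here; the point is that no other thread can do it for us.

// SemaphoreSlim has no thread affinity, so releasing on another thread is fine.
var semaphore = new SemaphoreSlim(1, 1);
await semaphore.WaitAsync();
await Task.Run(() => semaphore.Release());
Console.WriteLine("SemaphoreSlim released from a different thread without error.");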
Just testing, and then I will get a PR in, some eyes on it, and a release out ASAP.
Raised a PR #2294
Hi @honkuan86 and @phuaxinyuin
9.1.20 has the fix for the bug that you reported. If one of you could test it and give me any feedback, we can think about closing this issue once it is confirmed fixed.
Hi @honkuan86 and @phuaxinyuin
I wondered if you had any feedback. Looking to close this issue if it is fixed.
Thanks,
Ian
Hi @iancooper, I've done the testing on behalf of @phuaxinyuin using the latest version. I noticed that the offset of the consumer group will only be updated by the sweepUncommittedOffsetsIntervalMs sweep if new messages are continuously published to the topic; the sweep does not run if no new messages are going into the topic.
Test Scenario 1: No. of messages in Topic1 = 157, CommitBatchSize = 18, sweepUncommittedOffsetsIntervalMs = 10000, no messages generated during the test. Result: the consumer group shows 4 messages behind, although the application is still running in the background.
Test Scenario 2: No. of messages in Topic1 = 157, CommitBatchSize = 18, sweepUncommittedOffsetsIntervalMs = 10000, new messages pushed into the topic every 30s. Result: messages behind for the consumer group updated to 0.
Hi @iancooper, I've updated Brighter to v9.1.20 to test. Below are the details:
Subscriptions:
new KafkaSubscription<UserSessionEvent>(
    new SubscriptionName("user.session.subscription"),
    channelName: new ChannelName("user.session.channel"),
    routingKey: new RoutingKey(messageBusConfig.SubscriptionTopics.UserSessionEvents),
    groupId: "task-service.group",
    timeoutInMilliseconds: 500,
    offsetDefault: AutoOffsetReset.Earliest,
    isAsync: true,
    commitBatchSize: 400,
    noOfPerformers: 1)
Throughput: 350 req/sec. Partitions: 236. No. of consumers (pods): 15. Note: we wrote a handler that does nothing, it just returns base.HandleAsync().
The result is that the consumer offset commits cannot keep up. Not much difference from the previous version, from what I've observed.
I will test with 15 partitions and 15 consumers (pods) tomorrow.
My question is: if the problem is that the consumers can't keep up with the rate at which messages arrive, then
- Why are all the offsets committed after a restart? (If it is slow, shouldn't it be slow even after a restart?)
- What else can we do to keep up (since the handler already does nothing other than return base.HandleAsync())?
If you have 15 pods and 236 partitions, then you have about 16 partitions per pod. If you are struggling to keep up, that is because you can't process those 16 partitions fast enough.
Don't forget that a performer is a single thread. So you would need 16 performers per pod to have a dedicated thread for each partition. If they share a consumer group they should get allocated different partitions.
Now, personally, I would up the number of pods, as in K8s a pod running multiple threads never tends to perform that well. Your problem is that each of those 16 threads possibly schedules one other thread - a commit thread or a sweeper - so you may have up to 32 threads running, more if you have any async. That is a lot for a CPU. It's a lot for a container running in a pod too.
So I still think you are under-resourced for what you want to do.
IMO we fixed the issue with failing to write commits.
The sweeper will also help if you set an interval for the maximum time an offset can sit uncommitted, as it will run every x seconds and flush anything outstanding up to that point.
Other things we could look at:
- We don't queue commits. So if we have a commit in progress, we won't make another one. To fix that, we could queue requests to commit, with a fixed list of the offsets we will commit when we get to that batch. This would conflict with the sweeper, however, so it might require some thought.
The only other option would be for us to enable you to turn on auto-commit instead of using our batch approach. At your throughput it might be better. Your risk is that if you fail to process a record, then you will still have committed the offset, but it would push the commits back onto Kafka
Another way of looking at this: 236 partitions and 16 threads would require processing each item in 40ms.
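Roughly how that per-item budget falls out of the numbers above (a back-of-the-envelope sketch, not a measurement; it lands in the same 40-45ms ballpark):

using System;

double requestsPerSecond = 350;  // publish rate in the load test above
int partitions = 236;
int performerThreads = 15;       // one performer per pod, 15 pods

double partitionsPerThread = (double)partitions / performerThreads;        // ~16
double messagesPerThreadPerSecond = requestsPerSecond / performerThreads;  // ~23
double budgetMsPerMessage = 1000.0 / messagesPerThreadPerSecond;           // ~43ms per message

Console.WriteLine($"~{partitionsPerThread:F0} partitions per thread, ~{budgetMsPerMessage:F0}ms to process each message to keep pace.");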
PS @honkuan86 if you turn your logs up to Information you should see it flushing the buffer. You may need to pipe your logs somewhere to see that, as it will probably scroll past at your rate. If you are not seeing it, then it doesn't seem to be able to schedule the thread that flushes the offsets.
I'll see if I can crank my test up fast enough, and with enough partitions to break it.
An option might be to allow you to set auto-commit and trade off the possibility of not processing a message against throughput.
Hi @iancooper, today I ran the tests again with the Sync pipeline (isAsync = false), and the result is that the consumer is able to keep up, with offsets committed to Kafka. Same setup as the previous tests, however I only used one pod (one consumer) for the test.
If we turn off Async and the consumer is able to keep up with only one pod, does that mean something is not right with the Async pipeline?
@honkuan86 There may be an interaction with your load and either our synchronization context or our offset management that we need to look at.
However, we don't recommend the async pipeline in most service activator scenarios. Why? Because we have a single-threaded message pump that is designed to preserve ordering. If you use an async pipeline, you will de-order your stream because you will not necessarily process them in order due to async. With a Kafka stream, you tend to want to read sequentially, and just increase the number of partitions (and instances or performers) to get throughput. In addition, at high load, I would expect async to become counter-productive to performance as the context switching will be more costly than the I/O blocking.
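As a generic illustration of the de-ordering effect (this is not Brighter's message pump, just two awaited handlers whose continuations complete out of arrival order):

using System;
using System.Collections.Concurrent;
using System.Threading.Tasks;

var completionOrder = new ConcurrentQueue<int>();

async Task HandleAsync(int sequence, int ioDelayMs)
{
    await Task.Delay(ioDelayMs);       // stands in for awaited I/O inside a handler
    completionOrder.Enqueue(sequence);
}

// Message 1 arrives first, but its I/O takes longer than message 2's.
await Task.WhenAll(HandleAsync(1, 200), HandleAsync(2, 50));

Console.WriteLine(string.Join(", ", completionOrder)); // typically "2, 1": effects observed out of order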
We maintain a synchronization context so that if you want thread affinity between the main thread and the callback, we can offer you that. I guess that could have an issue under high load; we would need to test.
But this is useful, as it gives us something to dive into, to see if we can figure out stronger advice, or find a defect.
@phuaxinyuin Did you get any feedback?
Closing this one, assuming it works given the lack of further feedback. @honkuan86, we will look separately at why the async pipeline struggles under load.