fs2-kafka
fs2-kafka copied to clipboard
Stuck partitions - question about the pause/resume logic
Hey.
We have a topic with 50 partitions, with consumers scaled out to a relatively small number of hosts (3). Sporadically, we get a situation where one particular partition gets stuck and does not commit any new offsets. As soon as we restart the hosts, it kicks back into life and immediately catches up.
We see this in the logs very frequently after it becomes stuck:
Skipping fetching records for assigned partition <mypartition> because it is paused
The last couple of FS2 Kafka logs mentioning the stuck partition, before it was stuck, say this:
Completed fetches with records for partitions [ <mypartition> -> { first: 45677489, last: 45677495 }, .... ]
Followed by logs that never mention the partition again:
Current state [State(fetches = Map(... a number of other partitions, but not <mypartition> ... ), ...)]
In trying to figure out what's going on, I ended up looking at this bit of the code:
def pollConsumer(state: State[F, K, V]): F[ConsumerRecords] =
withConsumer
.blocking { consumer =>
val assigned = consumer.assignment.toSet
val requested = state.fetches.keySetStrict
val available = state.records.keySetStrict
val resume = (requested intersect assigned) diff available
val pause = assigned diff resume
if (pause.nonEmpty)
consumer.pause(pause.asJava)
if (resume.nonEmpty)
consumer.resume(resume.asJava)
consumer.poll(pollTimeout)
}
.flatMap(records)
Filling in all the possible combinations, I think we get these possible outcomes:
I've highlighted the state that I think we're in, and, given that the java kafka library is saying it will skip fetching for paused partitions, I'm a little confused how the partition is then expected to resume.
If I pull down the FS2 Kafka codebase, and replace the intersect
with union
the KafkaConsumerSpec tests still pass, and it would cause our highlighted scenario to be resumed (i think), but I don't know what the consequences of that would be!
It feels like this should be a more widespread issue given how central this code is, and how long it's been like this for, so I bet I'm missing something.
Thanks
Just curious because we are having a similar issue, which version of fs2-kafka are you on? We are on 2.6.1, wanted to know if this was also happening with 3.x
Just curious because we are having a similar issue, which version of fs2-kafka are you on? We are on 2.6.1, wanted to know if this was also happening with 3.x
Just witnessed the issue on v3.0.1
Just curious because we are having a similar issue, which version of fs2-kafka are you on? We are on 2.6.1, wanted to know if this was also happening with 3.x
Just witnessed the issue on v3.0.1
Thanks for the reply. That's unfortunate. I was hoping upgrading would be sufficient.
We operate an internal service that runs a few thousand streams and we have also experienced the "lost partition" issue. The issue seems to be a race condition, and happens reliably for topics with higher volume of traffic: we see a couple of "partition lost" events every day in topics with higher volume of traffic. We haven't so far identified the source of this race condition.
After spending some time trying to identify the root cause, we settled on a workaround where we monitor consumer offset lag for each stream, and recreate/restart consumers that get stuck for a few minutes. This worked out well in practice, and allowed us to ignore this issue for a while.
Mainly triggered by the reporting of the current issue, as I was trying to gather some data to share, and getting ready to dive once more in search for the race condition, a colleague pointed to some comments (I don't have a reference handy) that relate the lost partition issue to the use of Stream.groupWithin
method from fs2.
Are you using groupWithin
in your stream as a mechanism to chunk and aggregate messages based on time?
We ran some experiments in our staging environment, removing groupWithin
from a stream that would usually trigger the lost partition issue, and we have not seen it happen after a couple of days, so it is looking like a good candidate to look into for the race condition. I still need to spend some time reading the groupWithin
code more closely, to be able to say this more definitely and/or propose a fix.
From our side, we already have a stream that mixes clock ticks and message chunks so we're looking at replacing groupWithin
altogether with a simpler implementation based on the ticks. For our use case, it'll be cheaper to do this based on a shared tick stream, as we have, versus individual timeout decisions.
Thank you for the reply. Unfortunately no use of groupWithin
here.
Do you use commitBatchWithin
? It is also based on groupWithin
.
Yes! I will look into this, thank you for the lead
Thanks for the hint @biochimia.
Since we seemed to have encountered this issue when moving from FS2 Kafka 2 to 3, I've just been doing some sleuthing. This change was made to groupWithin a couple of years ago. Before that was this issue which has an interesting comment from @SystemFw
groupWithin is push based with back pressure of 1, so very close to being purely pull based but not quite
I’m not convinced I understand how behaviour around commits would prevent fetching new records though, my naive model has them as entirely separate "threads", and committing offsets just minimizes the amount of records you re-process when starting up a stream again - you could in theory run a stream for hours/days not making any commits.
Anyway… I was thinking about a similar workaround that involves these steps:
- Detect a partition stream that is idle (this post provides an interesting solution: https://medium.com/@ivovk/fs2-streams-detecting-stream-idleness-aec12af06936)
- See if there is lag on that partition (can use
kafkaConsumer.committed
, but also requires the broker offset - via the Admin client?) - If there isn't any lag, keep going and restart the timer basically (thinking this could be fairly long like 30 mins)
- Otherwise, restart the stream - at the moment i'm guessing the simplest thing here would be to restart the parent stream (and restart all partitions on this host).
Regarding fs2.Stream.groupWithin
:
- There's a (relatively) recent fix upstream, https://github.com/typelevel/fs2/pull/3183. It was released as part of fs2 3.8.0. I haven't been able to test it properly.
- Related to the above fix, there are more
groupWithin
changes in the pipeline in https://github.com/typelevel/fs2/pull/3186. At the moment it seems to be work in progress. - I dug up another seemingly related issue from
cats-effect
, https://github.com/typelevel/cats-effect/issues/3392. A "lost fiber bug" was identified and addressed in https://github.com/typelevel/cats-effect/pull/3444. This particular fix was released as part of cats-effect 3.4.8. I also haven't been able to test this properly. - If you read the comments on that last PR, there's mention that there may still be some issue lurking related to
groupWithin
, even after the fix: https://github.com/typelevel/cats-effect/pull/3444#issuecomment-1473690997
Tangentially, the release notes for cats-effect 3.5.0 mention an interface/behaviour change where the legacy behaviour could "result in an effect which could be canceled unconditionally, without the invocation of any finalizer". There's no smoking gun or indication that this addresses the issues we have seen, but somehow it seems like it could be related. We're looking into testing this too, but for us it involves updates of various dependencies across various components. https://github.com/typelevel/cats-effect/releases/tag/v3.5.0
- About detecting idleness, our approach has been to merge a stream of clock ticks, so in the end we have a stream of
TickOrChunk
and we can alternate time-based actions and message processing in the same stream. This also gives us a point to flush intermediate buffers—not entirely unlikegroupWithin
.
I don't think we tried the stream.pull.timed
approach, but it also looks interesting.
- To check for lag we're using
consumer.endOffsets
, directly; this also based on our observations that the consumer remains operational when we experience lost partitions (e.g., no rebalances triggered, and other consumers do not take over the partition). We then compare the end offsets against our last processed offsets and a timestamp associated with the processing. You can get fancy on the rules here, like checking that the consumer has failed to make progress for a while before deciding to restart it. - Our checks run every 15 minutes, they might only restart a stuck partition after 30 minutes or so.
- On restarting the stream, we take the approach of recreating everything from the consumer up through the stream. We do this by raising an error that is only caught at the point where we instantiate the stream in the first place. We have some machinery around this to give us a graceful exit path when we do want to shutdown the application.
Unfortunately, our current workaround is getting entangled in the problem as we are seeing some errored streams getting stuck and not being able to recover. This may be more related to the cancelation/finaliser changes in cats-effect 3.5, and changes being made by various libraries to adapt to it...
Anyway, at this point I'm happy to share the information we have in case it helps someone make a connection.
2. consumer.endOffsets
Thanks. Wasn't aware of this one.
An update from our side:
- we have updated our application to the latest versions of cats-effect stack: cats 2.10.0, cats-effect 3.5.1, fs2 3.8.0, and fs2-kafka 3.0.1, among other packages. (I'm also aware fs2 just released a couple of updates, at least the release notes for those don't suggest an impact here)
- we have removed our use of
groupWithin
in favour of our custom implementation that makes use of a tick-stream (which we also use for other purposes)
Unfortunately, we still see dropped partitions with the updated stack. The only good news is that we seem to be reliably restarting those again, so the updated dependencies seem to have addressed the issue where our restart mechanism would get itself stuck. This could be related to the fix in https://github.com/typelevel/cats-effect/pull/3444.
I can't rule out that there are other bugs in our code and in the stack, but perhaps the main trigger here could be CPU starvation or a similar bad behaviour by our application such as "expensive" tasks that work against the cats-effect scheduler.
At this moment, I no longer have an indication about a specific construct being the trigger for this issue.
Update from us. We tried implementing the timed pull approach (described here: https://medium.com/@ivovk/fs2-streams-detecting-stream-idleness-aec12af06936).
Unfortunately, this didn't work. It appears that when the partition becomes "stuck" the stream stops being pulled (as our timer never fires again).
Hey. Just as an update, we've not seen stuck partitions for a month now. We have since updated various libraries (Cats, FS2, FS2-Kafka) so our assumption is one of these fixed the issue - or it was due to some issue at the broker side which has since been resolved.
That's awesome to hear. Just curious, which version of Kafka is your broker on? Would love to be able to compare once we do some upgrades.
Just to say, we've had reoccurrences of this. Luckily, they're rare, and mostly in non-prod environments, but still. @jarrodcodes our Kafka broker version is 3.4 (soon to be 3.5 apparently, so will be interested to see if that changes anything)
I'm having the same issue. Chunks pulled from kafka reports as size 0 after some time. It is not occuring in our test env with 8 partitions/4 consumers, but is happening quite frequently in production on 24 partitions/4 consumer replicas. Metrics are reported downstream in forked threads (based on partitionsMapStream
), but pulled chunks are as I've said empty. I'm on broker 3.3 and fs2-kafka 3.2.0.
I've read the whole thread and I must say that this is really unfortunate 😞 I mean the fact that you folks have been workaround this by restarting the consumers...
I'd like to investigate this bug in depth. Just to be sure, these partitions stuck happen without a rebalance, am I right? If that's the case, it will be easier to trace the issue 🙏🏽
Thanks @aartigao, in our case this mostly happens in test environments at some point after a deployment (we've actually moved to using sticky partition assignor to mitigate other issues, but doesn't seem to prevent this issue).
What we see happening is some time after a deployment, when some messages finally start arriving, the lag starts going up - and never goes down. Usually only for one of the partitions (we're consuming from one topic with 50 partitions), sometimes more.
As described above if we simply restart the services it comes back to life.