fs2-kafka

Stuck partitions - question about the pause/resume logic

Open simonpetty opened this issue 1 year ago • 20 comments

Hey.

We have a topic with 50 partitions, with consumers scaled out to a relatively small number of hosts (3). Sporadically, we get a situation where one particular partition gets stuck and does not commit any new offsets. As soon as we restart the hosts, it kicks back into life and immediately catches up.

We see this in the logs very frequently after it becomes stuck:

Skipping fetching records for assigned partition <mypartition> because it is paused

The last couple of FS2 Kafka log lines that mention the partition before it got stuck say:

Completed fetches with records for partitions [ <mypartition> -> { first: 45677489, last: 45677495 }, .... ]

Followed by logs that never mention the partition again:

Current state [State(fetches = Map(... a number of other partitions, but not <mypartition> ... ), ...)]

In trying to figure out what's going on, I ended up looking at this bit of the code:

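def pollConsumer(state: State[F, K, V]): F[ConsumerRecords] =
  withConsumer
    .blocking { consumer =>
      // Partitions currently assigned to this consumer
      val assigned = consumer.assignment.toSet
      // Partitions with a pending fetch request from a downstream partition stream
      val requested = state.fetches.keySetStrict
      // Partitions that already have records buffered in the actor's state
      val available = state.records.keySetStrict

      // Resume only partitions that are assigned, requested and have nothing buffered;
      // every other assigned partition is paused
      val resume = (requested intersect assigned) diff available
      val pause = assigned diff resume

      if (pause.nonEmpty)
        consumer.pause(pause.asJava)

      if (resume.nonEmpty)
        consumer.resume(resume.asJava)

      consumer.poll(pollTimeout)
    }
    .flatMap(records)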
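To make the combinations concrete, the same set logic can be run standalone for a single partition. This is a minimal sketch under stated assumptions: the object, Row and Outcome names are hypothetical, and plain booleans stand in for fs2-kafka's internal State. Only one of the eight rows resumes the partition (assigned, requested, nothing buffered). An assigned partition that is not requested is paused regardless of buffered records. Since the stuck partition no longer appears in State.fetches in the logs above, it would presumably fall into one of those "assigned but not requested" rows, staying paused until its partition stream issues a new fetch request.

```scala
// Standalone model of the pause/resume rule in pollConsumer, for a single partition.
// Hypothetical names; fs2-kafka's real State and TopicPartition are not used here.
object PauseResumeTable {
  final case class Row(assigned: Boolean, requested: Boolean, available: Boolean)

  sealed trait Outcome
  case object Resumed extends Outcome
  case object Paused extends Outcome
  case object Untouched extends Outcome // not assigned: neither paused nor resumed

  // Same set algebra as pollConsumer, applied to one partition named "p".
  def outcome(row: Row): Outcome = {
    def setIf(flag: Boolean): Set[String] = if (flag) Set("p") else Set.empty[String]
    val assigned = setIf(row.assigned)
    val requested = setIf(row.requested)
    val available = setIf(row.available)

    val resume = (requested intersect assigned) diff available
    val pause = assigned diff resume

    if (resume.contains("p")) Resumed
    else if (pause.contains("p")) Paused
    else Untouched
  }

  // All 8 combinations of the three flags.
  val allRows: List[Row] =
    for {
      a <- List(true, false)
      r <- List(true, false)
      v <- List(true, false)
    } yield Row(a, r, v)

  def table: List[(Row, Outcome)] = allRows.map(row => row -> outcome(row))
}
```

Running PauseResumeTable.table.foreach(println) prints every combination with its outcome.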

Filling in all the combinations, I think these are the possible outcomes:

[Screenshot, 2023-08-10: table of the possible pause/resume outcomes, with the state we think we're in highlighted]

I've highlighted the state I think we're in. Given that the Java Kafka client says it will skip fetching for paused partitions, I'm a little confused about how the partition is then expected to resume.

If I pull down the FS2 Kafka codebase and replace intersect with union, the KafkaConsumerSpec tests still pass, and (I think) it would cause the partition in our highlighted scenario to be resumed, but I don't know what the consequences of that would be!
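One way to reason about the consequences of the union variant is to compare both rules side by side. This is a hedged sketch, not a proposed fix: partitions are plain strings rather than Kafka's TopicPartition, and the object and method names are hypothetical. Under union, resume covers every assigned partition without buffered records, so pause shrinks to the assigned partitions that already have records buffered. Partitions that no stream has requested therefore keep being fetched until something is buffered for them, which weakens per-partition backpressure. Also, if requested ever contained a partition that is no longer assigned, union would pass it to resume. The Kafka client's KafkaConsumer.resume is documented to throw IllegalStateException for partitions not currently assigned to the consumer. Whether fs2-kafka's fetches can hold such a partition is an open assumption here.

```scala
// Compares the current pause/resume rule with the `union` variant.
// Partitions are plain strings here, a simplification of Kafka's TopicPartition.
object IntersectVsUnion {
  final case class Decision(pause: Set[String], resume: Set[String])

  // Current rule: resume partitions that are assigned AND requested, with nothing buffered.
  def current(assigned: Set[String], requested: Set[String], available: Set[String]): Decision = {
    val resume = (requested intersect assigned) diff available
    Decision(pause = assigned diff resume, resume = resume)
  }

  // Variant: resume partitions that are assigned OR requested, with nothing buffered.
  def proposed(assigned: Set[String], requested: Set[String], available: Set[String]): Decision = {
    val resume = (requested union assigned) diff available
    Decision(pause = assigned diff resume, resume = resume)
  }
}
```

For example, with assigned = {p0, p1}, requested = {p0, gone} and nothing buffered, the current rule pauses p1 and resumes p0. The variant pauses nothing and tries to resume p0, p1 and the unassigned gone.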

It feels like this should be a more widespread issue given how central this code is, and how long it's been like this for, so I bet I'm missing something.

Thanks

simonpetty, Aug 10 '23

Just curious, because we are having a similar issue: which version of fs2-kafka are you on? We are on 2.6.1 and wanted to know if this also happens with 3.x.

jarrodcodes, Aug 24 '23

> Just curious, because we are having a similar issue: which version of fs2-kafka are you on? We are on 2.6.1 and wanted to know if this also happens with 3.x.

Just witnessed the issue on v3.0.1

tpalmer99, Aug 25 '23

>> Just curious, because we are having a similar issue: which version of fs2-kafka are you on? We are on 2.6.1 and wanted to know if this also happens with 3.x.

> Just witnessed the issue on v3.0.1

Thanks for the reply. That's unfortunate. I was hoping upgrading would be sufficient.

jarrodcodes, Aug 25 '23

We operate an internal service that runs a few thousand streams, and we have also experienced the "lost partition" issue. It seems to be a race condition and happens reliably on higher-traffic topics: we see a couple of "partition lost" events every day on those. So far we haven't identified the source of the race condition.

After spending some time trying to identify the root cause, we settled on a workaround where we monitor consumer offset lag for each stream, and recreate/restart consumers that get stuck for a few minutes. This worked out well in practice, and allowed us to ignore this issue for a while.
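A lag-based stuck check like the one described can be sketched as a pure function. This is a hedged illustration, not the code used by the commenter. Partition numbers, Progress and the threshold are hypothetical, and offsets follow Kafka's "next offset" convention: endOffset is the log-end offset, as returned by consumer.endOffsets (mentioned later in this thread), and nextOffset is the last processed offset + 1. A partition counts as stuck when it has lag but has not advanced for longer than the threshold ("a few minutes" above).

```scala
// Hypothetical lag-based stuck-partition check (names and thresholds are assumptions).
// Offsets follow Kafka's "next offset" convention: endOffset is the log-end offset
// and nextOffset is the offset of the next record the stream would process
// (the last processed offset + 1).
object LagMonitor {
  final case class Progress(nextOffset: Long, lastProgressMillis: Long)

  def lag(endOffset: Long, progress: Progress): Long =
    math.max(0L, endOffset - progress.nextOffset)

  // Stuck = there are records to consume, but no progress for longer than thresholdMillis.
  def isStuck(endOffset: Long, progress: Progress, nowMillis: Long, thresholdMillis: Long): Boolean =
    lag(endOffset, progress) > 0 && nowMillis - progress.lastProgressMillis > thresholdMillis

  // Partitions (by number) that look stuck; partitions without recorded progress are skipped.
  def stuckPartitions(
      endOffsets: Map[Int, Long],
      progress: Map[Int, Progress],
      nowMillis: Long,
      thresholdMillis: Long
  ): Set[Int] =
    endOffsets.iterator.collect {
      case (p, end) if progress.get(p).exists(isStuck(end, _, nowMillis, thresholdMillis)) => p
    }.toSet
}
```

The check is deliberately pure, so it can run on any schedule and be fed from whatever offset source is available. Restarting the consumer when it returns a non-empty set is a separate concern.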

Prompted mainly by this issue, while I was gathering some data to share and getting ready to dive in once more in search of the race condition, a colleague pointed to some comments (I don't have a reference handy) that relate the lost partition issue to the use of fs2's Stream.groupWithin method.

Are you using groupWithin in your stream as a mechanism to chunk and aggregate messages based on time?

We ran some experiments in our staging environment, removing groupWithin from a stream that would usually trigger the lost partition issue, and we have not seen it happen in the couple of days since, so it looks like a good candidate for the race condition. I still need to spend some time reading the groupWithin code more closely before I can say this more definitely and/or propose a fix.

On our side, we already have a stream that mixes clock ticks and message chunks, so we're looking at replacing groupWithin altogether with a simpler tick-based implementation. For our use case, driving this from the shared tick stream we already have will be cheaper than making individual timeout decisions.
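A tick-based replacement for groupWithin can be modelled as a fold over a merged stream of ticks and records. This is a minimal pure sketch under assumptions: TickOrChunk borrows the name used later in the thread, while Records, batches and maxSize are hypothetical. Records are buffered and flushed when the buffer reaches maxSize or when a tick arrives. In a real fs2 program, the events would come from merging the record stream with a shared tick stream (for example one built with fs2's Stream.awakeEvery). That wiring is not shown here.

```scala
// A pure model of tick-driven batching (a hypothetical stand-in for groupWithin):
// records are buffered and flushed when the buffer reaches maxSize or when a
// clock tick arrives.
object TickBatching {
  sealed trait TickOrChunk[+A]
  case object Tick extends TickOrChunk[Nothing]
  final case class Records[+A](values: List[A]) extends TickOrChunk[A]

  def batches[A](events: List[TickOrChunk[A]], maxSize: Int): List[List[A]] = {
    require(maxSize > 0, "maxSize must be positive")
    val (emitted, remainder) =
      events.foldLeft((Vector.empty[List[A]], Vector.empty[A])) {
        // A tick flushes whatever is buffered (if anything).
        case ((out, buf), Tick) =>
          if (buf.isEmpty) (out, buf) else (out :+ buf.toList, Vector.empty[A])
        // New records: emit every full batch, keep the rest buffered.
        case ((out, buf), Records(values)) =>
          val all = buf ++ values
          val fullSize = (all.size / maxSize) * maxSize
          val full = all.take(fullSize).grouped(maxSize).map(_.toList).toVector
          (out ++ full, all.drop(fullSize))
      }
    // Flush the remainder when the input ends.
    if (remainder.isEmpty) emitted.toList else (emitted :+ remainder.toList).toList
  }
}
```

Because the timing decision comes from the shared ticks rather than a per-stream timer, every stream flushes on the same schedule, which matches the cost argument above.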

biochimia, Aug 25 '23

Thank you for the reply. Unfortunately no use of groupWithin here.

jarrodcodes, Aug 25 '23

Do you use commitBatchWithin? It is also based on groupWithin.

biochimia, Aug 28 '23

Yes! I will look into this, thank you for the lead

jarrodcodes, Aug 28 '23

Thanks for the hint @biochimia.

Since we seem to have started encountering this issue when moving from FS2 Kafka 2 to 3, I've been doing some sleuthing. This change was made to groupWithin a couple of years ago. Before that, there was this issue, which has an interesting comment from @SystemFw:

> groupWithin is push based with back pressure of 1, so very close to being purely pull based but not quite

I'm not convinced I understand how behaviour around commits would prevent fetching new records, though. My naive model has them as entirely separate "threads": committing offsets just minimizes the number of records you reprocess when starting a stream up again, and in theory you could run a stream for hours or days without making any commits.

Anyway… I was thinking about a similar workaround that involves these steps:

  1. Detect a partition stream that is idle (this post provides an interesting solution: https://medium.com/@ivovk/fs2-streams-detecting-stream-idleness-aec12af06936)
  2. Check whether there is lag on that partition (kafkaConsumer.committed can be used, but this also requires the broker offset, perhaps via the Admin client?)
  3. If there isn't any lag, keep going and basically restart the timer (I'm thinking this could be fairly long, e.g. 30 minutes)
  4. Otherwise, restart the stream. At the moment I'm guessing the simplest thing here would be to restart the parent stream (restarting all partitions on this host).
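The decision in steps 3 and 4 can be sketched as a small function, once idle time and lag are known. This is a hypothetical illustration; IdleCheck, Action and the millisecond parameters are assumptions. Lag would be the end offset minus the committed offset. Given the later report in this thread that a timer inside the partition stream stopped firing once the partition got stuck, this decision probably needs to be evaluated from outside the partition stream, for example in a separate monitoring fiber.

```scala
// Hypothetical decision step for the workaround steps above: once a partition
// has been idle for idleLimitMillis, look at its lag (e.g. end offset minus
// committed offset) and either reset the idle timer or restart the stream.
object IdleCheck {
  sealed trait Action
  case object KeepGoing extends Action  // not idle long enough to check
  case object ResetTimer extends Action // idle, but nothing to consume (step 3)
  case object Restart extends Action    // idle while lagging: treat as stuck (step 4)

  def decide(idleMillis: Long, idleLimitMillis: Long, lag: Long): Action =
    if (idleMillis < idleLimitMillis) KeepGoing
    else if (lag <= 0L) ResetTimer
    else Restart
}
```

With a 30-minute limit, IdleCheck.decide(idleMillis, 30L * 60 * 1000, lag) only returns Restart for a partition that has been idle for at least 30 minutes while records are waiting.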

simonpetty, Aug 30 '23

Regarding fs2.Stream.groupWithin:

  • There's a (relatively) recent fix upstream, https://github.com/typelevel/fs2/pull/3183. It was released as part of fs2 3.8.0. I haven't been able to test it properly.
  • Related to the above fix, there are more groupWithin changes in the pipeline in https://github.com/typelevel/fs2/pull/3186. At the moment it seems to be work in progress.
  • I dug up another seemingly related issue from cats-effect, https://github.com/typelevel/cats-effect/issues/3392. A "lost fiber bug" was identified and addressed in https://github.com/typelevel/cats-effect/pull/3444. This particular fix was released as part of cats-effect 3.4.8. I also haven't been able to test this properly.
  • If you read the comments on that last PR, there's mention that there may still be some issue lurking related to groupWithin, even after the fix: https://github.com/typelevel/cats-effect/pull/3444#issuecomment-1473690997

Tangentially, the release notes for cats-effect 3.5.0 mention an interface/behaviour change where the legacy behaviour could "result in an effect which could be canceled unconditionally, without the invocation of any finalizer". There's no smoking gun or indication that this addresses the issues we have seen, but it seems like it could be related. We're looking into testing this too, but for us it involves updating several dependencies across various components. https://github.com/typelevel/cats-effect/releases/tag/v3.5.0

biochimia, Aug 30 '23

  1. About detecting idleness, our approach has been to merge in a stream of clock ticks, so in the end we have a stream of TickOrChunk and can alternate time-based actions and message processing in the same stream. This also gives us a point to flush intermediate buffers, not entirely unlike groupWithin.

I don't think we tried the stream.pull.timed approach, but it also looks interesting.

  2. To check for lag, we use consumer.endOffsets directly; this is also based on our observation that the consumer remains operational when we experience lost partitions (e.g., no rebalances are triggered, and other consumers do not take over the partition). We then compare the end offsets against our last processed offsets and a timestamp associated with the processing. You can get fancy with the rules here, e.g. checking that the consumer has failed to make progress for a while before deciding to restart it.
  3. Our checks run every 15 minutes, so they might only restart a stuck partition after 30 minutes or so.
  4. On restarting the stream, we recreate everything from the consumer up through the stream. We do this by raising an error that is only caught at the point where we instantiate the stream in the first place. We have some machinery around this to give us a graceful exit path when we do want to shut down the application.
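The restart approach described above can be sketched as a runner that rebuilds everything on a dedicated error type. This sketch only illustrates the control flow, using a synchronous loop. In an fs2 application this would more likely be expressed with Stream#handleErrorWith around the stream construction. RestartRequested, runWithRestarts, runOnce and maxRestarts are hypothetical names. runOnce stands in for "build the consumer and run the whole stream", and returning normally from it plays the role of the graceful exit path.

```scala
import scala.annotation.tailrec
import scala.util.{Failure, Success, Try}

// Hypothetical sketch of the restart mechanism: a dedicated error signals
// "rebuild everything", and only the top-level runner handles it.
object Supervisor {
  final case class RestartRequested(reason: String) extends RuntimeException(reason)

  // Runs `runOnce`, rebuilding after each RestartRequested (up to maxRestarts).
  // Returns the number of restarts; any other error is rethrown unchanged.
  @tailrec
  def runWithRestarts(runOnce: () => Unit, maxRestarts: Int, restarts: Int = 0): Int =
    Try(runOnce()) match {
      case Success(_) => restarts
      case Failure(_: RestartRequested) if restarts < maxRestarts =>
        runWithRestarts(runOnce, maxRestarts, restarts + 1)
      case Failure(other) => throw other
    }
}
```

Keeping the restart signal as its own exception type means that unrelated failures still propagate to the top level instead of being silently retried.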

biochimia, Aug 30 '23

Unfortunately, our current workaround is getting entangled in the problem, as we are seeing some errored streams get stuck and fail to recover. This may be more related to the cancelation/finaliser changes in cats-effect 3.5, and to the changes various libraries are making to adapt to them...

Anyway, at this point I'm happy to share the information we have in case it helps someone make a connection.

biochimia, Aug 30 '23

> 2. consumer.endOffsets

Thanks. Wasn't aware of this one.

simonpetty, Aug 30 '23

An update from our side:

  • we have updated our application to the latest versions of the cats-effect stack: cats 2.10.0, cats-effect 3.5.1, fs2 3.8.0, and fs2-kafka 3.0.1, among other packages. (I'm aware fs2 has just released a couple of updates; at least their release notes don't suggest an impact here.)
  • we have removed our use of groupWithin in favour of our custom implementation that makes use of a tick-stream (which we also use for other purposes)

Unfortunately, we still see dropped partitions with the updated stack. The only good news is that we seem to be reliably restarting those again, so the updated dependencies seem to have addressed the issue where our restart mechanism would get itself stuck. This could be related to the fix in https://github.com/typelevel/cats-effect/pull/3444.

I can't rule out other bugs in our code and in the stack, but perhaps the main trigger here is CPU starvation or similar misbehaviour in our application, such as "expensive" tasks that work against the cats-effect scheduler.

At this moment, I no longer have an indication about a specific construct being the trigger for this issue.

biochimia, Sep 01 '23

Update from us. We tried implementing the timed pull approach (described here: https://medium.com/@ivovk/fs2-streams-detecting-stream-idleness-aec12af06936).

Unfortunately, this didn't work. It appears that when the partition becomes "stuck" the stream stops being pulled (as our timer never fires again).

simonpetty, Sep 13 '23

Hey. Just as an update, we've not seen stuck partitions for a month now. We have since updated various libraries (Cats, FS2, FS2-Kafka), so our assumption is that one of these fixed the issue, or that it was caused by a broker-side issue that has since been resolved.

simonpetty, Nov 01 '23

That's awesome to hear. Just curious, which version of Kafka is your broker on? Would love to be able to compare once we do some upgrades.

jarrodcodes, Nov 06 '23

Just to say, we've had recurrences of this. Luckily they're rare, and mostly in non-prod environments, but still. @jarrodcodes, our Kafka broker version is 3.4 (apparently soon to be 3.5, so I'll be interested to see if that changes anything).

simonpetty, Dec 11 '23

I'm having the same issue. After some time, chunks pulled from Kafka report a size of 0. It does not occur in our test environment with 8 partitions/4 consumers, but it happens quite frequently in production with 24 partitions/4 consumer replicas. Metrics are reported downstream in forked threads (based on partitionsMapStream), but, as I said, the pulled chunks are empty. I'm on broker 3.3 and fs2-kafka 3.2.0.

kareblak, Jan 07 '24

I've read the whole thread and I must say this is really unfortunate 😞 I mean the fact that you folks have been working around this by restarting the consumers...

I'd like to investigate this bug in depth. Just to be sure, these stuck partitions happen without a rebalance, am I right? If that's the case, it will be easier to trace the issue 🙏🏽

aartigao, Feb 26 '24

Thanks @aartigao. In our case this mostly happens in test environments at some point after a deployment (we've actually moved to the sticky partition assignor to mitigate other issues, but it doesn't seem to prevent this one).

What we see is that some time after a deployment, when messages finally start arriving, the lag starts going up and never goes down. Usually this affects only one of the partitions (we're consuming from one topic with 50 partitions), sometimes more.

As described above if we simply restart the services it comes back to life.

simonpetty, Apr 19 '24