fs2-kafka
Stale consumers after JVM shutdown
Our application runs on 2 Kubernetes pods and uses fs2-kafka for several consumers. Most of the time we have 8 consumers in total per topic, 4 on each pod, with every consumer subscribed to its own partition. When our pods are scaled down and up again, i.e. when our application is restarted, we often find that for one topic some 'zombie' consumers are still hanging around on the broker after the restart: for that topic the broker shows a mix of new consumers from both restarted pods, plus a few consumers from one pod from before the restart. At least, that's what the broker shows; in reality the old pod has long since died and those consumers no longer exist. Since the broker is unaware of this, however, the stale consumers remain assigned to their partitions, which means that new messages from those partitions are no longer consumed and a lag builds up.
The topic where this happens is not always the same, and sometimes a restart causes no problems at all. The workaround is simply to restart our pods once more; usually the stale consumers are gone after that.
We suspect that the consumers might not have left the group properly during JVM shutdown, but this is just a guess: we don't do any unsubscribing ourselves and rely completely on fs2-kafka for this. We usually subscribe like this:
```scala
KafkaConsumer
  .stream(consumerSettings)
  .evalTap(_.subscribeTo(topic))
  .flatMap(_.stream)
  .evalTap(x => log(show"Received ${x.record.value} from $topic", logLevel))
  .groupWithin(groupWithin.messages, groupWithin.duration)
  .evalMap(processRecords)
  .evalMap(CommittableOffsetBatch.fromFoldable(_).commit.whenA(commitOffset))
```
and we don't have any code to explicitly shut down or unsubscribe upon JVM exit.
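Our understanding is that a clean group leave should come for free from resource finalization: assuming the stream runs under cats-effect 3's `IOApp` (which installs a shutdown hook that cancels the main fiber on SIGTERM), cancelling the consumer stream should release the `KafkaConsumer` resource, close the underlying consumer, and make it leave the group. A minimal sketch of that wiring, with `Main`, the topic, and the settings as simplified placeholders rather than our real code:

```scala
import cats.effect.{IO, IOApp}
import fs2.kafka.{AutoOffsetReset, ConsumerSettings, KafkaConsumer}

// Simplified placeholder wiring; the real application runs several such streams.
object Main extends IOApp.Simple {

  val consumerSettings: ConsumerSettings[IO, String, String] =
    ConsumerSettings[IO, String, String]
      .withBootstrapServers("localhost:9092")
      .withGroupId("my-group")
      .withAutoOffsetReset(AutoOffsetReset.Earliest)

  // On SIGTERM, IOApp cancels `run`; cancelling the stream should release
  // the KafkaConsumer resource and close the underlying consumer, which is
  // what we expect to trigger the leave-group request on the broker.
  val run: IO[Unit] =
    KafkaConsumer
      .stream(consumerSettings)
      .evalTap(_.subscribeTo("my-topic"))
      .flatMap(_.stream)
      .evalMap(msg => IO.println(s"Received ${msg.record.value}"))
      .compile
      .drain
}
```

If this expectation is wrong, e.g. if the shutdown hook's grace period can expire before the consumer's finalizer runs, that might explain the stale members we see.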
Could it be that we are not using fs2-kafka in the correct way, and that this is causing the random stale consumers? Or is this perhaps a known issue?