zio-kafka
Partition lost not recovering, possible issue with `RunLoop`
This might be related to https://github.com/zio/zio-kafka/issues/1233, but over the last couple of weeks or months we have seen cases where a partition that is lost does not recover correctly. We've tried to analyze and debug it, but it occurs so infrequently that we haven't been able to isolate it.
By analyzing the code we might have identified the reason, but there is so much async stuff happening there that we might be interpreting things wrongly.
What happens in our case is the following:
- Services are consuming from n partitions. Note that these are partitions for a topic that has intervals with little to no traffic.
- At a certain point, usually in the middle of the night, there is some network issue where kafka-client identifies the partitions as lost.
- We see these lost partitions in the logging, but partitions aren't being recovered, and we also see no errors in our own client code that the streams have failed.
For partitions that are revoked everything seems to be working correctly though.
What we see as a possible cause is the following. In the `Runloop`, this happens for lost partitions:

    onLost = lostTps =>
      for {
        _              <- ZIO.logDebug(s"${lostTps.size} partitions are lost")
        rebalanceEvent <- lastRebalanceEvent.get
        state          <- currentStateRef.get
        lostStreams     = state.assignedStreams.filter(control => lostTps.contains(control.tp))
        _              <- ZIO.foreachDiscard(lostStreams)(_.lost)
        _              <- lastRebalanceEvent.set(rebalanceEvent.onLost(lostTps))
        _              <- ZIO.logTrace(s"onLost done")
      } yield ()

This results in the following call in `PartitionStreamControl`:

    private[internal] def lost: UIO[Boolean] = {
      val lostException = new RuntimeException(s"Partition ${tp.toString} was lost") with NoStackTrace
      interruptionPromise.fail(lostException)
    }

Looking at the way the `interruptionPromise` is handled, this doesn't seem to work correctly when there are no records to be processed. In `PartitionStreamControl` we've got this repeating effect:

    ZStream.repeatZIOChunk {
      // First try to take all records that are available right now.
      // When no data is available, request more data and await its arrival.
      dataQueue.takeAll.flatMap(data => if (data.isEmpty) requestAndAwaitData else ZIO.succeed(data))
    }.flattenTake.chunksWith { s =>
      s.tap(records => registerPull(queueInfo, records))
        // Due to https://github.com/zio/zio/issues/8515 we cannot use Zstream.interruptWhen.
        .mapZIO(chunk => interruptionPromise.await.whenZIO(interruptionPromise.isDone).as(chunk))
    }

Here the `interruptionPromise` is checked to see whether we need to interrupt this effect. But how would this work if there are no active chunks to process? The `requestAndAwaitData` function:

    requestAndAwaitData =
      for {
        _     <- commandQueue.offer(RunloopCommand.Request(tp))
        _     <- diagnostics.emit(DiagnosticEvent.Request(tp))
        taken <- dataQueue.takeBetween(1, Int.MaxValue)
      } yield taken

This blocks the current fiber until at least one element is taken. So when the `lost` function fails the promise, that promise is never checked, since there are no records coming in on the `dataQueue` (or I'm reading this wrong, which is of course also possible).
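To make the suspected behaviour concrete, here is a minimal, self-contained sketch in plain ZIO 2. It is not the zio-kafka code: `LostWhileIdle`, `consume`, and the `Int` payload are hypothetical stand-ins, assuming only that the consumer blocks on `takeBetween` before it ever looks at the promise.

```scala
import zio._

object LostWhileIdle extends ZIOAppDefault {
  // Hypothetical stand-in for the consumer side: block on the queue first,
  // and only check the promise once a chunk has actually arrived.
  def consume(dataQueue: Queue[Int], interruption: Promise[Throwable, Unit]): Task[Chunk[Int]] =
    for {
      taken <- dataQueue.takeBetween(1, Int.MaxValue)
      _     <- interruption.await.whenZIO(interruption.isDone)
    } yield taken

  val program: UIO[Option[Exit[Throwable, Chunk[Int]]]] =
    for {
      dataQueue    <- Queue.unbounded[Int]
      interruption <- Promise.make[Throwable, Unit]
      fiber        <- consume(dataQueue, interruption).fork
      // Same idea as `lost`: fail the promise, but put nothing on the queue.
      _            <- interruption.fail(new RuntimeException("Partition was lost"))
      _            <- ZIO.sleep(500.millis)
      // poll returns None for as long as the fiber has not completed.
      status       <- fiber.poll
      _            <- fiber.interrupt
    } yield status

  val run = program.flatMap(s => Console.printLine(s"consumer finished: ${s.isDefined}"))
}
```

In this sketch the consumer fiber is still parked on the empty queue after the promise has failed, so `program` yields `None`. If the real stream behaves the same way, that would match what we observe.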
For the revoke flow, the `dataQueue` gets an additional `Take.end` to get out of the `requestAndAwaitData` wait state. But that doesn't happen for the `lost` scenario.
So shouldn't the code for `lost` also make sure the `dataQueue` gets at least some value, since it otherwise seems to be stuck in the `requestAndAwaitData` wait indefinitely?
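A rough sketch of what that could look like, again in plain ZIO 2 and not as a patch against zio-kafka. `LostWakesConsumer` and `Control` are hypothetical, simplified stand-ins for `PartitionStreamControl`, and the promise is left out for brevity. The only point is that a failure placed on the queue reaches a consumer that is blocked in `takeBetween`.

```scala
import zio._
import zio.stream._

object LostWakesConsumer extends ZIOAppDefault {
  // Hypothetical, simplified stand-in for PartitionStreamControl. The queue
  // carries Take values, so an end or failure signal can be enqueued.
  final class Control(dataQueue: Queue[Take[Throwable, Int]]) {
    // Sketch of the suggestion: enqueue the failure so that a consumer
    // waiting in takeBetween wakes up and the stream fails.
    def lost: UIO[Boolean] =
      dataQueue.offer(Take.fail(new RuntimeException("Partition was lost")))

    val stream: ZStream[Any, Throwable, Int] =
      ZStream.repeatZIOChunk(dataQueue.takeBetween(1, Int.MaxValue)).flattenTake
  }

  val program: UIO[Exit[Throwable, Chunk[Int]]] =
    for {
      dataQueue <- Queue.unbounded[Take[Throwable, Int]]
      control    = new Control(dataQueue)
      fiber     <- control.stream.runCollect.fork
      _         <- control.lost
      exit      <- fiber.await
    } yield exit

  val run = program.flatMap(exit => Console.printLine(s"stream failed: ${exit.isFailure}"))
}
```

Here the stream ends with a failure instead of hanging, because the enqueued `Take.fail` is picked up by `flattenTake`. Whether a failure or a `Take.end` is the right signal for the lost case is a design question for the maintainers.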