monix-kafka
Detecting Consumer Failures
I have a use case where the processing time of messages varies unpredictably. Following the Kafka consumer documentation, I modified KafkaConsumerObservable.runLoop as follows:
def runLoop(consumer: KafkaConsumer[K, V]): Task[Unit] = {
  // `out`, `pollTimeoutMillis` and `consumerCommit` come from the enclosing KafkaConsumerObservable
  val ackTask: Task[Ack] = Task.unsafeCreate { (context, cb) =>
    implicit val s = context.scheduler
    s.executeAsync { () =>
      context.frameRef.reset()
      val ackFuture =
        try consumer.synchronized {
          if (context.connection.isCanceled) Stop
          else {
            val next = blocking(consumer.poll(pollTimeoutMillis))
            // Pause all assigned partitions
            blocking(consumer.pause(consumer.assignment()))
            Observer.feed(out, next.asScala)(out.scheduler)
          }
        } catch {
          case NonFatal(ex) =>
            Future.failed(ex)
        }

      // Runs once downstream has acknowledged the batch
      ackFuture.syncOnComplete {
        case Success(ack) =>
          var streamErrors = true
          try consumer.synchronized {
            if (context.connection.isCanceled) {
              streamErrors = false
              cb.asyncOnSuccess(Stop)
            } else {
              // Resume the paused partitions and commit offsets
              consumer.resume(consumer.assignment())
              consumerCommit(consumer)
              streamErrors = false
              cb.asyncOnSuccess(ack)
            }
          } catch {
            case NonFatal(ex) =>
              if (streamErrors) cb.asyncOnError(ex)
              else s.reportFailure(ex)
          }
        case Failure(ex) =>
          cb.asyncOnError(ex)
      }
    }
  }

  ackTask.flatMap {
    case Stop => Task.unit
    case Continue => runLoop(consumer)
  }
}
Is it possible to handle this use case without modifying the exposed observable? Is it worth creating a PR for this case?
Thanks!
@jedossa Can you elaborate a bit more on the justification of these changes?
I just want to explore an option for use cases where message processing time varies unpredictably. For these cases, the Kafka docs recommend moving message processing to another thread, continuing to call poll, disabling automatic commits, and pausing the partitions. But I am not 100% sure that these changes are the right solution for such use cases.
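For comparison, the pattern described in the Kafka docs (process on another thread, keep polling, pause, commit manually) can be sketched without a broker. This is a minimal sketch under assumptions: `PausableSource`, `PauseWhileProcessing` and `FakeSource` are hypothetical names, and `PausableSource` only mimics the `poll`/`pause`/`resume`/`commitSync` calls of `KafkaConsumer` with `enable.auto.commit=false`, so offsets move only when `commit()` is called. Rebalances, per-partition pausing and shutdown are left out. Unlike the modified runLoop above, which does not call `poll` again until downstream has acknowledged the batch, this loop keeps polling while a batch is in flight, which is what keeps a real consumer within `max.poll.interval.ms`.

```scala
import scala.concurrent.{ExecutionContext, Future}

// Hypothetical stand-in for the subset of KafkaConsumer used by the pattern.
trait PausableSource[A] {
  def poll(): Seq[A] // like KafkaConsumer.poll: returns nothing while paused
  def pause(): Unit  // like consumer.pause(consumer.assignment())
  def resume(): Unit // like consumer.resume(consumer.paused())
  def commit(): Unit // like consumer.commitSync() with auto commit disabled
}

object PauseWhileProcessing {
  /** Processes `batches` non-empty batches from `source`, one at a time, on `ec`.
    * The calling thread keeps polling while a batch is in flight.
    * Returns how many polls were made while a batch was still running. */
  def run[A](source: PausableSource[A], batches: Int)(process: Seq[A] => Unit)(
      implicit ec: ExecutionContext): Int = {
    var inFlight: Option[Future[Unit]] = None
    var done = 0
    var pausedPolls = 0
    while (done < batches) {
      // Always poll, even while paused, so the consumer keeps checking in.
      val records = source.poll()
      inFlight match {
        case None if records.nonEmpty =>
          source.pause() // stop fetching until this batch is done
          inFlight = Some(Future(process(records)))
        case Some(f) if f.isCompleted =>
          f.value.get.get // rethrows if `process` failed
          source.commit() // commit only after processing succeeded
          source.resume()
          inFlight = None
          done += 1
        case Some(_) =>
          pausedPolls += 1 // batch still running; paused poll returned nothing
          Thread.sleep(1)  // a real poll(timeout) would block here instead
        case None => ()    // poll returned no records yet
      }
    }
    pausedPolls
  }
}

// In-memory source for trying the loop out (also hypothetical).
class FakeSource(batches: Seq[Seq[Int]]) extends PausableSource[Int] {
  private var remaining = batches
  var paused = false
  var commits = 0
  def poll(): Seq[Int] =
    if (paused || remaining.isEmpty) Nil
    else { val b = remaining.head; remaining = remaining.tail; b }
  def pause(): Unit = paused = true
  def resume(): Unit = paused = false
  def commit(): Unit = commits += 1
}
```

With an implicit `ExecutionContext` in scope (for example `scala.concurrent.ExecutionContext.Implicits.global`), `PauseWhileProcessing.run(new FakeSource(Seq(Seq(1, 2), Seq(3))), batches = 2)(batch => Thread.sleep(20))` processes both batches, commits twice, and leaves the source resumed. Committing only after `process` completes gives at-least-once delivery: a crash mid-batch means the batch is read again.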