monix-kafka
Manual commitAsync completes before actual commit
In KafkaConsumerObservableManualCommit.scala:56 there's an incorrect implementation of async commit:
override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
  Task {
    blocking(consumer.synchronized(consumer.commitAsync(batch.map {
      case (k, v) => k -> new OffsetAndMetadata(v)
    }.asJava, callback)))
  }
Apache Kafka's commitAsync(offsets, callback) returns immediately and invokes the callback when the commit completes. This task completes when consumer.commitAsync returns, not when the callback is invoked, and it also ignores possible commit errors, which are likewise passed through the callback. It should probably be rewritten with Task.async.
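For illustration, a minimal sketch of what a Task.async-based version might look like. This is not the library's code: the wrapping object `CommitSketch`, the explicit `consumer` parameter, and the dropped `callback` parameter are assumptions made to keep the example standalone, and `scala.jdk.CollectionConverters` assumes Scala 2.13 or later. The task is completed from inside the commit callback, so a commit error fails the task instead of being dropped. Kafka runs that callback only from a later consumer call such as poll, so the task finishes only if something keeps driving the consumer.

```scala
import java.util.{Map => JMap}

import scala.jdk.CollectionConverters._
import scala.util.control.NonFatal

import monix.eval.Task
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

object CommitSketch {
  // Hypothetical standalone variant: the consumer is passed in explicitly.
  def commitBatchAsync[K, V](consumer: Consumer[K, V], batch: Map[TopicPartition, Long]): Task[Unit] =
    Task.async[Unit] { cb =>
      val offsets = batch.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }.asJava

      // Complete the Task from the commit callback, propagating any commit error.
      val onCommit = new OffsetCommitCallback {
        override def onComplete(committed: JMap[TopicPartition, OffsetAndMetadata], error: Exception): Unit =
          if (error == null) cb.onSuccess(()) else cb.onError(error)
      }

      // commitAsync only enqueues the request; it returns before the commit is done.
      try consumer.synchronized(consumer.commitAsync(offsets, onCommit))
      catch { case NonFatal(e) => cb.onError(e) }
    }
}
```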
Good catch, @voidconductor! Will fix it before the next release.
I've recently tried to fix this, and it turned out to be harder than I thought.
It looks like the Apache Kafka client puts callbacks passed to commitAsync into an internal queue and invokes them on the consumer's poll.
So a naive solution with Task.create doesn't even pass the tests, probably because of a circular wait: the observable only runs on (and calls poll again) once the commit task completes, but the commit callback is only invoked from poll.
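The suspected circular wait can be modelled without Kafka or Monix. The sketch below is only a toy: `QueueingConsumer` and `PollQueueDemo` are hypothetical names, and the class imitates just the one behaviour described above, namely that commit callbacks sit in a queue until poll is called.

```scala
import scala.collection.mutable
import scala.concurrent.Promise

// Hypothetical stand-in for the Kafka consumer: commit callbacks are queued
// and only run when poll() is called.
final class QueueingConsumer {
  private val pending = mutable.Queue.empty[() => Unit]

  def commitAsync(onComplete: () => Unit): Unit = pending.enqueue(onComplete)

  def poll(): Unit = while (pending.nonEmpty) pending.dequeue()()
}

object PollQueueDemo {
  val consumer = new QueueingConsumer
  val committed = Promise[Unit]()

  consumer.commitAsync(() => committed.success(()))

  // Nothing has drained the queue yet, so the commit is still pending.
  val doneBeforePoll: Boolean = committed.isCompleted

  consumer.poll()

  // poll() ran the queued callback, which completed the promise.
  val doneAfterPoll: Boolean = committed.isCompleted
}
```

If the caller waits on `committed` before calling `poll()` again, nothing ever drains the queue, which matches the hang suspected in the tests.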
In this case, I think we could either include a comment about it or return a Fiber which can be joined.
I assume we can't cancel a commit, so perhaps even Task[Task[Unit]] instead of Task[Fiber].
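A rough sketch of the Task[Task[Unit]] shape, under stated assumptions: the object name `TwoStageCommitSketch`, the explicit `consumer` parameter, and the use of a Promise to bridge the callback are illustrative choices, not the library's API, and `scala.jdk.CollectionConverters` assumes Scala 2.13 or later. The outer task only issues the commit. The inner task completes, or fails with the commit error, once Kafka invokes the callback.

```scala
import java.util.{Map => JMap}

import scala.concurrent.Promise
import scala.jdk.CollectionConverters._

import monix.eval.Task
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition

object TwoStageCommitSketch {
  // Hypothetical signature: outer Task = "commit issued", inner Task = "commit acknowledged".
  def commitBatchAsync[K, V](consumer: Consumer[K, V], batch: Map[TopicPartition, Long]): Task[Task[Unit]] =
    Task {
      val done = Promise[Unit]()
      val offsets = batch.map { case (tp, offset) => tp -> new OffsetAndMetadata(offset) }.asJava

      val onCommit = new OffsetCommitCallback {
        override def onComplete(committed: JMap[TopicPartition, OffsetAndMetadata], error: Exception): Unit = {
          if (error == null) done.trySuccess(()) else done.tryFailure(error)
          ()
        }
      }

      // Enqueue the commit; the callback fires on a later consumer call such as poll.
      consumer.synchronized(consumer.commitAsync(offsets, onCommit))

      // Hand back a task the caller may await later, after polling has resumed.
      Task.fromFuture(done.future)
    }
}
```

A caller that wants the old fire-and-forget behaviour can ignore the inner task. A caller that needs confirmation can run the inner task at a point where the consumer is being polled again, rather than flattening immediately and risking the circular wait described earlier.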