monix-kafka

Manual commitAsync completes before actual commit

Open voidconductor opened this issue 5 years ago • 3 comments

In KafkaConsumerObservableManualCommit.scala:56 there's an incorrect implementation of async commit:

override def commitBatchAsync(batch: Map[TopicPartition, Long], callback: OffsetCommitCallback): Task[Unit] =
  Task {
    blocking(consumer.synchronized(consumer.commitAsync(batch.map {
      case (k, v) => k -> new OffsetAndMetadata(v)
    }.asJava, callback)))
  }

Apache Kafka's commitAsync(offsets, callback) returns immediately and invokes the callback when the commit completes.

This task completes when consumer.commitAsync returns, not when the callback is invoked. It also ignores any commit error, which is only reported through the callback. It should probably be rewritten with Task.async.
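
A minimal sketch of what a Task.async-based version might look like, assuming Monix 3.x (where Task.async takes a single callback) and the standard Kafka Consumer interface. The object name CommitAsyncSketch and the standalone method shape are hypothetical, chosen for illustration; this is not the library's implementation.

```scala
import monix.eval.Task
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._

// Hypothetical helper for illustration only, not the monix-kafka code.
object CommitAsyncSketch {
  def commitBatchAsync[K, V](
    consumer: Consumer[K, V],
    batch: Map[TopicPartition, Long]
  ): Task[Unit] =
    Task.async[Unit] { cb =>
      val offsets = batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava
      // Complete the Task from inside Kafka's callback, so that it
      // finishes (or fails) only once the commit result is known.
      val onCommit = new OffsetCommitCallback {
        override def onComplete(
          committed: java.util.Map[TopicPartition, OffsetAndMetadata],
          exception: Exception
        ): Unit =
          if (exception == null) cb.onSuccess(()) else cb.onError(exception)
      }
      consumer.synchronized(consumer.commitAsync(offsets, onCommit))
    }
}
```

With this shape the Task signals completion or failure from the commit callback rather than when commitAsync returns, so it stays pending for as long as the Kafka client has not invoked the callback.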

voidconductor avatar Aug 01 '19 19:08 voidconductor

Good catch, @voidconductor! Will fix it before the next release.

Avasil avatar Aug 01 '19 21:08 Avasil

I've recently tried to fix this, and it turned out to be harder than I thought.

It looks like the Apache Kafka client puts callbacks passed to commitAsync into an internal queue and invokes them on the consumer's next poll.

So a naive solution with Task.create doesn't even pass the tests, probably because poll isn't called again until the commit task completes and lets the observable continue, while the commit callback itself only fires on poll.
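
The suspected cycle can be modelled without Kafka at all. The sketch below uses only the Scala standard library; QueueingConsumer is a hypothetical stand-in that mimics the behaviour described above (callbacks queued and run on poll), and scala.concurrent.Promise stands in for Task.create.

```scala
import scala.collection.mutable
import scala.concurrent.{Future, Promise}

// Hypothetical stand-in for the Kafka consumer: commit callbacks are
// queued and only run when poll() is called.
final class QueueingConsumer {
  private val pending = mutable.Queue.empty[() => Unit]

  def commitAsync(callback: () => Unit): Unit = pending.enqueue(callback)

  def poll(): Unit =
    while (pending.nonEmpty) pending.dequeue().apply()
}

object PollDrivenCallbacks {
  // Callback-to-Future bridge, analogous to the naive Task.create approach.
  def commit(consumer: QueueingConsumer): Future[Unit] = {
    val done = Promise[Unit]()
    consumer.commitAsync(() => done.success(()))
    done.future
  }
}
```

In this model, the Future returned by PollDrivenCallbacks.commit stays incomplete until some later poll() call drains the queue. If the only code that calls poll() is itself waiting on that Future, neither side can make progress.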

voidconductor avatar Aug 19 '19 19:08 voidconductor

In this case, I think we could either include a comment about it or return a Fiber which can be joined.

I assume we can't cancel a commit, so perhaps even Task[Task[Unit]] instead of Task[Fiber].
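
A rough sketch of the Task[Task[Unit]] idea, assuming Monix 3.x and the standard Kafka Consumer interface. TwoStageCommitSketch is a hypothetical name, and bridging through scala.concurrent.Promise is one possible choice, not something settled in this thread.

```scala
import monix.eval.Task
import org.apache.kafka.clients.consumer.{Consumer, OffsetAndMetadata, OffsetCommitCallback}
import org.apache.kafka.common.TopicPartition
import scala.collection.JavaConverters._
import scala.concurrent.Promise

// Hypothetical helper for illustration only, not the monix-kafka code.
object TwoStageCommitSketch {
  def commitBatchAsync[K, V](
    consumer: Consumer[K, V],
    batch: Map[TopicPartition, Long]
  ): Task[Task[Unit]] =
    Task {
      val done = Promise[Unit]()
      val offsets = batch.map { case (k, v) => k -> new OffsetAndMetadata(v) }.asJava
      val onCommit = new OffsetCommitCallback {
        override def onComplete(
          committed: java.util.Map[TopicPartition, OffsetAndMetadata],
          exception: Exception
        ): Unit =
          if (exception == null) done.trySuccess(()) else done.tryFailure(exception)
      }
      // Outer Task: completes once the commit request is handed to the client.
      consumer.synchronized(consumer.commitAsync(offsets, onCommit))
      // Inner Task: completes (or fails) once the commit callback has fired.
      Task.fromFuture(done.future)
    }
}
```

A caller could run the outer Task inside the consuming pipeline and keep the inner Task to await later, after further polls have had a chance to run the callback. Calling .flatten on the result right away would wait for the callback before continuing, which is the same waiting behaviour as a single Task.async.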

Avasil avatar Aug 22 '19 10:08 Avasil