
CommittableOffsetBatchImpl allow reordering

Open Matzz opened this issue 4 years ago • 5 comments

Short description

Committing an older offset after a newer one in CommittableOffsetBatchImpl overrides it.

Details

There are two main problems I would like to discuss here:

  1. Committing an older offset after a newer one via CommittableOffsetBatchImpl silently overrides the newer offset. Example:
import akka.kafka.ConsumerMessage.CommittableOffsetBatch
import akka.kafka.testkit.ConsumerResultFactory

object Main extends App {
  def makeCommittableOffset(offset: Long) = {
    ConsumerResultFactory.committableOffset(
      groupId = "x",
      topic = "test",
      partition = 1,
      offset = offset,
      metadata = ""
    )
  }
  var batch = CommittableOffsetBatch.empty
  batch = batch.updated(makeCommittableOffset(1))
  batch = batch.updated(makeCommittableOffset(3))
  batch = batch.updated(makeCommittableOffset(4))
  batch = batch.updated(makeCommittableOffset(2))
  println(batch)
  // CommittableOffsetBatch(batchSize=4, GroupTopicPartition(x,test,1) -> 2)
}
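A minimal sketch of the kind of guard being proposed here (hypothetical, not part of the Alpakka Kafka API): track the highest offset seen per partition and warn whenever an update would lower it, instead of silently overriding.

```scala
// Hypothetical guard, independent of Alpakka Kafka: keeps the last offset
// per (topic, partition) and flags any update that moves backwards.
final case class GuardedOffsets(offsets: Map[(String, Int), Long] = Map.empty) {
  def updated(topic: String, partition: Int, offset: Long): GuardedOffsets = {
    val key = (topic, partition)
    offsets.get(key).filter(_ > offset).foreach { prev =>
      // A real implementation could log a warning, throw, or invoke a
      // user-supplied callback here instead of printing.
      println(s"WARN: offset $offset for $key is lower than previously seen $prev")
    }
    GuardedOffsets(offsets + (key -> offset))
  }
}
```

The check itself is cheap since the batch already keeps a per-partition map; the open question is how to surface the event (log, exception, or callback).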

In my opinion there should be an option to log an error or throw an exception in such a case. At first glance this affects only at-most-once delivery. However, this behavior can be a problem for low-traffic topics: we will observe lag in metrics until the next message arrives on that partition, which can sometimes take days. Such problems are very hard to debug. In my case the code was faulty because it used flatMapMerge instead of flatMapConcat. I understand that CommittableOffsetBatch may not catch problems between independent batches, but I think logging such issues within the same batch is possible and beneficial. Of course it could be configurable and optional.
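The flatMapMerge/flatMapConcat distinction mentioned above is the usual source of this reordering. A short illustration (toy sources, not Kafka-specific):

```scala
import akka.NotUsed
import akka.stream.scaladsl.Source

// flatMapConcat fully drains each substream before starting the next,
// preserving upstream order. flatMapMerge runs up to `breadth` substreams
// concurrently, so their elements (and thus committable offsets carried
// alongside them) can interleave out of order.
val ordered: Source[Int, NotUsed] =
  Source(1 to 3).flatMapConcat(i => Source(List(i * 10, i * 10 + 1)))

val mayReorder: Source[Int, NotUsed] =
  Source(1 to 3).flatMapMerge(breadth = 3, i => Source(List(i * 10, i * 10 + 1)))
```

With `mayReorder`, an offset from substream 3 can reach the committer before one from substream 1, triggering exactly the override shown above.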

  2. Skipping some offsets:
  var batch = CommittableOffsetBatch.empty
  batch = batch.updated(makeCommittableOffset(1))
  batch = batch.updated(makeCommittableOffset(2))
  batch = batch.updated(makeCommittableOffset(3))
  batch = batch.updated(makeCommittableOffset(20))
  println(batch)
  // CommittableOffsetBatch(batchSize=4, GroupTopicPartition(x,test,1) -> 20)

I'm not entirely sure about this one, but I think that for at-least-once delivery it should at least be possible to log in such a case (optionally).
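For this second case, the check would be contiguity rather than monotonicity. A sketch (hypothetical helper, assuming offsets on a partition are expected to be consecutive, which is not true when records are filtered or compacted):

```scala
// Warn when an incoming offset is not exactly last + 1.
// Returns the new "last seen" offset to thread through subsequent calls.
def checkContiguous(last: Option[Long], next: Long): Option[Long] = {
  last.filter(prev => next != prev + 1).foreach { prev =>
    println(s"WARN: non-contiguous commit: got offset $next after $prev")
  }
  Some(next)
}
```

As noted below in the thread, gaps are often legitimate (filtering, transactional markers), which is why this would have to stay off by default.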

I could implement this feature, but I think the expected behavior should be discussed first.

Regards, Matzz

Matzz avatar Dec 13 '20 00:12 Matzz

Thank you for discussing this here.

Reordering of messages and thus committing offsets wrong causes nasty bugs and Alpakka Kafka can get better at avoiding it.

For case 1, I agree that we should stop lowering the offset for a topic partition right in the batch implementation (note that there is already logic to avoid committing lower offsets in CommitObservationLogic.updateBatchForPartition). Logging a warning might be a good idea. Passing configuration to the CommittableBatch implementation is not straightforward, so I'm not sure it would be worth it.

For case 2, I wouldn't want to change anything; in many scenarios it is valid to simply filter out messages (and their offsets).

ennru avatar Dec 14 '20 08:12 ennru

@ennru For both scenarios I thought it could be configurable, so by default warnings for scenario 2 would be disabled. I think it could be implemented by adding a reordering-event listener to CommittableOffsetBatch, so that besides the predefined behaviors users could set custom actions (for example, emitting metrics). I was thinking about changing CommittableOffsetBatch.empty and CommitObservationLogic via settings which internally use empty.
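A rough shape of the listener idea (hypothetical API, names invented for illustration only):

```scala
// Hypothetical callback interface a batch could report reordering through.
trait OffsetOrderListener {
  def onReordered(topic: String, partition: Int, previous: Long, offset: Long): Unit
}

// A user could plug in logging, metrics, or a hard failure:
val metricsListener = new OffsetOrderListener {
  def onReordered(topic: String, partition: Int, previous: Long, offset: Long): Unit =
    println(s"reordered commit on $topic-$partition: $offset arrived after $previous")
}
```

The settings-driven approach would thread such a listener through wherever `empty` is used today, which is the non-trivial plumbing ennru alludes to above.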

Another approach would be to simply add logging in CommittableOffsetBatch and CommitObservationLogic.updateBatch for the first scenario.

Matzz avatar Dec 18 '20 03:12 Matzz

If you're willing to try this out that would be great.

ennru avatar Dec 22 '20 13:12 ennru

+1 for this. I experienced issues where consumer lag would never reach 0 because of the use of mapAsyncUnordered. After switching to mapAsync the messages are produced in order, and therefore the offsets are committed correctly, so the lag reaches 0. While this is more "correct", it lowers the throughput of the pipeline quite significantly. It is mostly an aesthetic issue as long as the pipeline keeps running, but when it is restarted it will lead to duplication.

I don't want to start a discussion about what the correct behavior is, but it would be great if Alpakka would at least warn that offsets are committed in the wrong order and that this may produce undesired / surprising results.
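The mapAsync/mapAsyncUnordered difference described above can be seen with a toy stream (plain Akka Streams, no Kafka involved):

```scala
import akka.actor.ActorSystem
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future
import scala.util.Random

object OrderDemo extends App {
  implicit val system: ActorSystem = ActorSystem("demo")
  import system.dispatcher

  // mapAsync emits results in upstream order even when the futures complete
  // out of order; mapAsyncUnordered emits in completion order, so a slow
  // element lets later elements (and their offsets) overtake it.
  Source(1 to 5)
    .mapAsyncUnordered(parallelism = 4) { i =>
      Future { Thread.sleep(Random.nextInt(50).toLong); i }
    }
    .runWith(Sink.foreach(println)) // emission order is not guaranteed here
}
```

With committable offsets flowing through such a stage, the "overtaken" earlier offset is the one that ends up overriding a later commit, which is exactly the duplication-on-restart scenario described.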

EgbertW avatar Jan 06 '21 13:01 EgbertW

it would be great if Alpakka would at least warn that offsets are committed in the wrong order and this may produce undesired / surprising results.

Agreed, I created #1305 to track this specifically. @EgbertW if you're willing we'd welcome a PR!

Regarding @Matzz's 2nd request: I don't see value in warning the user about "skipped" offset commits, since it's common practice to batch or skip records during processing.

seglo avatar Jan 06 '21 18:01 seglo