alpakka-kafka
CommittingProducerSinkStageLogic incorrectly counts awaitingCommitResult
Versions used
Akka version: 2.6.19
Alpakka Kafka version: 3.0.1
Expected Behavior
`awaitingCommitResult` should be incremented and decremented by the same value, so that it eventually returns to zero and allows a clean shutdown.
In `produce`, `awaitingCommitResult` should be incremented by `msg.passThrough.batchSize` and `multiMsg.passThrough.batchSize`, respectively.
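The expected bookkeeping can be sketched with a simplified model. This is not the Alpakka Kafka source: `Committable`, `Stage`, and `canShutDown` here are hypothetical stand-ins, and only the counter arithmetic is modelled, assuming `produce` adds the pass-through's `batchSize` and `commitResultCB` subtracts the committed `batchSize`.

```scala
// Simplified model of the expected counter bookkeeping.
// Not the real stage logic: all types here are hypothetical stand-ins.
object BalancedCounterSketch {
  // Stand-in for a committable: a single offset has batchSize 1,
  // a batch of offsets has a larger batchSize.
  final case class Committable(batchSize: Long)

  final class Stage {
    var awaitingCommitResult: Long = 0L

    // Expected behaviour: add the batch size of the pass-through.
    def produce(passThrough: Committable): Unit =
      awaitingCommitResult += passThrough.batchSize

    // The commit callback subtracts the size of the committed batch.
    def commitResultCB(batchSize: Long): Unit =
      awaitingCommitResult -= batchSize

    // Shutdown can complete only when no commits are outstanding.
    def canShutDown: Boolean = awaitingCommitResult == 0L
  }

  // One single offset plus two batches of 100, committed together as 201.
  def run(): Long = {
    val stage = new Stage
    Seq(Committable(1), Committable(100), Committable(100)).foreach(stage.produce)
    stage.commitResultCB(201)
    stage.awaitingCommitResult
  }
}
```

With increments and decrements using the same unit, the counter in this model returns to zero once the aggregated commit completes.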
Actual Behavior
During shutdown, `CommittingProducerSinkStageLogic` waits for unfinished commits by checking that `awaitingCommitResult` is zero.
In `produce`, this value is always incremented by 1, both when a single message is committed using `Committable` and when many messages are committed using `CommittableOffsetBatch`.
In `commitResultCB`, on the other hand, `batchSize` is subtracted. This causes `awaitingCommitResult` to become negative.
As a result, the stage cannot terminate correctly during shutdown.
In my case I aggregate incoming messages in groups of 100 using the `groupedWithin` operator. From each group I then build one `CommittableOffsetBatch`. `CommittingProducerSinkStageLogic` increments `awaitingCommitResult` by 1 for every such batch. These batches are then aggregated into an even bigger batch. When such a batch of size 400-500 is committed, the size of that batch is subtracted from `awaitingCommitResult`.
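To make the arithmetic of this scenario concrete, here is a small sketch with assumed numbers: five upstream batches of 100 offsets each, committed together as one aggregated batch of 500. The object and value names are hypothetical, and the code only reproduces the counting described in this report, not the actual stage.

```scala
// Arithmetic of the reported scenario, using assumed example numbers.
object ReportedScenarioSketch {
  // Assumption: 5 batches of 100 offsets, later committed as one batch of 500.
  val upstreamBatchSizes: Seq[Long] = Seq.fill(5)(100L)

  // As reported: produce adds 1 per batch, regardless of its size.
  val afterProduce: Long = upstreamBatchSizes.size.toLong

  // As reported: the commit callback subtracts the committed batch size.
  val afterCommit: Long = afterProduce - upstreamBatchSizes.sum

  def main(args: Array[String]): Unit =
    println(s"awaitingCommitResult after commit: $afterCommit")
}
```

Under these assumptions the counter ends at 5 - 500 = -495, so a shutdown check that waits for zero would never be satisfied.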
I agree, it looks as if I mixed up the counters for `awaitingProduceResult` and `awaitingCommitResult` in the callback. This should be reproducible in a test case.