[QUESTION] Rebalance during transaction breaks exactly-once semantics?
I'm trying to understand how to implement exactly-once guarantees with the transactional consume-process-produce example. I'm using the example almost verbatim; I've only added a few print statements and a sleep to simulate a long processing duration.
```python
...
print("Start sleep")
await asyncio.sleep(4)  # simulate a long processing duration
print("Done sleep. Start commit")
await producer.send_offsets_to_transaction(
    commit_offsets, GROUP_ID)
print("Commit done")
...
```
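For reference, here is a condensed version of the loop I'm running, adapted from the docs example; the topic names, bootstrap server, and transactional id are placeholders:

```python
import asyncio
from aiokafka import AIOKafkaConsumer, AIOKafkaProducer

GROUP_ID = "processing-group"

async def consume_process_produce():
    consumer = AIOKafkaConsumer(
        bootstrap_servers="localhost:9092",
        group_id=GROUP_ID,
        enable_auto_commit=False,
        isolation_level="read_committed",
    )
    consumer.subscribe(["input-topic"])  # listener added below
    producer = AIOKafkaProducer(
        bootstrap_servers="localhost:9092",
        transactional_id="processor-1",
    )
    await consumer.start()
    await producer.start()
    try:
        while True:
            batch = await consumer.getmany(timeout_ms=1000)
            if not batch:
                continue
            async with producer.transaction():
                commit_offsets = {}
                for tp, msgs in batch.items():
                    for msg in msgs:
                        # stand-in for the real processing step
                        await producer.send("output-topic", msg.value)
                    commit_offsets[tp] = msgs[-1].offset + 1
                print("Start sleep")
                await asyncio.sleep(4)  # simulate long processing
                print("Done sleep. Start commit")
                await producer.send_offsets_to_transaction(
                    commit_offsets, GROUP_ID)
                print("Commit done")
    finally:
        await producer.stop()
        await consumer.stop()
```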
Besides this, I added a `subscribe()` call with my custom `ConsumerRebalanceListener` to see what's happening.
```python
from aiokafka import ConsumerRebalanceListener

class MyRebalanceListener(ConsumerRebalanceListener):
    def on_partitions_assigned(self, partitions):
        print("Assigned:", sorted([p.partition for p in partitions]))

    def on_partitions_revoked(self, partitions):
        print("Revoked:", sorted([p.partition for p in partitions]))
```
My understanding is that `send_offsets_to_transaction()` should fail if it tries to commit offsets for partitions whose assignment was revoked. To stress-test this, I add a second worker while the current worker is sleeping, i.e. after the output was sent but before the offsets were committed. I see log output like this:
```text
(Processing of event)
Start sleep
Done sleep. Start commit
Heartbeat failed for group processing-group because it is rebalancing
Revoked: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
Assigned: [0, 2, 4, 6, 8]
Commit done
```
So it seems that the commit succeeded after the new partitions were assigned, and in the target topic I see duplicated messages, one produced by each worker.

Where does this go wrong? Why doesn't this give me exactly-once guarantees? Shouldn't the transaction fail or be aborted if a rebalance happens in the background? I can provide the runnable example if that makes it clearer.

Was anyone able to reproduce this? Am I wrong to assume exactly-once semantics?
After reading Improved Robustness and Usability of Exactly-Once Semantics in Apache Kafka, I'm now convinced that this is not implemented in `aiokafka`. Initially, exactly-once semantics required static partition assignments. With KIP-447, dynamic assignments became possible. To fence against zombie producers (basically what I stage with the example above), the `sendOffsetsToTransaction` call needs to receive `GroupMetadata`, which includes the `generationId`. However, `aiokafka` sends only the group id, which is not enough to fence against commits after the rebalance.
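To make the gap concrete, here is the call as it appears in the docs example, with the corresponding Java API from KIP-447 shown in comments for comparison:

```python
# aiokafka: only the consumer group id accompanies the offsets, so the
# broker cannot tell whether this producer still belongs to the current
# consumer generation.
await producer.send_offsets_to_transaction(commit_offsets, GROUP_ID)

# Java client since Kafka 2.5 (KIP-447), for comparison:
#   producer.sendOffsetsToTransaction(offsets, consumer.groupMetadata());
# consumer.groupMetadata() returns a ConsumerGroupMetadata carrying
# generationId, memberId, and groupInstanceId, which the broker uses to
# reject offset commits from a previous (zombie) generation.
```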
Correct me if I'm wrong, but if exactly-once semantics are strictly required with dynamic partition assignments, the only option at the moment is the Java implementation.
@sauerburger As I understand it, the zombie fencing is managed by the transactional id setting, which is not really explained in the aiokafka example. In my implementation of exactly-once semantics with aiokafka, I created a producer pool that implements `ConsumerRebalanceListener`, so that each producer gets a transactional id specific to an input topic-partition. This way, I expect the "zombie" producer to be rejected when it submits the "commit" message to the transaction topics.

With my loop on a similar example, I am getting:

```text
aiokafka.errors.ProducerFenced: ProducerFenced: There is a newer producer using the same transactional_id or transaction timeout occurred (check that processing time is below transaction_timeout_ms)
```
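A minimal sketch of that pattern; the class name, id scheme, and bootstrap address are my own assumptions, and it relies on aiokafka awaiting listener callbacks defined as coroutines (as in its local-state docs example):

```python
from aiokafka import AIOKafkaProducer, ConsumerRebalanceListener

class ProducerPool(ConsumerRebalanceListener):
    """Keeps one transactional producer per assigned input partition."""

    def __init__(self, bootstrap_servers):
        self._bootstrap_servers = bootstrap_servers
        self._producers = {}  # TopicPartition -> AIOKafkaProducer

    async def on_partitions_revoked(self, revoked):
        # Stop producers for partitions this worker no longer owns.
        for tp in revoked:
            producer = self._producers.pop(tp, None)
            if producer is not None:
                await producer.stop()

    async def on_partitions_assigned(self, assigned):
        # Starting a producer with the same transactional_id bumps the
        # producer epoch on the broker, so a zombie still holding the
        # old epoch gets ProducerFenced on its next transactional call.
        for tp in assigned:
            producer = AIOKafkaProducer(
                bootstrap_servers=self._bootstrap_servers,
                transactional_id=f"eos-{tp.topic}-{tp.partition}",
            )
            await producer.start()
            self._producers[tp] = producer

    def get(self, tp):
        return self._producers[tp]
```

Used roughly as:

```python
pool = ProducerPool("localhost:9092")
consumer.subscribe(["input-topic"], listener=pool)
# per batch: producer = pool.get(tp); run the transaction on that producer
```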