KAFKA-14562 [2/2]: Implement epoch bump after every transaction (KIP-890)
This patch changes the client's end-transaction response handling when transaction version 2 is enabled. Version 5+ of the EndTxnResponse includes the producer ID and producer epoch fields.
Upon receiving the response, the client updates its producer ID and epoch to the values it carries.
On receiving an EndTxnRequest the server would've either:
- Bumped the epoch for the given producer ID.
- On epoch overflow, sent a new producer Id with epoch 0.
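The two server-side outcomes above can be sketched as follows. This is a minimal illustration, not the broker's actual code: the class `EpochBumpSketch`, the `IdAndEpoch` holder, the `nextProducerId` parameter, and the exact exhaustion threshold are all assumptions made for the example.

```java
// Hypothetical sketch of the two outcomes described above; not Kafka's real classes.
final class EpochBumpSketch {
    /** Simple holder for the values returned in the end-transaction response. */
    static final class IdAndEpoch {
        final long producerId;
        final short epoch;

        IdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Assumption: the epoch is treated as exhausted at this value; the real threshold may differ.
    static final short EXHAUSTED_AT = Short.MAX_VALUE - 1;

    /**
     * Returns the producer ID and epoch the client should adopt after the transaction ends.
     * nextProducerId stands in for whatever fresh ID the server would allocate on overflow.
     */
    static IdAndEpoch afterEndTxn(IdAndEpoch current, long nextProducerId) {
        if (current.epoch >= EXHAUSTED_AT) {
            // Epoch overflow: hand out a new producer ID and restart at epoch 0.
            return new IdAndEpoch(nextProducerId, (short) 0);
        }
        // Normal case: same producer ID, epoch bumped by one.
        return new IdAndEpoch(current.producerId, (short) (current.epoch + 1));
    }
}
```

In this sketch the client simply replaces its stored producer ID and epoch with whatever `afterEndTxn` returns, which mirrors the response handling described in the patch.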
This patch also changes the EndTxnRequest so that it is sent with the right request version, depending on whether txnV2 is enabled.
PS: I'm working on addressing the integration test failures in a follow-up. This is just a first pass.
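As a rough illustration of the version selection, assuming version 4 is the last version used without txnV2 (since the description says version 5+ carries the producer ID and epoch): the class and method names below are hypothetical, and the behavior when txnV2 is enabled but the broker supports less than version 5 is not covered by this sketch.

```java
// Hypothetical sketch; the names are illustrative and not taken from the patch.
final class EndTxnVersionSketch {
    // Assumption drawn from the description: v5+ responses include producer ID and epoch,
    // so v4 is taken to be the highest version used when txnV2 is disabled.
    static final short LAST_VERSION_BEFORE_TXN_V2 = 4;

    /** Picks the highest request version to use, capped at v4 when txnV2 is disabled. */
    static short chooseVersion(boolean txnV2Enabled, short latestSupported) {
        if (txnV2Enabled) {
            return latestSupported;
        }
        return (short) Math.min(latestSupported, LAST_VERSION_BEFORE_TXN_V2);
    }
}
```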
I think we should also disable the epoch bump logic on the client (search for epochBumpRequired) because the abort would do epoch bump anyway.
This would only be if TV2 is used right? It might not be as simple as removing everything.
Yes, basically do if (!v2Enabled) epochBumpRequired = true in the 3 places where it's set for the transactional producer. Also add a comment explaining the epochBumpRequired logic, because it now has 3 cases: idempotent producers; transactional v1, where the client needs to bump the epoch to fence off zombies from the transaction being aborted; and transactional v2, where the epoch is bumped automatically, so zombies get fenced without an explicit bump from the client.
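A minimal sketch of that guard, with the three cases spelled out in the comment. The class `EpochBumpPolicySketch` and the method `clientBumpRequired` are hypothetical, and the real change would sit inline wherever epochBumpRequired is set rather than in a helper like this.

```java
// Hypothetical helper illustrating the suggested guard; not the actual TransactionManager code.
final class EpochBumpPolicySketch {
    /**
     * Decides whether the client should request an explicit epoch bump.
     *
     * Three cases:
     *  - idempotent (non-transactional) producer: unchanged, the client still requests the bump;
     *  - transactional v1: the client must bump the epoch to fence off zombies
     *    from the transaction being aborted;
     *  - transactional v2: the epoch is bumped automatically when the transaction ends,
     *    so no explicit client-side bump is needed.
     */
    static boolean clientBumpRequired(boolean isTransactional, boolean v2Enabled) {
        return !(isTransactional && v2Enabled);
    }
}
```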
Can we look at these failures:
FAILED ❌ TransactionsExpirationTest > "testBumpTransactionalEpochAfterInvalidProducerIdMapping(String).quorum=kraft"
[1764](https://github.com/apache/kafka/actions/runs/11394823613/job/31706184534?pr=17402#step:10:1767)
FAILED ❌ TransactionsExpirationTest > "testTransactionAfterProducerIdExpires(String).quorum=kraft"
[1765](https://github.com/apache/kafka/actions/runs/11394823613/job/31706184534?pr=17402#step:10:1768)
FAILED ❌ RemoteLogManagerTest > testFetchOffsetByTimestampWithTieredStorageDoesNotFetchIndexWhenExistsLocally()
[1766](https://github.com/apache/kafka/actions/runs/11394823613/job/31706184534?pr=17402#step:10:1769)
FAILED ❌ ProducerIdExpirationTest > "testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft"
[1767](https://github.com/apache/kafka/actions/runs/11394823613/job/31706184534?pr=17402#step:10:1770)
FAILED ❌ TransactionManagerTest > testAbortTransactionAndResetSequenceNumberOnUnknownProducerId()
[1768](https://github.com/apache/kafka/actions/runs/11394823613/job/31706184534?pr=17402#step:10:1771)
FAILED ❌ TransactionManagerTest > testAbortTransactionAndReuseSequenceNumberOnError()
While rerunning the tests that had issues, I noticed this one is flaky:
"testFailureToFenceEpoch(String, boolean).quorum=kraft, isTV2Enabled=true"
Should we change something here? I saw this same failure before.