KAFKA-14562 [2/2]: Implement epoch bump after every transaction (KIP-890)

Open rreddy-22 opened this issue 1 year ago • 3 comments

This patch changes the client's handling of the end transaction response when transaction version 2 is enabled. Version 5+ of the EndTxnResponse includes the producer ID and producer epoch fields.

Upon receiving the response, the client updates its producer ID and epoch to the values in the response.

On receiving an EndTxnRequest, the server will have either:

  1. Bumped the epoch for the given producer ID.
  2. On epoch overflow, returned a new producer ID with epoch 0.
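
As an illustration of the client-side update described above, here is a minimal sketch. It is not code from the patch: `EndTxnResponseSketch`, its nested `ProducerIdAndEpoch`, and `applyEndTxnResponse` are hypothetical, simplified stand-ins, and it assumes responses below version 5 carry no producer ID or epoch, so the client keeps its current values.

```java
// Illustrative sketch only; simplified stand-ins, not Kafka's actual client classes.
final class EndTxnResponseSketch {

    /** Simplified stand-in for the client's (producer ID, epoch) pair. */
    static final class ProducerIdAndEpoch {
        final long producerId;
        final short epoch;

        ProducerIdAndEpoch(long producerId, short epoch) {
            this.producerId = producerId;
            this.epoch = epoch;
        }
    }

    // Per the description, version 5+ of the response carries producer ID and epoch.
    static final short FIRST_VERSION_WITH_PRODUCER_FIELDS = 5;

    /**
     * Hypothetical helper: returns the identity the client should use for its
     * next transaction after an EndTxn response arrives.
     */
    static ProducerIdAndEpoch applyEndTxnResponse(ProducerIdAndEpoch current,
                                                  short responseVersion,
                                                  long responseProducerId,
                                                  short responseEpoch) {
        if (responseVersion < FIRST_VERSION_WITH_PRODUCER_FIELDS) {
            // Assumption: older responses carry no identity, so nothing changes.
            return current;
        }
        // Covers both server outcomes: same ID with a bumped epoch,
        // or a new ID with epoch 0 after overflow.
        return new ProducerIdAndEpoch(responseProducerId, responseEpoch);
    }

    public static void main(String[] args) {
        ProducerIdAndEpoch current = new ProducerIdAndEpoch(1000L, (short) 7);

        // Case 1: the server bumped the epoch for the same producer ID.
        ProducerIdAndEpoch bumped = applyEndTxnResponse(current, (short) 5, 1000L, (short) 8);
        System.out.println(bumped.producerId + " / " + bumped.epoch);

        // Case 2: epoch overflow, so the server returned a new producer ID with epoch 0.
        ProducerIdAndEpoch rolled = applyEndTxnResponse(
            new ProducerIdAndEpoch(1000L, Short.MAX_VALUE), (short) 5, 2000L, (short) 0);
        System.out.println(rolled.producerId + " / " + rolled.epoch);
    }
}
```

The client does not need to tell the two server outcomes apart: in both cases it simply adopts whatever pair the response carries.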

This patch also changes the EndTxnRequest so that the client sends the appropriate request version depending on whether txnV2 is enabled.
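
One way to picture the version selection is sketched below. This is a hedged illustration, not the patch's implementation: `EndTxnVersionSketch` and `chooseEndTxnVersion` are hypothetical names, and the sketch assumes that version 4 is the last version usable without txnV2, which is inferred only from the statement that version 5+ carries the new fields.

```java
// Illustrative sketch only; names and the version cap are assumptions.
final class EndTxnVersionSketch {

    // Assumption: version 5 is the first EndTxn version tied to transaction version 2.
    static final short FIRST_TV2_VERSION = 5;

    /**
     * Hypothetical helper: picks the EndTxn request version to send.
     *
     * @param txnV2Enabled    whether transaction version 2 is enabled
     * @param latestSupported highest version both client and broker support
     */
    static short chooseEndTxnVersion(boolean txnV2Enabled, short latestSupported) {
        if (txnV2Enabled) {
            // With txnV2, the newest mutually supported version can be used.
            return latestSupported;
        }
        // Without txnV2, cap the version below the first txnV2 version.
        short lastPreTv2 = (short) (FIRST_TV2_VERSION - 1);
        return (short) Math.min(latestSupported, lastPreTv2);
    }

    public static void main(String[] args) {
        System.out.println(chooseEndTxnVersion(true, (short) 5));
        System.out.println(chooseEndTxnVersion(false, (short) 5));
        System.out.println(chooseEndTxnVersion(false, (short) 3));
    }
}
```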

PS: I'm working on addressing the integration test failures in a follow-up. This is just a first pass.

rreddy-22, Oct 08 '24 01:10

I think we should also disable the epoch bump logic on the client (search for epochBumpRequired) because the abort would do epoch bump anyway.

This would only be if TV2 is used right? It might not be as simple as removing everything.

jolshan, Oct 09 '24 22:10

This would only be if TV2 is used right? It might not be as simple as removing everything.

Yes. Basically, do if (!v2Enabled) epochBumpRequired = true in the 3 places where it's set for the transactional producer. Also add a comment explaining the epochBumpRequired logic, because it now has 3 cases: idempotent producers; transactional v1, where the client needs to bump the epoch to fence off zombies from the transaction being aborted; and transactional v2, where the epoch is bumped automatically, so zombies get fenced without an explicit bump from the client.
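
The three cases could be summarized roughly as in the sketch below. It is illustrative only and not taken from the patch: `EpochBumpSketch`, `ProducerMode`, and `clientEpochBumpRequired` are hypothetical names, and the idempotent case returning true is an assumption that idempotent producers keep their existing client-side bump behavior.

```java
// Illustrative sketch only; hypothetical names, not Kafka's TransactionManager.
final class EpochBumpSketch {

    enum ProducerMode { IDEMPOTENT, TRANSACTIONAL_V1, TRANSACTIONAL_V2 }

    /**
     * Hypothetical helper: should the client flag an explicit epoch bump
     * at the points where epochBumpRequired is set today?
     */
    static boolean clientEpochBumpRequired(ProducerMode mode) {
        switch (mode) {
            case IDEMPOTENT:
                // Assumption: unchanged by this suggestion; the client still bumps.
                return true;
            case TRANSACTIONAL_V1:
                // The client must bump to fence zombies from the transaction being aborted.
                return true;
            case TRANSACTIONAL_V2:
                // The epoch is bumped automatically when the transaction ends,
                // so no explicit client-side bump is needed.
                return false;
            default:
                throw new IllegalArgumentException("Unknown mode: " + mode);
        }
    }

    public static void main(String[] args) {
        for (ProducerMode mode : ProducerMode.values()) {
            System.out.println(mode + " -> " + clientEpochBumpRequired(mode));
        }
    }
}
```

For the transactional producer this reduces to the suggested guard: the flag is set only when v2 is not enabled.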

artemlivshits, Oct 09 '24 22:10

Can we look at these failures:

FAILED ❌ TransactionsExpirationTest > "testBumpTransactionalEpochAfterInvalidProducerIdMapping(String).quorum=kraft"
FAILED ❌ TransactionsExpirationTest > "testTransactionAfterProducerIdExpires(String).quorum=kraft"
FAILED ❌ RemoteLogManagerTest > testFetchOffsetByTimestampWithTieredStorageDoesNotFetchIndexWhenExistsLocally()
FAILED ❌ ProducerIdExpirationTest > "testTransactionAfterTransactionIdExpiresButProducerIdRemains(String).quorum=kraft"
FAILED ❌ TransactionManagerTest > testAbortTransactionAndResetSequenceNumberOnUnknownProducerId()
FAILED ❌ TransactionManagerTest > testAbortTransactionAndReuseSequenceNumberOnError()

CI log: https://github.com/apache/kafka/actions/runs/11394823613/job/31706184534?pr=17402

jolshan, Oct 18 '24 16:10

While rerunning the tests that had issues, I noticed this one is flaky:

"testFailureToFenceEpoch(String, boolean).quorum=kraft, isTV2Enabled=true"

Should we change something here? I've seen this same failure before.

jolshan, Oct 30 '24 21:10