KAFKA-14053: Transactional producer should bump the epoch and skip aborting when a delivery timeout is encountered
When a transactional batch encounters a delivery or request timeout, it can still be in flight. In this situation, if the transaction is aborted, the abort marker might get appended to the log earlier than the in-flight batch. This can block the LSO of a partition indefinitely, or can violate the processing guarantees. To avoid this situation, on a client-side timeout, the transactional producer should skip aborting (EndTxnRequest) and bump the epoch instead. Since this is a fencing bump, the producer cannot safely continue, resulting in a fatal error.
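To illustrate the application-facing consequence, here is a minimal sketch using the public KafkaProducer API (broker address, topic name, and the transactional.id below are placeholders): with this change, a delivery/request timeout inside a transaction surfaces as a fatal error, so the application has to close and recreate the producer rather than calling abortTransaction() and retrying.

```java
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.apache.kafka.common.KafkaException;
import org.apache.kafka.common.errors.ProducerFencedException;

public class TransactionalSendLoop {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder
        props.put("transactional.id", "my-app-tx-1");               // placeholder
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");

        KafkaProducer<String, String> producer = new KafkaProducer<>(props);
        producer.initTransactions();
        try {
            producer.beginTransaction();
            producer.send(new ProducerRecord<>("my-topic", "key", "value"));
            producer.commitTransaction();
        } catch (ProducerFencedException e) {
            // Another instance with the same transactional.id took over: do not retry.
            producer.close();
        } catch (KafkaException e) {
            // With the fix described above, a client-side timeout during a transaction
            // results in a fatal error (fencing epoch bump, no EndTxnRequest), so the
            // producer must be closed and recreated instead of aborting and retrying.
            producer.close();
            // ... recreate the producer and call initTransactions() again here
        }
    }
}
```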
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
@hachikuji would you please review this PR as well?
@showuon @artemlivshits Can you please take a look at this PR? This is the issue we had a thread about on the dev list.
I'll review it this week. Sorry for the delay.
@artemlivshits @ijuma @hachikuji Can you please take a look at this PR? Trying to fix a bug in the transactional producer. Thanks in advance!
@dajac @hachikuji Any chance you can take a look at this? This is a painful issue in transactional producers, with some serious consequences (partition corruption).
@urbandan
If it is better to keep the producer in a usable state, I can give it a shot. I had one experiment in which I tried keeping the producer usable by increasing the epoch on the client side once. I believe that it is safe to do, as the fencing bump will increase the epoch, and the coordinator will never return that epoch to any client.
I think it would be possible to avoid a fatal state, but it would require a client-side epoch bump. When an InitPid is sent during an ongoing transaction, the coordinator bumps the producer epoch to fence off the current producer. This bumped epoch is never returned to any producer as a valid epoch. This never-exposed epoch could be used by the producer to stay in a usable state.
In short: epoch=0 -> delivery timeout occurs -> send fencing InitPid -> epoch=1 (on coordinator side) -> increase epoch on client side -> send another InitPid -> safely acquire epoch=2. Since epoch=1 will never be used by another producer, this is a safe operation, and an actual fencing operation (by another producer instance) can still be detected.
Can you elaborate a bit more on this idea? Is this the implementation in the PR now, or was it an idea to avoid the fatal error?
@jolshan It is an idea, the first version of the PR was trying to implement that, but the current state of the PR is based on the fatal state.
The idea about keeping the producer in a reusable state is kind of tricky. The issue is that to fix the bug, we need to bump the epoch instead of aborting. Normally, an epoch bump results in a successful response from the coordinator, which contains the increased epoch, which can then be safely used by the producer to keep working. But bumping an epoch during an ongoing transaction is handled differently, because the coordinator assumes that a producer fencing occurred (a new producer instance with the same transaction id started up). Because of this, the response to the bump does not contain an actual epoch - it kicks off the fencing operation, and tells the producer to keep retrying until the fencing operation is finished. When that is done, the coordinator will increase the epoch again, and return it to the new producer.

An important observation here is that there is an epoch which is never returned to any producer by the coordinator. We could rely on this fact by trying to use this "hidden" epoch, increasing the epoch on the client side. Then we can try to bump the epoch again with this "hidden" epoch. If no other producer instance has fenced off the current producer, this will succeed, and we will get an increased epoch from the broker, meaning that the producer can safely continue. If another producer instance did fence off the current instance, even this "hidden" epoch will be fenced anyway.
In short, as I wrote in the other thread: epoch=0 -> delivery timeout occurs -> send fencing InitPid with epoch=0 -> epoch=1 (on coordinator side) -> increase epoch on client side to epoch=1 -> send another InitPid with epoch=1 -> safely acquire epoch=2.
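A toy simulation of this flow (plain Java, not Kafka client or coordinator code; the class, the retry signalling via -1, and the epoch bookkeeping are simplified assumptions):

```java
public class EpochBumpSketch {
    // Simulated coordinator state.
    static int coordinatorEpoch = 0;
    static boolean ongoingTransaction = true;

    // Simulates an InitProducerId sent with the producer's current epoch.
    // Returns the epoch granted to the caller, or -1 when the coordinator is
    // still aborting the fenced transaction and the caller must retry.
    static int initProducerId(int clientEpoch) {
        if (clientEpoch < coordinatorEpoch) {
            // A newer producer instance with the same transactional.id exists.
            throw new IllegalStateException("fenced");
        }
        if (ongoingTransaction) {
            coordinatorEpoch++;          // fencing bump; this epoch is never returned
            ongoingTransaction = false;  // abort of the old transaction completes
            return -1;                   // "retry later"
        }
        coordinatorEpoch++;              // normal bump, handed back to the caller
        return coordinatorEpoch;
    }

    public static void main(String[] args) {
        int clientEpoch = 0;                          // epoch=0, delivery timeout occurs
        int result = initProducerId(clientEpoch);     // fencing InitPid -> coordinator at epoch=1
        if (result == -1) {
            clientEpoch = clientEpoch + 1;            // client-side bump to the "hidden" epoch=1
            clientEpoch = initProducerId(clientEpoch); // second InitPid -> safely acquire epoch=2
        }
        System.out.println("client epoch = " + clientEpoch); // prints 2
    }
}
```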
Thanks for all the discussion here and sorry for the late arrival. I have seen this issue in practice as well, often in the context of hanging transactions. The late-arriving Produce request is not expected by the transaction coordinator. Unless the producer is still around to continue writing to the transaction, it is considered hanging by the partition leader. It's also fair to point out that this can violate the transaction's atomicity.
I think the basic idea in the patch here is to bump the epoch when we abort a transaction in order to fence off writes that are still in flight. Do I have that right?
This is in the spirit of an idea that's been on my mind for a while. The only difference is that I was considering a server-side implementation. The basic thought is to have the coordinator bump the epoch after every EndTxn request. We would let the bumped epoch be returned in the response:
EndTxnResponse => ThrottleTimeMs ErrorCode ProducerId ProducerEpoch
The tuple of (producerId, epoch) effectively becomes a unique transaction ID. This would also simplify some of the sequence bookkeeping that we've had so much trouble with on the client. Each transaction would begin with sequence=0 on every partition and the client could "forget" about the in-flight requests. Some of the logic we have struggled to get right is how to continue the sequence chain.
There is still a hole, however, which I think @jolshan was describing above. We cannot assume clients will always add partitions correctly to the transaction before beginning to write to the partition. We need a server-side validation. Otherwise, hanging transactions will always be possible. We have seen this so many times by now.
My suggestion here is to let us get a KIP out in the next couple of weeks with a good server-side solution. We may still need a client-side approach for compatibility with older brokers though, so maybe we can leave the PR open.
@hachikuji Thanks for the feedback. Yes, the essence of the change is bumping the epoch, but only in the case of timeouts (delivery timeout or request timeout).
Overall I agree that a server-side solution might be safer, and I'm interested in the KIP. At the same time, it sounds like a pretty big overhaul of the transaction coordination flow and, as you mentioned, will only help on new broker versions. What I was aiming for with this change was a bugfix that could even be backported to older versions. Do you think it would make sense to move forward with this fix in the meantime?
I'm not sure if the fix addresses the following scenario:
- producer got a timeout (and there is a delayed produce that got stuck)
- producer crashes before having an opportunity to bump epoch or anything
- transaction coordinator auto-aborts transaction
- delayed produce gets unstuck and delivered on top of abort
@artemlivshits The scenario you mentioned is already covered, even without this change - when a transaction times out, the transaction coordinator bumps the epoch, so it already fences off the "stuck" produce request.
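For reference, that coordinator-side auto-abort is driven by the standard transaction timeout configs; an illustrative snippet (the transactional.id is a placeholder, and the values shown are just the defaults):

```java
import java.util.Properties;

public class TransactionTimeoutConfig {
    public static void main(String[] args) {
        // The coordinator auto-abort mentioned above fires after the producer's
        // transaction.timeout.ms (capped by the broker's transaction.max.timeout.ms,
        // default 15 minutes). When it fires, the coordinator bumps the epoch,
        // which fences any produce request still stuck in flight.
        Properties props = new Properties();
        props.put("transactional.id", "my-app-tx-1");   // placeholder
        props.put("transaction.timeout.ms", "60000");   // default: 60 seconds
        System.out.println(props);
    }
}
```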
@urbandan By the way, KIP-890 is now available to review 😄 https://cwiki.apache.org/confluence/display/KAFKA/KIP-890%3A+Transactions+Server-Side+Defense