KAFKA-15931: Reopen TransactionIndex if channel is closed
A cached TransactionIndex may get closed if the thread reading it is interrupted, causing following calls to always fail with ClosedChannelException, and forcing the process to be restarted. To avoid this issue, TransactionIndex now exposes a new method to validate the state of its channel, and the index is reopened if the channel is closed.
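A minimal sketch of the idea, assuming a cache keyed by index file path: the owner asks the index whether its channel is still open and builds a fresh instance when it is not, instead of handing a closed index to the next caller. `CachedIndex`, `IndexCache`, and `isClosed()` are hypothetical names used only for illustration, not Kafka's actual classes or methods.

```java
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for a cached, file-backed index (not Kafka's TransactionIndex).
class CachedIndex implements AutoCloseable {
    private final FileChannel channel;

    CachedIndex(Path file) throws IOException {
        this.channel = FileChannel.open(file, StandardOpenOption.CREATE,
                StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    // Exposes the channel state so the cache can detect a closed index.
    boolean isClosed() {
        return !channel.isOpen();
    }

    @Override
    public void close() throws IOException {
        channel.close();
    }
}

// Hypothetical cache that replaces a closed index instead of handing it out again.
class IndexCache {
    private final ConcurrentHashMap<Path, CachedIndex> entries = new ConcurrentHashMap<>();

    CachedIndex get(Path file) {
        return entries.compute(file, (f, existing) -> {
            if (existing != null && !existing.isClosed()) {
                return existing; // still usable, reuse it
            }
            try {
                return new CachedIndex(f); // first use, or reopen after an unexpected close
            } catch (IOException e) {
                throw new UncheckedIOException(e);
            }
        });
    }
}
```

Checking the channel on every lookup is cheap (`isOpen()` only reads a flag), so the cost of this approach is mostly the occasional reopen.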
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Bump! @satishd @kamalcph can we get this review going? Thanks!
Thanks for the patch @jeqo!
I was able to reproduce the exception mentioned in the ticket by calling close in the middle of collecting the aborted transactions. The fix doesn't resolve the issue. Could you please write a unit test to ensure that the issue is resolved?
https://github.com/apache/kafka/blob/trunk/storage/src/main/java/org/apache/kafka/storage/internals/log/TransactionIndex.java#L177
```
org.apache.kafka.common.KafkaException: Failed read position from the transaction index /var/folders/bq/w6tnkbq964q8sqpvj4fbmjr40000gq/T/kafka9502115127128961380.tmp
at org.apache.kafka.storage.internals.log.TransactionIndex$1.hasNext(TransactionIndex.java:245)
at org.apache.kafka.storage.internals.log.TransactionIndex.collectAbortedTxns(TransactionIndex.java:176)
at kafka.log.TransactionIndexTest.testCollectAbortedTxnsDuringClose(TransactionIndexTest.scala:210)
at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
...
Caused by: java.nio.channels.ClosedChannelException
at java.base/sun.nio.ch.FileChannelImpl.ensureOpen(FileChannelImpl.java:150)
at java.base/sun.nio.ch.FileChannelImpl.position(FileChannelImpl.java:325)
at org.apache.kafka.storage.internals.log.TransactionIndex$1.hasNext(TransactionIndex.java:243)
... 86 more
```
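The root cause in the trace is `FileChannel.position()` being called on a channel that another caller has already closed. A stripped-down reproduction using only the JDK; the class and method names are hypothetical and this is not the actual test in `TransactionIndexTest`:

```java
import java.io.IOException;
import java.nio.channels.ClosedChannelException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical reproduction (not Kafka code): the channel is closed while a reader
// is still iterating, and the reader then asks for its position, as an
// iterator's hasNext() might.
class ClosedChannelRepro {
    static String readPositionAfterClose(Path file) throws IOException {
        FileChannel channel = FileChannel.open(file, StandardOpenOption.READ);
        // Simulates another caller closing the index mid-iteration.
        channel.close();
        try {
            channel.position();
            return "no exception";
        } catch (ClosedChannelException e) {
            return e.getClass().getSimpleName();
        }
    }
}
```

Any later call on the same `FileChannel` instance fails the same way, which is why a closed channel kept in a cache never recovers on its own.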
Once the txn-index is closed, the next call to collectAbortedTxns will return an empty result. This patch addresses it by reopening the index. I think we can ignore this exception, as the consumer will get an error result and retry the FETCH request.
> causing following calls to always fail with ClosedChannelException
Is this correct? If the channel is closed, an empty result is returned.
@jeqo, could you address @kamalcph's comment above? Thanks.
@kamalcph good catch! You're right. I have refactored the code a bit further to reopen the txn index properly. PTAL
@jeqo
Any updates on this PR?
@kamalcph sorry for the delay. I have taken a step back on the approach to this issue, and it is now solved directly in TransactionIndex by design: the file is the source of truth, and the channel is exposed for read/write operations. If the channel is closed, it is reopened. Lmk if the new approach makes sense, thanks!
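A rough sketch of the "file is the source of truth" approach, assuming every read and write goes through a single channel accessor. `ReopeningIndexFile` is a hypothetical name and this is not the PR's code; making the accessor `synchronized` is a choice of this sketch, so that two threads cannot both reopen the channel and leak one handle.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical sketch: the file on disk holds the data, and the channel is only a
// handle to it, so a closed handle can be replaced without losing anything.
class ReopeningIndexFile implements AutoCloseable {
    private final Path file;
    private FileChannel channel;

    ReopeningIndexFile(Path file) throws IOException {
        this.file = file;
        this.channel = open(file, true);
    }

    private static FileChannel open(Path file, boolean create) throws IOException {
        return create
                ? FileChannel.open(file, StandardOpenOption.CREATE,
                        StandardOpenOption.READ, StandardOpenOption.WRITE)
                : FileChannel.open(file, StandardOpenOption.READ, StandardOpenOption.WRITE);
    }

    // Every read/write goes through here; a closed channel is reopened first.
    synchronized FileChannel channel() throws IOException {
        if (!channel.isOpen()) {
            channel = open(file, false); // no CREATE: a deleted file should fail loudly
        }
        return channel;
    }

    synchronized void append(ByteBuffer entry) throws IOException {
        FileChannel ch = channel();
        ch.position(ch.size());
        while (entry.hasRemaining()) {
            ch.write(entry);
        }
    }

    synchronized long sizeInBytes() throws IOException {
        return channel().size();
    }

    @Override
    public synchronized void close() throws IOException {
        channel.close();
    }
}
```

One consequence of this design is that `close()` is no longer terminal: the next access reopens the file. Reopening without `CREATE` means a file deleted underneath the index surfaces as `NoSuchFileException` rather than being silently recreated empty.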
Thanks @showuon. That's a fair point.
The main issue this PR was trying to address was that the cached Entry was left holding a closed TransactionIndex, hence the reopening solution.
I wonder whether the race condition was already fixed while this PR was in review, because there is no obvious call to .close() that we can trace.
For instance, doesn't https://github.com/apache/kafka/commit/88d2c4460a1c8c8cf5dbcc9edb43f42fe898ca00 already solve the race condition? My guess is that, because the "wrong" lock was used, close was not blocking reads from the cache, hence the bug. This commit seems to already fix it in 3.7. @divijvaidya, could you confirm?
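To illustrate the kind of locking the comment refers to, here is a hypothetical sketch (not the code from the linked commit): readers hold a read lock for the whole iteration and `close()` needs the write lock, so a close cannot land in the middle of a read. If close and reads took different locks, nothing would prevent that interleaving. `GuardedIndex` and `readAll` are illustrative names only.

```java
import java.util.concurrent.locks.ReentrantReadWriteLock;

// Hypothetical illustration: reads share a read lock, close() takes the write lock,
// so close() waits until every in-flight read has finished.
class GuardedIndex {
    private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
    private boolean closed = false;

    int readAll(int entries, Runnable perEntry) {
        lock.readLock().lock();
        try {
            if (closed) {
                throw new IllegalStateException("index is closed");
            }
            int read = 0;
            for (int i = 0; i < entries; i++) {
                perEntry.run(); // stands in for reading one index entry
                read++;
            }
            return read;
        } finally {
            lock.readLock().unlock();
        }
    }

    void close() {
        lock.writeLock().lock(); // blocks while any reader holds the read lock
        try {
            closed = true;
        } finally {
            lock.writeLock().unlock();
        }
    }

    boolean isClosed() {
        lock.readLock().lock();
        try {
            return closed;
        } finally {
            lock.readLock().unlock();
        }
    }
}
```

Note that this only prevents a close *during* a read; a thread interrupt still closes the underlying channel regardless of any lock the application holds.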
In that case, I could remove the channel reopening and leave this as a refactoring PR if that's useful. Thanks!
We have received feedback that this issue persists on 3.8, so I guess we could have another look at this PR.
@showuon I have made the TransactionIndexFile channel access methods synchronized. Given that this index is not expected to be accessed concurrently by many threads, this approach should be fine. I also learned that if the thread accessing the channel is interrupted, the channel gets closed implicitly. I think this is the main cause, as interruption is how delayed fetch operations are handled. I have added a test to validate that the channel gets reopened after an interruption. PTAL.
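The implicit close is standard JDK behavior: `FileChannel` is an `InterruptibleChannel`, so if a thread's interrupt status is set when it performs I/O on the channel, the channel is closed and the thread receives `ClosedByInterruptException`. A small demonstration with hypothetical names (not the test added in this PR), which also reopens the file afterwards the way a reopen-on-access index would:

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.ClosedByInterruptException;
import java.nio.channels.FileChannel;
import java.nio.file.Path;
import java.nio.file.StandardOpenOption;

// Hypothetical demo of how an interrupt closes a FileChannel.
class InterruptDemo {
    // Returns true if the interrupted read left the channel closed.
    static boolean interruptedReadClosesChannel(Path file) throws IOException {
        FileChannel channel = FileChannel.open(file, StandardOpenOption.READ);
        Thread.currentThread().interrupt(); // e.g. a delayed fetch being cancelled
        try {
            channel.read(ByteBuffer.allocate(8), 0);
            return false;
        } catch (ClosedByInterruptException e) {
            return !channel.isOpen();
        } finally {
            Thread.interrupted(); // clear the flag so later I/O on this thread works
            channel.close(); // no-op if already closed
        }
    }

    // Opens a fresh channel on the same file and reads it fully.
    static int readAfterReopen(Path file) throws IOException {
        try (FileChannel ch = FileChannel.open(file, StandardOpenOption.READ)) {
            ByteBuffer buf = ByteBuffer.allocate((int) ch.size());
            while (buf.hasRemaining() && ch.read(buf, buf.position()) >= 0) {
                // keep reading until the buffer is full or EOF is reached
            }
            return buf.position();
        }
    }
}
```

The data on disk is untouched by the interrupt; only the handle is lost, which is why reopening is enough to recover.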
We were the ones discussing this with @jeqo. The "caching closed channels" issue was happening regularly for us in production on 3.8: the thread doing a remote read was interrupted while iterating through the transaction index, we got a ClosedByInterruptException on a transaction index file channel, and the closed channel remained in the cache. The only way to mitigate it was to restart the broker.
We were able to reproduce it by setting a low remote.fetch.max.wait.ms and a small segment size, in order to generate many transaction index files.
We tested that this PR fixes our repro and cherry-picked it onto our production release; we haven't seen the issue since then.
We haven't seen the "race during channel close" issue (which is now also handled in this PR) in production.
The patch LGTM. It needs a second set of eyes, as we are reopening a closed index. cc @showuon @satishd PTAL.
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.
If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
@jeqo, could you rebase onto the latest trunk? I'd like to verify that the change doesn't break the build. Thanks.
@showuon thanks! build looks good after rebase 👍🏽