KAFKA-17480: New consumer commit all consumed should retrieve offsets in background thread
When committing all consumed offsets (sync, async, or on close), the new consumer retrieves the offsets from subscriptionState.allConsumed() in the app thread. We should consider retrieving allConsumed in the background thread when processing the events, to avoid inconsistencies: the subscription state could be modified in the background thread after allConsumed was retrieved in the app thread.
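To make the inconsistency concrete, here is a minimal single-threaded sketch of the ordering problem. `StaleSnapshotSketch` and its nested `State` class are hypothetical stand-ins for the subscription state (they are not the real Kafka classes), and partitions are plain strings for brevity.

```java
import java.util.HashMap;
import java.util.Map;

final class StaleSnapshotSketch {
    // Hypothetical stand-in for the subscription state: partition name -> consumed position.
    static final class State {
        private final Map<String, Long> positions = new HashMap<>();

        synchronized void assign(String partition, long position) {
            positions.put(partition, position);
        }

        synchronized void revoke(String partition) {
            positions.remove(partition);
        }

        // Returns a copy of the consumed positions as of the moment of the call.
        synchronized Map<String, Long> allConsumed() {
            return new HashMap<>(positions);
        }
    }

    public static void main(String[] args) {
        State state = new State();
        state.assign("topic-0", 10L);
        state.assign("topic-1", 20L);

        // Current flow: the app thread captures the offsets when it creates the commit event.
        Map<String, Long> capturedInAppThread = state.allConsumed();

        // Before the event is processed, the background thread changes the state
        // (simulated here inline, e.g. a partition gets revoked).
        state.revoke("topic-1");

        // Proposed flow: the offsets are read only when the event is processed.
        Map<String, Long> resolvedInBackground = state.allConsumed();

        System.out.println("captured in app thread: " + capturedInAppThread);
        System.out.println("resolved in background: " + resolvedInBackground);
    }
}
```

The snapshot captured up front still contains the revoked partition, while the one read at processing time matches the current state.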
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Hey @FrankYang0529, thanks for taking this one! I see it is still marked as Draft, so I haven't looked into it yet, but let me know when it's ready for review and I'll take a look. Thanks!
Hi @lianetm, yes, this one is ready for reviewing. Thank you.
Hi @lianetm and @kirktrue, thanks for the review. I may need more time for this. I found that AsyncKafkaConsumer#interceptors consumes the offsets. If we use the background thread to get subscriptions.allConsumed(), we need CommitEvent#future to return the offsets, so that AsyncKafkaConsumer#interceptors doesn't get empty offsets.
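A rough sketch of that idea, under simplifying assumptions: `CommitFutureSketch`, `CommitEventSketch`, and `process` are hypothetical names used only for illustration, partitions are plain strings, and the "interceptor call" is just a print. The point is only that the future is typed with the offsets rather than `Void`, so the app thread gets back whatever the background thread resolved.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

final class CommitFutureSketch {
    // Hypothetical commit event: it carries no offsets, only a future that the
    // background thread completes with the offsets it resolved.
    static final class CommitEventSketch {
        final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
    }

    // Background side: resolve the offsets at processing time and hand them back.
    static void process(CommitEventSketch event, Supplier<Map<String, Long>> allConsumed) {
        try {
            Map<String, Long> offsets = Map.copyOf(allConsumed.get());
            // A real implementation would send the commit request here.
            event.future.complete(offsets);
        } catch (RuntimeException e) {
            event.future.completeExceptionally(e);
        }
    }

    public static void main(String[] args) throws Exception {
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            CommitEventSketch event = new CommitEventSketch();
            background.execute(() -> process(event, () -> Map.of("topic-0", 42L)));

            // App thread: wait for the result, then pass the returned offsets on
            // (this is where the interceptors would be invoked).
            Map<String, Long> committed = event.future.get();
            System.out.println("offsets handed to interceptors: " + committed);
        } finally {
            background.shutdown();
        }
    }
}
```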
Hi @lianetm and @kirktrue, I addressed all comments. The PR is ready to review now. Thanks.
Hi @lianetm, thanks for the review. I addressed all comments with the following updates:
- Move `ConsumerUtils#maybeUpdateLastSeenEpochIfNewer` to `ApplicationEventProcessor#maybeUpdateLastSeenEpochIfNewer` and do all metadata updates in the background thread.
- Add `testFetchCommittedOffsetsEvent` to `ApplicationEventProcessorTest`.
- When `autoCommitEnabled` is `true`, use `Optional.empty` for the `commitSync` function. Update `AsyncKafkaConsumerTest#testAutoCommitSyncEnabled` to check that `SyncCommitEvent#offsets` is not present.
- Update the default auto commit enabled to false in `AsyncKafkaConsumerTest`. Before this PR, we also waited for SyncCommitEvent and didn't mock the completion of SyncCommitEvent, but the previous tests were not blocked in the `close` function by the event. If the default auto commit enabled is true and the test case calls the `close` function explicitly, it may be blocked by SyncCommitEvent and wait for 30 seconds. I'm not sure whether the root cause is that we changed the future result from `Void` to `Map<TopicPartition, OffsetAndMetadata>`.
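For illustration, the `Optional` convention could look roughly like the sketch below. It assumes an empty `Optional` stands for "commit everything consumed so far"; `OffsetsToCommitSketch` and `resolve` are hypothetical names, not the actual code in this PR, and partitions are plain strings.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

final class OffsetsToCommitSketch {
    // Hypothetical helper: explicit offsets are used as given; an empty Optional
    // falls back to the consumed positions, read only when this method is called.
    static Map<String, Long> resolve(Optional<Map<String, Long>> requested,
                                     Supplier<Map<String, Long>> allConsumed) {
        return requested.orElseGet(allConsumed);
    }

    public static void main(String[] args) {
        Map<String, Long> consumed = Map.of("topic-0", 7L);

        // No offsets on the event: resolved from the consumed positions.
        System.out.println(resolve(Optional.empty(), () -> consumed));

        // Explicit offsets on the event: used unchanged.
        System.out.println(resolve(Optional.of(Map.of("topic-1", 3L)), () -> consumed));
    }
}
```

Because the supplier is only invoked on the empty branch, the lookup of consumed positions happens at processing time rather than when the event is created.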
Hi @lianetm / @kirktrue, thanks for the review and suggestions. I made the following changes:
- Move offsets option checking and `maybeUpdateLastSeenEpochIfNewer` to `CommitRequestManager`.
- Move related test cases from `ApplicationEventProcessorTest` to `CommitRequestManagerTest`.
- Remove unnecessary `completeCommitSyncApplicationEventSuccessfully` from `AsyncKafkaConsumerTest`.
- Update `testAutoCommitSyncDisabled` to call `close` explicitly.
- Change `testAutoCommitSyncEnabled` to `testCommitSyncAllConsumed`.
Hi @lianetm, thanks for the review. I addressed all comments and rebased on trunk.
Thanks for your review and patience. 🙏