KAFKA-17480: New consumer commit all consumed should retrieve offsets in background thread

Open FrankYang0529 opened this issue 1 year ago • 4 comments

When committing all consumed offsets (sync, async, or on close), the new consumer retrieves the offsets from subscriptionState.allConsumed() in the app thread. We should consider retrieving allConsumed in the background thread when processing the events. This avoids inconsistencies, because the subscription state could be modified in the background thread after allConsumed was retrieved in the app thread.
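A minimal sketch of the inconsistency described above, not the actual consumer code. The `State` class is a hypothetical stand-in for the subscription state, partitions are plain strings, and the "partition removed before the event is processed" scenario is an assumed example of a background-thread modification.

```java
import java.util.HashMap;
import java.util.Map;

public class StaleSnapshotSketch {
    // Hypothetical stand-in for the subscription state; not Kafka's real class.
    static final class State {
        private final Map<String, Long> positions = new HashMap<>();

        synchronized void updatePosition(String partition, long offset) {
            positions.put(partition, offset);
        }

        synchronized void remove(String partition) {
            positions.remove(partition);
        }

        // Returns a copy, so a caller holds a point-in-time snapshot.
        synchronized Map<String, Long> allConsumed() {
            return new HashMap<>(positions);
        }
    }

    public static void main(String[] args) {
        State state = new State();
        state.updatePosition("topic-0", 10L);
        state.updatePosition("topic-1", 20L);

        // Snapshot taken in the app thread when the commit is requested.
        Map<String, Long> appThreadSnapshot = state.allConsumed();

        // Assumed scenario: the state changes before the commit event is processed.
        state.remove("topic-1");

        // Snapshot taken when the event is actually processed.
        Map<String, Long> processingTimeSnapshot = state.allConsumed();

        System.out.println("app thread snapshot:      " + appThreadSnapshot);
        System.out.println("processing-time snapshot: " + processingTimeSnapshot);
    }
}
```

The first snapshot still contains `topic-1` even though the state no longer does, which is the kind of mismatch that retrieving the offsets at processing time is meant to avoid.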

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

FrankYang0529 avatar Sep 10 '24 11:09 FrankYang0529

Hey @FrankYang0529, thanks for taking this one! I see it is still marked as Draft, so I haven't looked into it yet, but let me know when it's ready for review and I'll take a look. Thanks!

lianetm avatar Sep 13 '24 13:09 lianetm

Hi @lianetm, yes, this one is ready for review. Thank you.

FrankYang0529 avatar Sep 17 '24 02:09 FrankYang0529

Hi @lianetm and @kirktrue, thanks for the review. I may need more time for this. I found that AsyncKafkaConsumer#interceptors consumes the offsets. If we use the background thread to get subscriptions.allConsumed(), we need CommitEvent#future to return the offsets, so that AsyncKafkaConsumer#interceptors doesn't get empty offsets.
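A rough sketch of that idea, under stated assumptions: `CommitEvent` here is a simplified hypothetical class, the background thread is a single-thread executor, and offsets are a `Map<String, Long>` rather than Kafka's real types. The point is only that the future carries the offsets resolved in the background back to the caller, which can then hand them to the interceptors.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

public class CommitFutureSketch {
    // Hypothetical event: the future completes with the offsets that were committed.
    static final class CommitEvent {
        final CompletableFuture<Map<String, Long>> future = new CompletableFuture<>();
    }

    // The app thread enqueues the event and waits; the background thread
    // reads the consumed offsets and completes the future with them.
    static Map<String, Long> commitAndReport(ExecutorService background,
                                             Supplier<Map<String, Long>> allConsumed) {
        CommitEvent event = new CommitEvent();
        background.execute(() -> {
            try {
                event.future.complete(Map.copyOf(allConsumed.get()));
            } catch (RuntimeException e) {
                event.future.completeExceptionally(e);
            }
        });
        return event.future.join();
    }

    public static void main(String[] args) {
        ExecutorService background = Executors.newSingleThreadExecutor();
        try {
            Map<String, Long> state = new ConcurrentHashMap<>(Map.of("topic-0", 42L));
            Map<String, Long> committed = commitAndReport(background, () -> state);
            // Stand-in for passing the committed offsets to the interceptors.
            System.out.println("interceptors see: " + committed);
        } finally {
            background.shutdown();
        }
    }
}
```

With a `Void` future the caller would have nothing to pass on once it stops taking its own snapshot, which is why the result type matters here.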

FrankYang0529 avatar Sep 27 '24 14:09 FrankYang0529

Hi @lianetm and @kirktrue, I addressed all comments. The PR is ready for review now. Thanks.

FrankYang0529 avatar Oct 07 '24 09:10 FrankYang0529

Hi @lianetm, thanks for the review. I addressed all comments with the following updates:

  • Move ConsumerUtils#maybeUpdateLastSeenEpochIfNewer to ApplicationEventProcessor#maybeUpdateLastSeenEpochIfNewer and do all metadata updates in the background thread.
  • Add testFetchCommittedOffsetsEvent to ApplicationEventProcessorTest.
  • When autoCommitEnabled is true, use Optional.empty for the commitSync function. Update AsyncKafkaConsumerTest#testAutoCommitSyncEnabled to check that SyncCommitEvent#offsets is not present.
  • Change the default for auto commit to disabled in AsyncKafkaConsumerTest. Before this PR, we also waited for SyncCommitEvent and didn't mock its completion, but the tests were not blocked on close by the event. If auto commit is enabled by default and a test case calls close explicitly, it may be blocked by SyncCommitEvent and wait for 30 seconds. I'm not sure whether the root cause is that we changed the future result from Void to Map<TopicPartition, OffsetAndMetadata>.
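The Optional.empty convention mentioned in the list above can be illustrated roughly as follows. This is a sketch under assumptions, not the PR's code: `resolveOffsets` is a hypothetical helper, and offsets are a `Map<String, Long>` instead of `Map<TopicPartition, OffsetAndMetadata>`. An empty Optional means "commit all consumed", and the lookup is deferred until the event is processed.

```java
import java.util.Map;
import java.util.Optional;
import java.util.function.Supplier;

public class OptionalOffsetsSketch {
    // Hypothetical helper: explicit offsets win; an empty Optional means
    // "all consumed", resolved lazily at processing time via the supplier.
    static Map<String, Long> resolveOffsets(Optional<Map<String, Long>> requested,
                                            Supplier<Map<String, Long>> allConsumed) {
        return requested.orElseGet(allConsumed);
    }

    public static void main(String[] args) {
        Supplier<Map<String, Long>> allConsumed = () -> Map.of("topic-0", 100L);

        // Caller passed specific offsets: they are used as-is.
        System.out.println(resolveOffsets(Optional.of(Map.of("topic-1", 5L)), allConsumed));

        // Caller passed nothing: fall back to all consumed offsets.
        System.out.println(resolveOffsets(Optional.empty(), allConsumed));
    }
}
```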

FrankYang0529 avatar Oct 30 '24 14:10 FrankYang0529

Hi @lianetm / @kirktrue, thanks for the review and suggestions. I made the following changes:

  • Move offsets option checking and maybeUpdateLastSeenEpochIfNewer to CommitRequestManager.
  • Move related test cases from ApplicationEventProcessorTest to CommitRequestManagerTest.
  • Remove unnecessary completeCommitSyncApplicationEventSuccessfully from AsyncKafkaConsumerTest.
  • Update testAutoCommitSyncDisabled to call close explicitly.
  • Change testAutoCommitSyncEnabled to testCommitSyncAllConsumed.

FrankYang0529 avatar Nov 03 '24 14:11 FrankYang0529

Hi @lianetm, thanks for the review. I addressed all comments and rebased on trunk.

FrankYang0529 avatar Nov 06 '24 16:11 FrankYang0529

Thanks for your review and patience. 🙏

FrankYang0529 avatar Nov 07 '24 14:11 FrankYang0529