KAFKA-17064: New consumer assign should update assignment in background thread
With the new async consumer, the subscriptionState object is shared between the app thread and the background thread, but in principle all updates to the assignment should happen in the background thread, to avoid race conditions.
We should consider moving the assignment update to the background thread, as part of the AssignmentChangeEvent.
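A minimal sketch of the idea, assuming a simplified model rather than the actual Kafka classes: the application thread only enqueues an assignment-change event, and the background thread is the only writer of the shared state. `BackgroundAssignmentSketch`, `SharedState`, and `AssignmentChange` are hypothetical names used for illustration only.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

public class BackgroundAssignmentSketch {
    // Hypothetical stand-in for the shared subscription state.
    static final class SharedState {
        private volatile Set<String> assignment = Set.of();

        Set<String> assignment() {
            return assignment;
        }

        // Intended to be called from the background thread only.
        void assign(Set<String> partitions) {
            assignment = Set.copyOf(partitions);
        }
    }

    // Hypothetical event carrying the requested assignment.
    static final class AssignmentChange {
        final Set<String> partitions;

        AssignmentChange(Set<String> partitions) {
            this.partitions = Set.copyOf(partitions);
        }
    }

    private final SharedState state = new SharedState();
    private final BlockingQueue<AssignmentChange> queue = new LinkedBlockingQueue<>();

    // Application thread: hands the change over, never mutates the state itself.
    public void assign(Set<String> partitions) {
        queue.add(new AssignmentChange(partitions));
    }

    // Background thread: drains pending events and applies them in order.
    // Returns the number of events applied.
    public int processPending() {
        int applied = 0;
        AssignmentChange event;
        while ((event = queue.poll()) != null) {
            state.assign(event.partitions);
            applied++;
        }
        return applied;
    }

    public Set<String> assignment() {
        return state.assignment();
    }

    public static void main(String[] args) throws InterruptedException {
        BackgroundAssignmentSketch consumer = new BackgroundAssignmentSketch();
        consumer.assign(Set.of("topic-0", "topic-1"));

        // A separate thread plays the role of the background thread here.
        Thread background = new Thread(consumer::processPending);
        background.start();
        background.join();

        System.out.println(consumer.assignment().size());
    }
}
```

With a single writer, the application thread can still read the assignment, but it observes the change only after the background thread has processed the event.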
Hi @lianetm, I have two questions about this enhancement.
- Do we want to remove `NewTopicsMetadataUpdateRequestEvent`? It's not used after this PR.
- Do we want to remove `offsets` in `AssignmentChangeEvent`? `ApplicationEventProcessor#process` doesn't use this data. It gets the data from `SubscriptionState#allConsumed` directly in `ApplicationEventProcessor#maybeAutoCommitAsync`.
Hey @FrankYang0529. Thanks for the patch! I haven't reviewed this PR yet, but from a quick look:
- yes, let's remove the unused event. Makes sense that we don't need it anymore since all happens in the background now.
- yes, seems sensible to remove the unused `offsets` and get the offsets to commit from `SubscriptionState#allConsumed` directly (which we have handy when processing the assignment change event)
I'll take a closer look at this PR in the next few days. Thanks!
Hi @lianetm, thanks for the answer. I will convert the PR back to ready for review after I remove the unused class and variable. Thank you.
Hey @FrankYang0529 , thanks for the changes! Left some high level comments for consideration, let me know what you think. Thanks!
Hi @lianetm, thanks for the suggestion! I made AssignmentChangeEvent a CompletableApplicationEvent, so we no longer need waitForCondition to wait for the SubscriptionState update. Thanks.
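To illustrate why a completable event removes the need to poll for the state change, here is a rough sketch under simplifying assumptions. It does not use the real Kafka classes: `CompletableAssignmentSketch` and `AssignmentChange` are hypothetical, and the event simply carries a `CompletableFuture` that the background thread completes once the assignment has been applied.

```java
import java.util.Set;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class CompletableAssignmentSketch {
    // Hypothetical event: the requested assignment plus a future for its completion.
    static final class AssignmentChange {
        final Set<String> partitions;
        final CompletableFuture<Void> future = new CompletableFuture<>();

        AssignmentChange(Set<String> partitions) {
            this.partitions = Set.copyOf(partitions);
        }
    }

    private final BlockingQueue<AssignmentChange> queue = new LinkedBlockingQueue<>();
    private volatile Set<String> assignment = Set.of();

    // Starts a daemon thread that plays the role of the background thread.
    public void start() {
        Thread background = new Thread(() -> {
            try {
                while (true) {
                    AssignmentChange event = queue.take();
                    assignment = event.partitions;   // the only place the state is written
                    event.future.complete(null);     // signal the waiting caller
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();  // exit the loop on interrupt
            }
        });
        background.setDaemon(true);
        background.start();
    }

    // Application thread: enqueue the event, then block until it has been applied.
    public void assign(Set<String> partitions, long timeoutMs) {
        AssignmentChange event = new AssignmentChange(partitions);
        queue.add(event);
        try {
            event.future.get(timeoutMs, TimeUnit.MILLISECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while waiting for assignment", e);
        } catch (ExecutionException | TimeoutException e) {
            throw new IllegalStateException("Assignment change did not complete", e);
        }
    }

    public Set<String> assignment() {
        return assignment;
    }

    public static void main(String[] args) {
        CompletableAssignmentSketch consumer = new CompletableAssignmentSketch();
        consumer.start();
        consumer.assign(Set.of("topic-0"), 5_000);
        // No polling needed: assign() returned only after the update was applied.
        System.out.println(consumer.assignment().contains("topic-0"));
    }
}
```

Because `assign` returns only after the future completes, a caller (or a test) can read the assignment immediately afterwards instead of waiting on a condition.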
Hey @FrankYang0529 , thanks for the updates! Left some comments.
Hi @lianetm, thanks for the review. In the previous CI run, PlaintextConsumerCallbackTest#testConsumerRebalanceListenerAssignOnPartitionsAssigned failed for the consumer group protocol. To keep the existing behavior, I made a change to ConsumerUtils#getResult. Could you take a look when you have time? Thank you.
I noticed a failure on a somewhat related test, testMissingOffsetNoResetPolicy, but after digging into it I found it has been flaky since it was enabled for the new consumer, before this PR, so I filed KAFKA-17395 for it.
Test failures unrelated, and 1 failure related to the new consumer (testMissingOffsetNoResetPolicy), but it's been flaky since enabled (graph here) and passes locally, so I'll merge (and file a jira for the flaky as soon as I sort out an issue with my jira account). Thanks @FrankYang0529 !
-- update Jira for the flaky test https://issues.apache.org/jira/browse/KAFKA-17395