KAFKA-17696 New consumer background operations unaware of metadata errors
Jira: https://issues.apache.org/jira/browse/KAFKA-17696
When API calls that handle background events (e.g., poll, unsubscribe, close) encounter errors, the errors are only passed to the application thread via ErrorEvent. Other API calls that do not process background events (e.g., position) are not notified of these errors, meaning that issues like unauthorized access to topics will go unnoticed by those operations. Background operations are not aborted or notified when a metadata error occurs, such as an Unauthorized error, which can lead to situations where a call like position keeps waiting for an update, despite the Unauthorized error already happening.
Due to the blocking issue in applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent);, I consider that we should use processBackgroundEvents to get the events, that is better than addAndGet.
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Sorry for late to reply you,
but I'm not sure if it would work or if I even like it— Add a new instance variable to store authorization exceptions (e.g. UnauthorizedTopicException) and then update processBackgroundEvents()’ catch block to check for authorization errors and store then in that variable. Then add a maybeThrowAuthorizationException() that conditionally throws the error if it's non-null. We'd have to clear out the exception on subscribe() or assign(), but it might work.
Thanks @kirktrue suggestions, I find a good way to resolve the close problem, the TopicAuthorizationException[1] is similar with InvalidTopicException, thus I think I can add an or check in this if else condition [2], it is more clear and simplify
[1] https://github.com/apache/kafka/blob/604564cce3e195eece6927dc83d150e0150022fc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1232
[2]
https://github.com/apache/kafka/blob/604564cce3e195eece6927dc83d150e0150022fc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1283
Please take a look at @lianetm's comments on KAFKA-17696 again as I think she has some suggestions worth pursuing.
I will take more deep think in these comments
Also, this PR overlaps a lot with PR #17516, right?
I believe we need to have some filtering in the background event processing logic, because we don't want the checks to inadvertently execute the ConsumerRebalanceListenerCallbackNeededEvent if a rebalance was initiated in the background thread.
Hello @FrankYang0529 feel free to reopen PR, this PR can focus processBackgroundEvents and applicationEventHandler.addAndGet. Due to I can't pass the test, so I fix close method in this PR.
Do we need to perform this same check in more places than just the handful in this PR?
If we want process all background event from the backgroundEventQueue, I think when we call applicationEventHandler.addAndGet we always need to call processBackgroundEvents first, check the backgroundEventQueue doesn't have any error event.
@m1a2st—I tested this fix by merging the changes in this PR with the changes from my PR that sets the default group.protocol value to CONSUMER. Unfortunately, the integration tests that exercise authorization policies fail even with this fix 😢
If you want to test that your change works across our topic and consumer group authorization integration tests, do the following:
- Change the default for
group.protocolinConsumerConfig - Run the integration tests which subclass
EndToEndAuthorizationTest
@kirktrue , Thanks for your reminder, I will take a look at these fail tests
Hello @lianetm,
Thank you for your comments. I have already addressed the issue where CoordinatorRequestManager catches metadata errors and passes them to the ListOffsets event. However, I encountered some test failures in my local environment:
-
AuthorizerIntegrationTest#testPatternSubscriptionWithNoTopicAccess -
AuthorizerIntegrationTest#testCreatePermissionOnClusterToReadFromNonExistentTopic -
AuthorizerIntegrationTest#testCreatePermissionOnTopicToReadFromNonExistentTopic -
AuthorizerIntegrationTest#testListOffsetsWithTopicDescribe -
EndToEndAuthorizationTest#testNoConsumeWithoutDescribeAclViaSubscribe
Some of these tests are failing due to issues with the consumer.poll method. I am currently investigating the root cause but don’t have a clear solution yet.
Hello @lianetm, @kirktrue I will focus on NetworkClientDelegate#maybePropagateMetadataError in this PR, and revert CoordinatorRequestManager fatal error this PR, and mark fail tests which fail reason is CoordinatorRequestManager for getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034
Is it make sense?
I will focus on NetworkClientDelegate#maybePropagateMetadataError in this PR, and revert CoordinatorRequestManager fatal error this PR, and mark fail tests which fail reason is CoordinatorRequestManager for getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034
Sounds great to me! So I expect we'll end up with this PR addressing how we propagate metadata errors within the background thread to fail requests that should be aware of the error (should unblock all auth tests expecting TopicAuth error in api calls). Another PR addressing how we propagate coordinator errors within the background to fail requests similarly (unblock tests expecting GroupAuthErrors in api calls)
don't yet understand the need for passing the metadata error around in a Future. And I'm also still wondering if this could be handled at a lower layer so that we don't have to have bespoke code in the request managers to deal with it.
The rationale behind this design is that when ConsumerNetworkThread#processApplicationEvents executes checkAndUpdatePositionsEvent, the TopicAuthorizationException doesn’t surface immediately. Instead, it may require several iterations of runOnce for the error to become apparent. Without using the future-based approach, it would be impossible to propagate this error from the background thread to the OffsetsRequestManager.
I’m thinking that some test fail for methods like consumer.poll, which involve processBackgroundEvent, if a TopicAuthorizationException occurs, these two error handling mechanisms might conflict, leading to behavior that deviates from expectations.
Hey @m1a2st, sharing a thought in case it helps. First, the problem we have is that api calls like position/endOffsets trigger events that should fail with topic metadata errors but they don't, and are left hanging until they time out. So, with that in mind, it occurred to me that we do have all the events that are awaiting responses in hand when then ConsumerNetworkThread.runOnce happens, because we have them within the reaper, that keeps all the completableEvents so they can be expired eventually. Couldn't we take those events and let them know about the error when it happens? Then each event decides if it should fail on topic metadata error or not. I'm picturing something along these lines:
On ConsumerNetworkThread.runOnce:
// 1. get metadata error that happens here
networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
...
// 2. get all awaiting events after expiration applies (the reaper has them all, not just the ones generated on the current runOnce)
List<CompletableApplicationEvent> awaitingEvents = reapExpiredApplicationEvents(currentTimeMs);
// 3. notify awaiting events about the metadata error
if (metadataError != null) {
awaitingEvents.forEach(e -> e.onMetadataError(metadataError));
}
Would that work? I see that the main advantages would be to avoid the complexity of metadata future errors passed around to specific manager calls, and also it would be a solution applied consistently to all events (each event type then deciding if it should fail or not on topic metadata errors). onMetadataError, events could no-op by default, and some should override to simply do future.completeExceptionally, ex. CheckAndUpdatePositionsEvent, CommitEvent (these two seem to be the ones leading to the failed tests in the Authorizer file, we can get into details later about what others should consider the error).
I could be missing something but sharing in case it helps! Let me know.
I’m thinking that some test fail for methods like
consumer.poll, which involve processBackgroundEvent, if aTopicAuthorizationExceptionoccurs, these two error handling mechanisms might conflict, leading to behavior that deviates from expectations.
Sorry I missed this comment before. Great point, the issue is that with this PR (no matter how we implement it) we end up failing api calls/events on metadata errors, but still also keeping the previous logic that generated an ErrorEvent for them.
https://github.com/apache/kafka/blob/e73edce10bf0badf46d729045b57dc982666bd5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java#L157
We were propagating metadata errors via ErrorEvent thinking that it was only meant to be consumed from poll (which was a wrong assumption). If, with this PR, we introduce a mechanism to propagate it via the api events, I wonder if we should consider removing the redundant ErrorEvent for this case? (without ErrorEvent, poll would still fail as expected, because the CheckAndUpdatePositions would fail with the auth error)
Hello @lianetm, Sorry for the late response.
Would that work? I see that the main advantages would be to avoid the complexity of metadata future errors passed around to specific manager calls, and also it would be a solution applied consistently to all events (each event type then deciding if it should fail or not on topic metadata errors).
I think this approach is great significantly simplifies the system by eliminating the need to pass CompletedFuture around, which reduces complexity. Also, based on current testing, the failing tests are still just these few.
Hey @m1a2st , just FYI, we just enabled some auth tests that were marked as blocked on this issue, but are really not blocked on this (https://github.com/apache/kafka/pull/17885). So just merge trunk latest changes, and we can have this PR addressing/enabling only what's really related to this fix. Thanks!
Hello @lianetm, Thanks for your review.
We were propagating metadata errors via ErrorEvent thinking that it was only meant to be consumed from poll (which was a wrong assumption). If, with this PR, we introduce a mechanism to propagate it via the api events, I wonder if we should consider removing the redundant ErrorEvent for this case? (without ErrorEvent, poll would still fail as expected, because the CheckAndUpdatePositions would fail with the auth error)
Based on this issue, the most straightforward solution I can think of at the moment is to add a new attribute in the event to determine whether the method call requires the use of the completedFuture for transmission. I have already drafted a version for this approach. WDYT?
just FYI, we just enabled some auth tests that were marked as blocked on this issue, but are really not blocked on this (https://github.com/apache/kafka/pull/17885).
Here are the four failing tests I’ve categorized:
The first type is related to GroupAuthorizationException, which includes the following tests:
• testCommitWithNoAccess
• testCommitWithNoGroupAccess
• testOffsetFetchWithNoGroupAccess
These three should be addressed in JIRA 18034.
However, the test testPatternSubscriptionWithNoTopicAccess seems to have a different root cause. I’ve noticed that this test does not trigger the ConsumerRebalanceListenerCallbackCompletedEvent at all. It might be worth opening a new JIRA ticket to track this issue. WDYT?
Regarding:
However, the test
testPatternSubscriptionWithNoTopicAccessseems to have a different root cause. I’ve noticed that this test does not trigger the ConsumerRebalanceListenerCallbackCompletedEvent at all. It might be worth opening a new JIRA ticket to track this issue. WDYT?
Agree that is unrelated to this PR. Looking into it, that test is actually checking a behviour that is different between the 2 protocols/consumer. With the new protocol a member must always provide subscribed topics (or regex) to join a group, so if subscribed to a java pattern where no topics are found (because of auth, or any reason), there won't be callbacks triggered. Seems to me that test should only run for the classic consumer (and maybe let's just add a comment about how the behaviour is different with the new protocol). Makes sense?
-- update Filed https://issues.apache.org/jira/browse/KAFKA-18095 to reconsider this validation and maybe change the new consumer. For now we can just let this test disabled here because it's the current behaviour.
Hello @lianetm @chia7712 , I believe we still cannot abandon the transmission method of ErrorEvent. I’ve noticed that there are other ErrorEvents that are handled differently under various scenarios. Simply using an optional transmission method is not sufficient to handle so many different situations.
Hi @m1a2st , agree we still need ErrorEvent (for HB and coordinator errors), but with this PR seems we can remove its usage for metadata errors. That would be a good thing I believe, because we would have a single consistent way of propagating metadata errors (via the triggering API event only, like most events do). Note that metadata errors are the only ones that ended up in this weird situation of having 2 ways to be propagated, so that's what I was hoping we could improve. Makes sense?
Note that metadata errors are the only ones that ended up in this weird situation of having 2 ways to be propagated, so that's what I was hoping we could improve. Makes sense?
Yes, I understand this, however, I’m now thinking about the async consumer poll method. Originally, we used processBackgroundEvent to retrieve the Error event, but now we’re trying to use metadataError passing to restore its triggering point, which feels a bit difficult to find the same triggering point. due to it is processing in different thread.
I’m now thinking about the async consumer poll method. Originally, we used processBackgroundEvent to retrieve the Error event, but now we’re trying to use metadataError passing to restore its triggering point, which feels a bit difficult to find the same triggering point. due to it is processing in different thread.
Sorry I don't quite get this. If we say that the CheckAndUpdatePositions events will fail with metadata errors, I expect the poll should fail on addAndGet of that event, triggered from here:
https://github.com/apache/kafka/blob/e1ba01d21440a1de011c116a0bb6933656200009/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1780
Isn't that the case?
If we say that the CheckAndUpdatePositions events will fail with metadata errors, I expect the poll should fail on addAndGet of that event, triggered from here:
we remove the way to pass Error Event in 153, thus there are some other test fail, Im tracing these fail test, and I'm thinking about how we deal with other type of exception https://github.com/apache/kafka/blob/b8c8e0c713117ee3f0e8c6c54edd5d2babde5f19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java#L153
- FAILED ❌ TransactionsTest > testFencingOnSend(String, String).quorum=kraft.groupProtocol=classic
- FAILED ❌ ShareConsumerTest > testSubscribeOnInvalidTopicThrowsInvalidTopicException(String).persister=org.apache.kafka.server.share.persister.NoOpShareStatePersister
- FAILED ❌ ShareConsumerTest > testSubscribeOnInvalidTopicThrowsInvalidTopicException(String).persister=org.apache.kafka.server.share.persister.DefaultStatePersister
- FAILED ❌ GroupEndToEndAuthorizationTest > testNoConsumeWithoutDescribeAclViaSubscribe(String, String).quorum=kraft.groupProtocol=consumer
- FAILED ❌ PlaintextConsumerPollTest > testNoOffsetForPartitionExceptionOnPollZero(String, String).quorum=kraft.groupProtocol=consumer
- FAILED ❌ KafkaConsumerTest > "testMissingOffsetNoResetPolicy(GroupProtocol).groupProtocol=CONSUMER"
- FAILED ❌ KafkaConsumerTest > "testSubscriptionOnInvalidTopic(GroupProtocol).groupProtocol=CONSUMER"
- FAILED ❌ KafkaConsumerTest > "testFetchStableOffsetThrowInPoll(GroupProtocol).groupProtocol=CONSUMER"
These tests are not throw the TopicAuthenticateException only, there are thrown NoOffsetForPartitionException, InvalidTopicException etc
I think I have an idea of where the problem lies. With the current approach, the completeExceptionally inside updateFetchPositions cannot successfully propagate to the application thread.
Hello @apoorvmittal10, @AndrewJSchofield, @lianetm, @kirktrue
The current PR has changed the way metadata errors are propagated, switching from backgroundEventHandler.add(new ErrorEvent(e)) to using Optional for transmission. I noticed that in SharedConsumer, the logic for handling errors on the application thread is slightly different. I think we need to sync up to understand this behavior adjustment better.
https://github.com/apache/kafka/blob/b8c8e0c713117ee3f0e8c6c54edd5d2babde5f19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java#L153
Hey @m1a2st , thanks for the updates, very nice result unifying how metadata errors are propagated. I'll take another close look.
One suggestion already: we can simplifying the tweak we had to introduce on the unsubscribe to ignore metadata errors (with your PR, they won't pop out unexpectedly), so I would say we can remove InvalidTopic and TopicAuth from here (leaving GroupAuth only)
https://github.com/apache/kafka/blob/7ca02fd90840535b22ad61517288a8dea6a39515/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1584
and remove these tests that cover scenarios that is not valid anymore https://github.com/apache/kafka/blob/7ca02fd90840535b22ad61517288a8dea6a39515/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java#L273-L304
Makes sense?
One suggestion already: we can simplifying the tweak we had to introduce on the unsubscribe to ignore metadata errors (with your PR, they won't pop out unexpectedly), so I would say we can remove InvalidTopic and TopicAuth from here (leaving GroupAuth only)
I think we should focus on PlaintextConsumerSubscriptionTest#testSubscribeInvalidTopicCanUnsubscribe this test, In CONSUMER protocol, we will throw metadata error on unsubscribe, If we don't want this behaviour, we also need to use other way to swallow this metadata error, WDYT?
Thanks for @kirktrue comments, addressed them
Hello @lianetm
I think I still need to confirm with you about the ShareConsumerImpl. The original design of ShareConsumerImpl relied solely on ErrorEvent to propagate errors, and SharedConsumer does not include CompletableEvent during polling. Therefore, with the current design, it is unable to propagate metadata errors.
Update:
I think we should need another way to propagate errors, WDYT?
The solution I currently have in mind is to add a new event specifically for handling metadata errors in ShareConsumer, which would enable proper propagation.