KAFKA-17696 New consumer background operations unaware of metadata errors

Open m1a2st opened this issue 1 year ago • 5 comments

Jira: https://issues.apache.org/jira/browse/KAFKA-17696

When API calls that handle background events (e.g., poll, unsubscribe, close) encounter errors, the errors are only passed to the application thread via ErrorEvent. Other API calls that do not process background events (e.g., position) are not notified of these errors, meaning that issues like unauthorized access to topics will go unnoticed by those operations. Background operations are not aborted or notified when a metadata error occurs, such as an Unauthorized error, which can lead to situations where a call like position keeps waiting for an update, despite the Unauthorized error already happening.

Because applicationEventHandler.addAndGet(checkAndUpdatePositionsEvent) blocks, I think we should use processBackgroundEvents to retrieve the events, which is better than relying on addAndGet.

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

m1a2st avatar Oct 10 '24 06:10 m1a2st

Sorry for the late reply.

but I'm not sure if it would work or if I even like it: add a new instance variable to store authorization exceptions (e.g. UnauthorizedTopicException) and then update the catch block of processBackgroundEvents() to check for authorization errors and store them in that variable. Then add a maybeThrowAuthorizationException() that conditionally throws the error if it's non-null. We'd have to clear out the exception on subscribe() or assign(), but it might work.
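A minimal sketch of the stored-exception idea quoted above, assuming a simplified holder class. Every name here (SketchAuthorizationException, StoredAuthErrorSketch, recordIfAuthorizationError, clearOnSubscriptionChange) is hypothetical and only illustrates the shape of the proposal; it is not code from AsyncKafkaConsumer.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for an authorization error such as TopicAuthorizationException.
class SketchAuthorizationException extends RuntimeException {
    SketchAuthorizationException(String message) {
        super(message);
    }
}

// Hypothetical holder for the "instance variable" described in the comment.
class StoredAuthErrorSketch {
    private final AtomicReference<SketchAuthorizationException> storedError = new AtomicReference<>();

    // Would be called from the catch block of processBackgroundEvents():
    // only authorization errors are remembered, everything else is ignored here.
    void recordIfAuthorizationError(RuntimeException error) {
        if (error instanceof SketchAuthorizationException) {
            storedError.set((SketchAuthorizationException) error);
        }
    }

    // Would be called by API methods that do not process background events,
    // e.g. position(); throws only if an error was stored earlier.
    void maybeThrowAuthorizationException() {
        SketchAuthorizationException error = storedError.get();
        if (error != null) {
            throw error;
        }
    }

    // Would be called from subscribe() or assign() to clear the stale error.
    void clearOnSubscriptionChange() {
        storedError.set(null);
    }
}
```

In this sketch the stored error keeps being thrown on every call until the subscription changes, which is the behaviour the comment implies by clearing it only on subscribe() or assign().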

Thanks for the suggestions, @kirktrue. I found a good way to resolve the close problem: TopicAuthorizationException [1] is similar to InvalidTopicException, so I think I can add an OR check to this if/else condition [2], which is clearer and simpler.

[1] https://github.com/apache/kafka/blob/604564cce3e195eece6927dc83d150e0150022fc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1232
[2] https://github.com/apache/kafka/blob/604564cce3e195eece6927dc83d150e0150022fc/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1283

Please take a look at @lianetm's comments on KAFKA-17696 again as I think she has some suggestions worth pursuing.

I will think more deeply about these comments.

m1a2st avatar Oct 16 '24 05:10 m1a2st

Also, this PR overlaps a lot with PR #17516, right?

kirktrue avatar Oct 17 '24 17:10 kirktrue

I believe we need to have some filtering in the background event processing logic, because we don't want the checks to inadvertently execute the ConsumerRebalanceListenerCallbackNeededEvent if a rebalance was initiated in the background thread.

kirktrue avatar Oct 17 '24 17:10 kirktrue

Hello @FrankYang0529, feel free to reopen the PR; this PR can focus on processBackgroundEvents and applicationEventHandler.addAndGet. Because I couldn't get the test to pass, I fixed the close method in this PR.

m1a2st avatar Oct 18 '24 12:10 m1a2st

Do we need to perform this same check in more places than just the handful in this PR?

If we want to process all background events from the backgroundEventQueue, I think that whenever we call applicationEventHandler.addAndGet we always need to call processBackgroundEvents first, to check that the backgroundEventQueue doesn't hold any error events.
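The ordering described in this reply can be sketched as follows. This is only an illustration under simplifying assumptions: the queue holds nothing but errors, and DrainBeforeBlockingSketch, processBackgroundErrors and drainThenCall are hypothetical names standing in for the background event queue, processBackgroundEvents and the "drain, then addAndGet" sequence.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.function.Supplier;

// Hypothetical sketch; none of these names are taken from AsyncKafkaConsumer.
class DrainBeforeBlockingSketch {
    // Stand-in for the backgroundEventQueue, holding only error events for simplicity.
    private final Queue<RuntimeException> backgroundErrors = new ConcurrentLinkedQueue<>();

    // Stand-in for the background thread reporting an error.
    void enqueueBackgroundError(RuntimeException error) {
        backgroundErrors.add(error);
    }

    // Stand-in for processBackgroundEvents(): drain the whole queue,
    // then rethrow the first error that was found, if any.
    void processBackgroundErrors() {
        RuntimeException first = null;
        RuntimeException next;
        while ((next = backgroundErrors.poll()) != null) {
            if (first == null) {
                first = next;
            }
        }
        if (first != null) {
            throw first;
        }
    }

    // Stand-in for "call processBackgroundEvents first, then addAndGet":
    // the blocking call only starts if no error was already queued.
    <T> T drainThenCall(Supplier<T> blockingCall) {
        processBackgroundErrors();
        return blockingCall.get();
    }
}
```

The sketch deliberately ignores non-error events. A real version would need the filtering mentioned earlier in the thread, so that draining the queue does not inadvertently run a ConsumerRebalanceListenerCallbackNeededEvent.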

m1a2st avatar Oct 18 '24 16:10 m1a2st

@m1a2st—I tested this fix by merging the changes in this PR with the changes from my PR that sets the default group.protocol value to CONSUMER. Unfortunately, the integration tests that exercise authorization policies fail even with this fix 😢

If you want to test that your change works across our topic and consumer group authorization integration tests, do the following:

  1. Change the default for group.protocol in ConsumerConfig
  2. Run the integration tests which subclass EndToEndAuthorizationTest

kirktrue avatar Oct 30 '24 22:10 kirktrue

@kirktrue, thanks for the reminder. I will take a look at these failing tests.

m1a2st avatar Oct 31 '24 12:10 m1a2st

Hello @lianetm,

Thank you for your comments. I have already addressed the issue where CoordinatorRequestManager catches metadata errors and passes them to the ListOffsets event. However, I encountered some test failures in my local environment:

  • AuthorizerIntegrationTest#testPatternSubscriptionWithNoTopicAccess
  • AuthorizerIntegrationTest#testCreatePermissionOnClusterToReadFromNonExistentTopic
  • AuthorizerIntegrationTest#testCreatePermissionOnTopicToReadFromNonExistentTopic
  • AuthorizerIntegrationTest#testListOffsetsWithTopicDescribe
  • EndToEndAuthorizationTest#testNoConsumeWithoutDescribeAclViaSubscribe

Some of these tests are failing due to issues with the consumer.poll method. I am currently investigating the root cause but don’t have a clear solution yet.

m1a2st avatar Nov 18 '24 15:11 m1a2st

Hello @lianetm, @kirktrue, I will focus on NetworkClientDelegate#maybePropagateMetadataError in this PR, revert the CoordinatorRequestManager fatal-error change from this PR, and mark the tests that fail because of CoordinatorRequestManager with getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034. Does that make sense?

m1a2st avatar Nov 20 '24 12:11 m1a2st

I will focus on NetworkClientDelegate#maybePropagateMetadataError in this PR, revert the CoordinatorRequestManager fatal-error change from this PR, and mark the tests that fail because of CoordinatorRequestManager with getTestQuorumAndGroupProtocolParametersClassicGroupProtocolOnly_KAFKA_18034.

Sounds great to me! So I expect we'll end up with this PR addressing how we propagate metadata errors within the background thread to fail requests that should be aware of the error (should unblock all auth tests expecting TopicAuth error in api calls). Another PR addressing how we propagate coordinator errors within the background to fail requests similarly (unblock tests expecting GroupAuthErrors in api calls)

lianetm avatar Nov 20 '24 15:11 lianetm

don't yet understand the need for passing the metadata error around in a Future. And I'm also still wondering if this could be handled at a lower layer so that we don't have to have bespoke code in the request managers to deal with it.

The rationale behind this design is that when ConsumerNetworkThread#processApplicationEvents executes checkAndUpdatePositionsEvent, the TopicAuthorizationException doesn’t surface immediately. Instead, it may require several iterations of runOnce for the error to become apparent. Without using the future-based approach, it would be impossible to propagate this error from the background thread to the OffsetsRequestManager.

m1a2st avatar Nov 20 '24 17:11 m1a2st

I suspect that some tests fail for methods like consumer.poll, which involve processBackgroundEvent: if a TopicAuthorizationException occurs, the two error-handling mechanisms might conflict, leading to behavior that deviates from expectations.

m1a2st avatar Nov 20 '24 17:11 m1a2st

Hey @m1a2st, sharing a thought in case it helps. First, the problem we have is that api calls like position/endOffsets trigger events that should fail with topic metadata errors but they don't, and are left hanging until they time out. So, with that in mind, it occurred to me that we do have all the events that are awaiting responses in hand when ConsumerNetworkThread.runOnce happens, because we have them within the reaper, which keeps all the completableEvents so they can be expired eventually. Couldn't we take those events and let them know about the error when it happens? Then each event decides if it should fail on topic metadata error or not. I'm picturing something along these lines:

On ConsumerNetworkThread.runOnce:

        // 1. get metadata error that happens here
        networkClientDelegate.poll(pollWaitTimeMs, currentTimeMs);
        ...
        // 2. get all awaiting events after expiration applies (the reaper has them all, not just the ones generated on the current runOnce)
        List<CompletableApplicationEvent> awaitingEvents = reapExpiredApplicationEvents(currentTimeMs);

        // 3. notify awaiting events about the metadata error
        if (metadataError != null) {
            awaitingEvents.forEach(e -> e.onMetadataError(metadataError));
        }

Would that work? I see that the main advantages would be to avoid the complexity of metadata future errors passed around to specific manager calls, and also it would be a solution applied consistently to all events (each event type then deciding if it should fail or not on topic metadata errors). In onMetadataError, events could no-op by default, and some should override it to simply do future.completeExceptionally, e.g. CheckAndUpdatePositionsEvent, CommitEvent (these two seem to be the ones leading to the failed tests in the Authorizer file, we can get into details later about what others should consider the error).
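The per-event decision described above could look roughly like the sketch below. It assumes simplified event classes: SketchCompletableEvent, SketchCheckAndUpdatePositionsEvent, SketchOtherEvent and MetadataErrorNotifierSketch are hypothetical stand-ins, and only the onMetadataError name and the use of future.completeExceptionally come from the comment itself.

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;

// Hypothetical base event: ignores metadata errors unless a subclass overrides the hook.
abstract class SketchCompletableEvent {
    final CompletableFuture<Void> future = new CompletableFuture<>();

    void onMetadataError(Exception metadataError) {
        // no-op by default
    }
}

// Hypothetical event that should fail on topic metadata errors.
class SketchCheckAndUpdatePositionsEvent extends SketchCompletableEvent {
    @Override
    void onMetadataError(Exception metadataError) {
        future.completeExceptionally(metadataError);
    }
}

// Hypothetical event that keeps the default no-op behaviour.
class SketchOtherEvent extends SketchCompletableEvent {
}

// Hypothetical stand-in for step 3 of the runOnce outline.
class MetadataErrorNotifierSketch {
    static void notifyAwaitingEvents(List<? extends SketchCompletableEvent> awaitingEvents,
                                     Exception metadataError) {
        if (metadataError != null) {
            awaitingEvents.forEach(e -> e.onMetadataError(metadataError));
        }
    }
}
```

With this shape, the application thread blocked on the event's future would see the failure as soon as the background thread notifies the event, rather than waiting for a timeout; which event types should override the hook is left open, as in the comment.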

I could be missing something but sharing in case it helps! Let me know.

lianetm avatar Nov 20 '24 19:11 lianetm

I suspect that some tests fail for methods like consumer.poll, which involve processBackgroundEvent: if a TopicAuthorizationException occurs, the two error-handling mechanisms might conflict, leading to behavior that deviates from expectations.

Sorry I missed this comment before. Great point, the issue is that with this PR (no matter how we implement it) we end up failing api calls/events on metadata errors, but still also keeping the previous logic that generated an ErrorEvent for them.

https://github.com/apache/kafka/blob/e73edce10bf0badf46d729045b57dc982666bd5a/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java#L157

We were propagating metadata errors via ErrorEvent thinking that it was only meant to be consumed from poll (which was a wrong assumption). If, with this PR, we introduce a mechanism to propagate it via the api events, I wonder if we should consider removing the redundant ErrorEvent for this case? (without ErrorEvent, poll would still fail as expected, because the CheckAndUpdatePositions would fail with the auth error)

lianetm avatar Nov 21 '24 02:11 lianetm

Hello @lianetm, Sorry for the late response.

Would that work? I see that the main advantages would be to avoid the complexity of metadata future errors passed around to specific manager calls, and also it would be a solution applied consistently to all events (each event type then deciding if it should fail or not on topic metadata errors).

I think this approach is great: it significantly simplifies the system by eliminating the need to pass CompletedFuture around. Also, based on current testing, the failing tests are still just these few.

m1a2st avatar Nov 22 '24 13:11 m1a2st

Hey @m1a2st, just FYI, we just enabled some auth tests that were marked as blocked on this issue, but are really not blocked on this (https://github.com/apache/kafka/pull/17885). So just merge the latest trunk changes, and we can have this PR addressing/enabling only what's really related to this fix. Thanks!

lianetm avatar Nov 22 '24 21:11 lianetm

Hello @lianetm, Thanks for your review.

We were propagating metadata errors via ErrorEvent thinking that it was only meant to be consumed from poll (which was a wrong assumption). If, with this PR, we introduce a mechanism to propagate it via the api events, I wonder if we should consider removing the redundant ErrorEvent for this case? (without ErrorEvent, poll would still fail as expected, because the CheckAndUpdatePositions would fail with the auth error)

Based on this issue, the most straightforward solution I can think of at the moment is to add a new attribute in the event to determine whether the method call requires the use of the completedFuture for transmission. I have already drafted a version for this approach. WDYT?

m1a2st avatar Nov 23 '24 08:11 m1a2st

just FYI, we just enabled some auth tests that were marked as blocked on this issue, but are really not blocked on this (https://github.com/apache/kafka/pull/17885).

Here are the four failing tests I’ve categorized: The first type is related to GroupAuthorizationException, which includes the following tests:

  • testCommitWithNoAccess
  • testCommitWithNoGroupAccess
  • testOffsetFetchWithNoGroupAccess

These three should be addressed in JIRA 18034.

However, the test testPatternSubscriptionWithNoTopicAccess seems to have a different root cause. I’ve noticed that this test does not trigger the ConsumerRebalanceListenerCallbackCompletedEvent at all. It might be worth opening a new JIRA ticket to track this issue. WDYT?

m1a2st avatar Nov 24 '24 02:11 m1a2st

Regarding:

However, the test testPatternSubscriptionWithNoTopicAccess seems to have a different root cause. I’ve noticed that this test does not trigger the ConsumerRebalanceListenerCallbackCompletedEvent at all. It might be worth opening a new JIRA ticket to track this issue. WDYT?

Agree that is unrelated to this PR. Looking into it, that test is actually checking a behaviour that is different between the two protocols/consumers. With the new protocol a member must always provide subscribed topics (or regex) to join a group, so if subscribed to a java pattern where no topics are found (because of auth, or any reason), there won't be callbacks triggered. Seems to me that test should only run for the classic consumer (and maybe let's just add a comment about how the behaviour is different with the new protocol). Makes sense?

Update: filed https://issues.apache.org/jira/browse/KAFKA-18095 to reconsider this validation and maybe change the new consumer. For now we can just leave this test disabled here because it's the current behaviour.

lianetm avatar Nov 25 '24 20:11 lianetm

Hello @lianetm @chia7712 , I believe we still cannot abandon the transmission method of ErrorEvent. I’ve noticed that there are other ErrorEvents that are handled differently under various scenarios. Simply using an optional transmission method is not sufficient to handle so many different situations.

m1a2st avatar Nov 28 '24 13:11 m1a2st

Hi @m1a2st , agree we still need ErrorEvent (for HB and coordinator errors), but with this PR seems we can remove its usage for metadata errors. That would be a good thing I believe, because we would have a single consistent way of propagating metadata errors (via the triggering API event only, like most events do). Note that metadata errors are the only ones that ended up in this weird situation of having 2 ways to be propagated, so that's what I was hoping we could improve. Makes sense?

lianetm avatar Nov 28 '24 13:11 lianetm

Note that metadata errors are the only ones that ended up in this weird situation of having 2 ways to be propagated, so that's what I was hoping we could improve. Makes sense?

Yes, I understand this. However, I’m now thinking about the async consumer poll method. Originally, we used processBackgroundEvent to retrieve the ErrorEvent, but now we’re trying to use metadataError passing to reproduce its trigger point, and it is a bit difficult to hit the same trigger point because the error is processed in a different thread.

m1a2st avatar Nov 28 '24 14:11 m1a2st

I’m now thinking about the async consumer poll method. Originally, we used processBackgroundEvent to retrieve the ErrorEvent, but now we’re trying to use metadataError passing to reproduce its trigger point, and it is a bit difficult to hit the same trigger point because the error is processed in a different thread.

Sorry I don't quite get this. If we say that the CheckAndUpdatePositions events will fail with metadata errors, I expect the poll should fail on addAndGet of that event, triggered from here:

https://github.com/apache/kafka/blob/e1ba01d21440a1de011c116a0bb6933656200009/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1780

Isn't that the case?

lianetm avatar Nov 28 '24 16:11 lianetm

If we say that the CheckAndUpdatePositions events will fail with metadata errors, I expect the poll should fail on addAndGet of that event, triggered from here:

We removed the ErrorEvent propagation at line 153, so some other tests now fail. I'm tracing these failing tests and thinking about how to deal with other types of exceptions: https://github.com/apache/kafka/blob/b8c8e0c713117ee3f0e8c6c54edd5d2babde5f19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java#L153

  • FAILED ❌ TransactionsTest > testFencingOnSend(String, String).quorum=kraft.groupProtocol=classic
  • FAILED ❌ ShareConsumerTest > testSubscribeOnInvalidTopicThrowsInvalidTopicException(String).persister=org.apache.kafka.server.share.persister.NoOpShareStatePersister
  • FAILED ❌ ShareConsumerTest > testSubscribeOnInvalidTopicThrowsInvalidTopicException(String).persister=org.apache.kafka.server.share.persister.DefaultStatePersister
  • FAILED ❌ GroupEndToEndAuthorizationTest > testNoConsumeWithoutDescribeAclViaSubscribe(String, String).quorum=kraft.groupProtocol=consumer
  • FAILED ❌ PlaintextConsumerPollTest > testNoOffsetForPartitionExceptionOnPollZero(String, String).quorum=kraft.groupProtocol=consumer
  • FAILED ❌ KafkaConsumerTest > "testMissingOffsetNoResetPolicy(GroupProtocol).groupProtocol=CONSUMER"
  • FAILED ❌ KafkaConsumerTest > "testSubscriptionOnInvalidTopic(GroupProtocol).groupProtocol=CONSUMER"
  • FAILED ❌ KafkaConsumerTest > "testFetchStableOffsetThrowInPoll(GroupProtocol).groupProtocol=CONSUMER"

These tests do not only throw TopicAuthorizationException; NoOffsetForPartitionException, InvalidTopicException, etc. are thrown as well.

m1a2st avatar Nov 28 '24 22:11 m1a2st

I think I have an idea of where the problem lies. With the current approach, the completeExceptionally inside updateFetchPositions cannot successfully propagate to the application thread.

m1a2st avatar Nov 29 '24 09:11 m1a2st

Hello @apoorvmittal10, @AndrewJSchofield, @lianetm, @kirktrue

The current PR has changed the way metadata errors are propagated, switching from backgroundEventHandler.add(new ErrorEvent(e)) to using Optional for transmission. I noticed that in ShareConsumer, the logic for handling errors on the application thread is slightly different. I think we need to sync up to understand this behavior adjustment better. https://github.com/apache/kafka/blob/b8c8e0c713117ee3f0e8c6c54edd5d2babde5f19/clients/src/main/java/org/apache/kafka/clients/consumer/internals/NetworkClientDelegate.java#L153

m1a2st avatar Nov 29 '24 13:11 m1a2st

Hey @m1a2st , thanks for the updates, very nice result unifying how metadata errors are propagated. I'll take another close look.

One suggestion already: we can simplify the tweak we had to introduce on the unsubscribe to ignore metadata errors (with your PR, they won't pop out unexpectedly), so I would say we can remove InvalidTopic and TopicAuth from here (leaving GroupAuth only) https://github.com/apache/kafka/blob/7ca02fd90840535b22ad61517288a8dea6a39515/clients/src/main/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumer.java#L1584

and remove these tests that cover scenarios that are no longer valid https://github.com/apache/kafka/blob/7ca02fd90840535b22ad61517288a8dea6a39515/clients/src/test/java/org/apache/kafka/clients/consumer/internals/AsyncKafkaConsumerTest.java#L273-L304

Makes sense?

lianetm avatar Nov 29 '24 21:11 lianetm

One suggestion already: we can simplify the tweak we had to introduce on the unsubscribe to ignore metadata errors (with your PR, they won't pop out unexpectedly), so I would say we can remove InvalidTopic and TopicAuth from here (leaving GroupAuth only)

I think we should focus on the test PlaintextConsumerSubscriptionTest#testSubscribeInvalidTopicCanUnsubscribe. With the CONSUMER protocol, we will throw a metadata error on unsubscribe. If we don't want this behaviour, we also need another way to swallow this metadata error. WDYT?

m1a2st avatar Nov 30 '24 06:11 m1a2st

Thanks for the comments, @kirktrue. I have addressed them.

m1a2st avatar Dec 04 '24 12:12 m1a2st

Hello @lianetm, I think I still need to confirm with you about the ShareConsumerImpl. The original design of ShareConsumerImpl relied solely on ErrorEvent to propagate errors, and ShareConsumer does not include CompletableEvent during polling. Therefore, with the current design, it is unable to propagate metadata errors. Update: I think we need another way to propagate errors, WDYT? The solution I currently have in mind is to add a new event specifically for handling metadata errors in ShareConsumer, which would enable proper propagation.

m1a2st avatar Dec 04 '24 15:12 m1a2st