KAFKA-15561: Client support for new SubscriptionPattern based subscription

Open Phuc-Hong-Tran opened this issue 2 years ago • 35 comments

Change:

  1. Implement methods in AsyncKafkaConsumer that accept SubscriptionPattern to subscribe to topic(s).
  2. Pass on the subscriptionPattern to SubscriptionState for use once the server supports RE2/J.

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

Phuc-Hong-Tran avatar Jan 13 '24 11:01 Phuc-Hong-Tran

@lianetm, PTAL, thanks in advance.

Phuc-Hong-Tran avatar Jan 13 '24 11:01 Phuc-Hong-Tran

@lianetm, thanks for the comments, I will make sure to address those points in my next PR.

Regarding your point about passing the regex for HeartbeatRequestManager, I originally included that in my code change, then I came across this PR https://github.com/apache/kafka/pull/14956 and decided that we need to wait for the broker to implement the new regex logic first.

Phuc-Hong-Tran avatar Jan 16 '24 20:01 Phuc-Hong-Tran

This is the task to closely follow https://issues.apache.org/jira/browse/KAFKA-14517, where the broker will support the new regex.

lianetm avatar Jan 16 '24 21:01 lianetm

Thanks @lianetm

Phuc-Hong-Tran avatar Jan 16 '24 22:01 Phuc-Hong-Tran

  • This feature is not yet implemented on the broker, so there are a few areas that may need alignment. Would maybe make sense to wait for some progress in the broker and then give this another push.
  • This PR attempts to ensure that the new regex is included in the subscription state and that's definitely needed. But then we're missing the other half of the story: we should make sure that the regex is passed-on to the broker on the next heartbeat request (see HeartbeatRequestManager here)

I do not understand why we want to wait. We should make progress as much as possible even if the broker code is not there yet. In the worst case, we would get an error back from the broker until the broker-side is implemented. But that would be fine in my opinion. @lianetm WDYT?

cadonna avatar Jan 31 '24 10:01 cadonna

@cadonna thanks for the comment. I can still finish this one by the required deadline; there is no need to wait for the broker logic to be finished. Isn't this one aiming for the 3.8 release, though?

Phuc-Hong-Tran avatar Jan 31 '24 11:01 Phuc-Hong-Tran

@Phuc-Hong-Tran

Isn't this one aiming for the 3.8 release, though?

Yes, but we have time-based releases in Apache Kafka. That means that the deadline for the release will be somewhere in the beginning of April. We need to keep enough time before this date for testing.

cadonna avatar Jan 31 '24 11:01 cadonna

I understand, will get back to speed on this one

Phuc-Hong-Tran avatar Jan 31 '24 11:01 Phuc-Hong-Tran

In my opinion, we should merge the client side only after the server side is implemented. The reason is that we need to change the RPC (this is actually missing in this PR), and that change should be driven by the server-side work.

dajac avatar Jan 31 '24 11:01 dajac

@dajac OK, but we can implement and unit test everything up to the RPC, right?

cadonna avatar Jan 31 '24 11:01 cadonna

@dajac, when we're talking about the RPC, do we mean the field for the regex in ConsumerGroupHeartbeatRequest.json?

Phuc-Hong-Tran avatar Jan 31 '24 11:01 Phuc-Hong-Tran

@cadonna Yes. We could for instance commit the RPC and then work independently on the client and the server. My only concern is that we usually discover issues while working on the server side. This is why I usually prefer to get the server code into a reasonable state first. In this case, the risk is low as we are talking about a single field.

dajac avatar Jan 31 '24 11:01 dajac

@Phuc-Hong-Tran Yep, that's right.

dajac avatar Jan 31 '24 11:01 dajac

@dajac If you feel more comfortable, we could implement and test everything up to the point where the field is populated. We would then not populate the field so that you do not need to add the field to the RPC before the broker-side is done.

cadonna avatar Jan 31 '24 11:01 cadonna

@cadonna @lianetm, since we support both subscribe methods, one using java.util.regex.Pattern and one using SubscriptionPattern, I think we should throw an illegal heartbeat exception when the user tries to use both methods at the same time and inform the user to use one at a time, since the field SubscribedRegex is used for java.util.regex.Pattern as well as SubscriptionPattern. What do you guys think?

Phuc-Hong-Tran avatar Feb 06 '24 11:02 Phuc-Hong-Tran

@cadonna Just for clarification, when we were talking about "implement and test everything up to the point where the field is populated", does that mean we're not gonna implement and test the part where the client receives the assignment from the broker at this stage?

Phuc-Hong-Tran avatar Feb 06 '24 15:02 Phuc-Hong-Tran

Hey @Phuc-Hong-Tran , regarding the mixed usage of subscribe with Pattern and with SubscriptionPattern, my opinion is that it is something we should live with to provide a smooth transition, while the usage of Pattern is deprecated. So I would say that we shouldn't restrict it by throwing any new exception to the user (which btw, would introduce API level changes not included in the KIP, so it would require an updated/new KIP). We could just allow subsequent calls to both subscribe with Pattern or SubscriptionPattern, and just ensure that the latest prevails. This is the behaviour for subsequent calls to subscribe(Pattern..), tested in testSubsequentPatternSubscription.

Just for the record, there is a restriction (see here) for not allowing mixed usage of subscribe, but only when mixing different subscription types (topics, partitions, pattern). We continue to respect that, without introducing any new restriction for the calls that in the end represent the same pattern-based subscription type (AUTO_PATTERN).
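
To make the "latest prevails" behaviour concrete, here is a minimal sketch, not the actual SubscriptionState code. The class name, the subscribeRe2j method, and modelling SubscriptionPattern as a plain String are all assumptions made for illustration. It keeps the existing restriction between subscription types and lets either pattern-based call replace the other.

```java
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical, simplified stand-in for the consumer's subscription state.
final class LastCallWinsSubscriptionSketch {
    enum SubscriptionType { NONE, AUTO_TOPICS, AUTO_PATTERN }

    private SubscriptionType type = SubscriptionType.NONE;
    private Pattern javaPattern;          // set by subscribe(Pattern)
    private String re2jRegex;             // stands in for SubscriptionPattern (assumption)
    private List<String> topics = List.of();

    // Mixing different subscription types is still rejected.
    private void setType(SubscriptionType requested) {
        if (type != SubscriptionType.NONE && type != requested) {
            throw new IllegalStateException(
                "Subscription to topics, partitions and pattern are mutually exclusive");
        }
        type = requested;
    }

    void subscribe(List<String> topicNames) {
        setType(SubscriptionType.AUTO_TOPICS);
        topics = List.copyOf(topicNames);
    }

    // Both pattern flavours map to AUTO_PATTERN, so the later call simply wins.
    void subscribe(Pattern pattern) {
        setType(SubscriptionType.AUTO_PATTERN);
        javaPattern = pattern;
        re2jRegex = null;
    }

    void subscribeRe2j(String regex) {
        setType(SubscriptionType.AUTO_PATTERN);
        re2jRegex = regex;
        javaPattern = null;
    }

    void unsubscribe() {
        type = SubscriptionType.NONE;
        javaPattern = null;
        re2jRegex = null;
        topics = List.of();
    }

    Pattern javaPattern() { return javaPattern; }
    String re2jRegex() { return re2jRegex; }
}
```

With this shape, calling subscribe(Pattern) and then subscribeRe2j(...) leaves only the second regex active, while a topic-list subscribe after either of them still throws until unsubscribe() is called.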

lianetm avatar Feb 09 '24 15:02 lianetm

@Phuc-Hong-Tran regarding this:

Just for clarification, when we were talking about "implement and test everything up to the point where the field is populated", does that mean we're not gonna implement and test the part where the client receives the assignment from the broker at this stage?

We do need to:

  • implement and test everything up to the point where the field is populated (from the point the user calls subscribe with the SubscriptionPattern, to the point where the supplied regex is available in the HB builder, to be included in the HB request).
  • include the field in the HB request. This is where we do need the RPC to be updated to support the new field.

I could be missing something, but I would say we don't need any changes for the part where the client receives the assignment from the broker after subscribing to a regex. It should be exactly the same logic as when a client receives an assignment from the broker after subscribing to a list of topics. After sending the HB with the new regex, the client will receive the list of partitions assigned to it, and will reconcile them, just as it reconciles all assignments received (no matter the subscription type that led to receiving that assignment).

Just for the record, the legacy coordinator does have a bit of logic (here) for validating assignments received after subscribing to a regex, where it checks that the assignment received matches the regex. Our initial thought was not to include any assignment validation like that in the client, in an attempt to simplify it: the broker is solely responsible for computing the regex and target assignment, the client takes and reconciles whatever the broker sends, and if the subscription changes from the client side, we have a common logic (not specific to regex) to make sure that the new subscription is sent to the broker (what the legacy coordinator achieved with the rejoin).
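
A rough sketch of the client half described above, from subscribe to the point where the regex is available for the next HB. This is not the HeartbeatRequestManager code: the class and method names are hypothetical, the regex is modelled as a String, and sending the field only when it changed is an assumption of this sketch.

```java
import java.util.Objects;
import java.util.Optional;

// Hypothetical holder for the regex that should go into the next heartbeat.
final class HeartbeatRegexSketch {
    private String subscribedRegex;   // latest regex supplied by the user
    private String lastSentRegex;     // what the broker was last told

    // Called when the user subscribes with a new regex.
    void onSubscribe(String regex) {
        subscribedRegex = Objects.requireNonNull(regex, "regex");
    }

    // Returns the regex to include in the next heartbeat, or empty when
    // nothing changed since the previous one (assumed behaviour).
    Optional<String> regexForNextHeartbeat() {
        if (Objects.equals(subscribedRegex, lastSentRegex)) {
            return Optional.empty();
        }
        lastSentRegex = subscribedRegex;
        return Optional.of(subscribedRegex);
    }
}
```

Nothing here validates the assignment that comes back; in line with the comment above, the client would reconcile whatever the broker sends.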

lianetm avatar Feb 09 '24 15:02 lianetm

@cadonna @lianetm, since we support both subscribe methods, one using java.util.regex.Pattern and one using SubscriptionPattern, I think we should throw an illegal heartbeat exception when the user tries to use both methods at the same time and inform the user to use one at a time, since the field SubscribedRegex is used for java.util.regex.Pattern as well as SubscriptionPattern. What do you guys think?

IMO, we must support the deprecated pattern subscriptions with java.util.regex.Pattern to ensure backwards compatibility, but we do not need to support mixed usage of java.util.regex.Pattern and Google regex patterns. I think this is a blind spot in the KIP. I propose to throw an IllegalStateException if subscribe(java.util.regex.Pattern) is called after subscribe(SubscriptionPattern) (and vice versa) without calling unsubscribe() in between. That is similar to the restrictions between pattern, topic, and partition subscriptions @lianetm linked above. I do not think it is worth considering the edge case of mixed usage of the two pattern types.

Does this make sense to you?

cc @dajac What do you, as the original author of the KIP, think? Should we update the KIP to make this clear?
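
For comparison, the stricter proposal could look roughly like the sketch below. It is only an illustration: the class, the PatternKind enum, and subscribeRe2j are made-up names, and SubscriptionPattern is again modelled as a String.

```java
import java.util.regex.Pattern;

// Hypothetical sketch: the two pattern flavours are mutually exclusive
// unless unsubscribe() is called in between.
final class StrictPatternSubscriptionSketch {
    enum PatternKind { NONE, JAVA_PATTERN, RE2J_PATTERN }

    private PatternKind kind = PatternKind.NONE;
    private String regex;

    private void requireKind(PatternKind requested) {
        if (kind != PatternKind.NONE && kind != requested) {
            throw new IllegalStateException(
                "Already subscribed with " + kind + "; call unsubscribe() before using " + requested);
        }
        kind = requested;
    }

    void subscribe(Pattern pattern) {
        requireKind(PatternKind.JAVA_PATTERN);
        regex = pattern.pattern();
    }

    void subscribeRe2j(String re2jRegex) {
        requireKind(PatternKind.RE2J_PATTERN);
        regex = re2jRegex;
    }

    void unsubscribe() {
        kind = PatternKind.NONE;
        regex = null;
    }

    String regex() { return regex; }
}
```

Repeated calls with the same flavour still replace the previous regex; only switching flavours without unsubscribe() throws.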

cadonna avatar Feb 13 '24 11:02 cadonna

I could be missing something, but I would say we don't need any changes for the part where the client receives the assignment from the broker after subscribing to a regex. It should be exactly the same logic as when a client receives an assignment from the broker after subscribing to a list of topics. After sending the HB with the new regex, the client will receive the list of partitions assigned to it, and will reconcile them, just as it reconciles all assignments received (no matter the subscription type that led to receiving that assignment).

That is also my understanding.

cadonna avatar Feb 13 '24 11:02 cadonna

@Phuc-Hong-Tran Could you please implement the changes that I requested so that we can move on?

cadonna avatar Feb 13 '24 11:02 cadonna

@lianetm thanks for the reply. I was more wondering about the testing strategy for the new subscribe(SubscriptionPattern) method when it comes to receiving the assignment, since that part is not finished on the broker, but I'll follow the same testing logic as for the subscribe(Pattern) method, since, as you mentioned, the receiving part works the same for both.

Phuc-Hong-Tran avatar Feb 13 '24 11:02 Phuc-Hong-Tran

@cadonna, sorry for the delay. I'll push the changes tomorrow

Phuc-Hong-Tran avatar Feb 13 '24 11:02 Phuc-Hong-Tran

IMO, we must support the deprecated pattern subscriptions with java.util.regex.Pattern to ensure backwards compatibility, but we do not need to support mixed usage of java.util.regex.Pattern and Google regex patterns. I think this is a blind spot in the KIP. I propose to throw an IllegalStateException if subscribe(java.util.regex.Pattern) is called after subscribe(SubscriptionPattern) (and vice versa) without calling unsubscribe() in between. That is similar to the restrictions between pattern, topic, and partition subscriptions @lianetm linked above. I do not think it is worth considering the edge case of mixed usage of the two pattern types.

Does this make sense to you?

cc @dajac What do you, as the original author of the KIP, think? Should we update the KIP to make this clear?

@cadonna that makes sense to me

Phuc-Hong-Tran avatar Feb 13 '24 11:02 Phuc-Hong-Tran

IMO, we must support the deprecated pattern subscriptions with java.util.regex.Pattern to ensure backwards compatibility, but we do not need to support mixed usage of java.util.regex.Pattern and Google regex patterns. I think this is a blind spot in the KIP. I propose to throw an IllegalStateException if subscribe(java.util.regex.Pattern) is called after subscribe(SubscriptionPattern) (and vice versa) without calling unsubscribe() in between. That is similar to the restrictions between pattern, topic, and partition subscriptions @lianetm linked above. I do not think it is worth considering the edge case of mixed usage of the two pattern types. Does this make sense to you? cc @dajac What do you, as the original author of the KIP, think? Should we update the KIP to make this clear?

@cadonna I would rather follow what we already do with subscribe today. The last one called takes precedence.

dajac avatar Feb 16 '24 12:02 dajac

I was wondering whether we should introduce a new error code to signal to the user that the regular expression is invalid. Otherwise, we would have to use the invalid request exception and it does not seem user friendly. @cadonna @lianetm What do you think about this?

dajac avatar Feb 16 '24 13:02 dajac

@cadonna I would rather follow what we already do with subscribe today. The last one called takes precedence.

I have a question. The subscribe method that uses Pattern overrides the subscription with the topic(s) that match the Pattern. When the user chooses to use SubscriptionPattern but has already used Pattern beforehand, should we clear out the old subscription?

Phuc-Hong-Tran avatar Feb 19 '24 13:02 Phuc-Hong-Tran

@cadonna I would rather follow what we already do with subscribe today. The last one called takes precedence.

@dajac The javadocs of subscribe() say:

     * @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
     *                               previously (without a subsequent call to {@link #unsubscribe()}), or if not
     *                               configured at-least one partition assignment strategy

https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L727

One could argue that both subscriptions are pattern subscriptions, but they are quite different internally. I am wondering how complex it is to allow mixed usage. If it is not that complex, I agree with your proposal, otherwise I am doubting whether it is really worth it.

In any case the KIP is not clear about the expected behavior.

cadonna avatar Feb 20 '24 09:02 cadonna

I was wondering whether we should introduce a new error code to signal to the user that the regular expression is invalid. Otherwise, we would have to use the invalid request exception and it does not seem user friendly. @cadonna @lianetm What do you think about this?

@dajac Is it also possible to verify the validity of the regular expression client-side? If we assume that the clients send valid regular expressions to the brokers, I think it would be OK to return an invalid request exception and log the error broker-side. Sending an invalid regular expression should then just be a mistake that happens during development of the clients and not something that happens during usage of the clients.

The benefit would be to find the mistakes in regular expressions without a request to the brokers.

The downside is that we need some way to validate the regular expressions client-side, like the corresponding Google library in Java, and I do not know which dependencies are needed for clients in other languages.
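
As a rough illustration of what a client-side pre-check could look like without an extra dependency, the sketch below uses java.util.regex.Pattern as a stand-in. This is an assumption for illustration only: RE2/J syntax is not the same as java.util.regex syntax (for example, RE2 has no backreferences), so this check would catch only basic syntax mistakes and the broker would remain the authority. The class and method names are hypothetical.

```java
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical pre-check; approximates validation with java.util.regex only.
final class RegexPrecheckSketch {
    // Returns a description of the syntax error, or empty if the regex
    // compiles under java.util.regex. A regex that passes here may still
    // be rejected by an RE2/J-based broker.
    static Optional<String> syntaxError(String regex) {
        try {
            Pattern.compile(regex);
            return Optional.empty();
        } catch (PatternSyntaxException e) {
            return Optional.of(e.getDescription());
        }
    }
}
```

For example, syntaxError("orders-(") reports an error locally, while syntaxError("orders-.*") returns empty and the regex would be sent to the broker as-is.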

cadonna avatar Feb 20 '24 10:02 cadonna

@dajac Is it also possible to verify the validity of the regular expression client-side?

If we assume that the clients send valid regular expressions to the brokers, I think it would be OK to return an invalid request exception and log the error broker-side. Sending an invalid regular expression should then just be a mistake that happens during development of the clients and not something that happens during usage of the clients.

The benefit would be to find the mistakes in regular expressions without a request to the brokers.

The downside is that we need some way to validate the regular expressions client-side, like the corresponding Google library in Java, and I do not know which dependencies are needed for clients in other languages.

I don't think we can include the Google library in the client code. I saw the comment about it on the pull request for the implementation of the regex logic on the broker. Will find it again and quote it here.

Edit: here is the comment https://github.com/apache/kafka/pull/14327#pullrequestreview-1622366023

Phuc-Hong-Tran avatar Feb 20 '24 10:02 Phuc-Hong-Tran