KAFKA-15561: Client support for new SubscriptionPattern based subscription
Change:
- Implement methods in AsyncKafkaConsumer that accept SubscriptionPattern to subscribe to topic(s).
- Pass the subscriptionPattern on to SubscriptionState, to be used once the server supports RE2/J.
@lianetm, PTAL, thanks in advance.
@lianetm, thanks for the comments, I will make sure to address those points in my next PR.
Regarding your point about passing the regex to HeartbeatRequestManager: I originally included that in my code change, but then I came across this PR https://github.com/apache/kafka/pull/14956 and decided that we need to wait for the broker to implement the new regex logic first.
This is the task to closely follow https://issues.apache.org/jira/browse/KAFKA-14517, where the broker will support the new regex.
Thanks @lianetm
- This feature is not yet implemented on the broker, so there are a few areas that may need alignment. It might make sense to wait for some progress on the broker and then give this another push.
- This PR attempts to ensure that the new regex is included in the subscription state, and that's definitely needed. But then we're missing the other half of the story: we should make sure that the regex is passed on to the broker on the next heartbeat request (see HeartbeatRequestManager here).
I do not understand why we want to wait. We should make progress as much as possible even if the broker code is not there yet. In the worst case, we would get an error back from the broker until the broker-side is implemented. But that would be fine in my opinion. @lianetm WDYT?
@cadonna thanks for the comment. I can still finish this one by the required deadline; there is no need to wait for the broker logic to be finished. Isn't this one aiming for the 3.8 release, though?
@Phuc-Hong-Tran
though isn't it this one aiming for 3.8 release?
Yes, but we have time-based releases in Apache Kafka. That means that the deadline for the release will be somewhere in the beginning of April. We need to keep enough time before this date for testing.
I understand, will get back to speed on this one
In my opinion, we should merge the client side only after the server side is implemented. The reason is that we need to change the RPC (this is actually missing in this PR) and this should be driven by the server-side work.
@dajac OK, but we can implement and unit test everything up to the RPC, right?
@dajac, when we're talking about the RPC, do we mean the field for the regex in ConsumerGroupHeartbeatRequest.json?
@cadonna Yes. We could for instance commit the RPC and then work independently on the client and the server. My only concern is that we usually discover issues while working on the server side. This is why I usually prefer to get the server code into a reasonable state first. In this case, the risk is low as we are talking about a single field.
@Phuc-Hong-Tran Yep, that's right.
@dajac If you feel more comfortable, we could implement and test everything up to the point where the field is populated. We would then not populate the field so that you do not need to add the field to the RPC before the broker-side is done.
@cadonna @lianetm, since we're supporting both subscribe methods, the one using java.util.regex.Pattern and the one using SubscriptionPattern, I think we should throw an illegal heartbeat exception when the user tries to use both methods at the same time and tell the user to use one at a time, since the field SubscribedRegex is used for java.util.regex.Pattern as well as SubscriptionPattern. What do you guys think?
@cadonna Just for clarification, when we were talking about "implement and test everything up to the point where the field is populated", does that mean we're not gonna implement and test the part where the client receives the assignment from the broker at this stage?
Hey @Phuc-Hong-Tran , regarding the mixed usage of subscribe with Pattern and with SubscriptionPattern, my opinion is that it is something we should live with to provide a smooth transition, while the usage of Pattern is deprecated. So I would say that we shouldn't restrict it by throwing any new exception to the user (which btw, would introduce API level changes not included in the KIP, so it would require an updated/new KIP). We could just allow subsequent calls to both subscribe with Pattern or SubscriptionPattern, and just ensure that the latest prevails. This is the behaviour for subsequent calls to subscribe(Pattern..), tested in testSubsequentPatternSubscription.
Just for the record, there is a restriction (see here) for not allowing mixed usage of subscribe, but only when mixing different subscription types (topics, partitions, pattern). We continue to respect that, without introducing any new restriction for the calls that in the end represent the same pattern-based subscription type (AUTO_PATTERN).
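To make the "latest prevails" behaviour concrete, here is a minimal sketch. It is a simplified, hypothetical model and not Kafka's actual SubscriptionState: the class `SubscriptionModel`, its method names, and the use of a plain `String` in place of SubscriptionPattern are all assumptions made for illustration. It only shows the two rules described above: mixing subscription types is rejected, while subsequent pattern-based calls simply overwrite each other.

```java
import java.util.Optional;
import java.util.regex.Pattern;

// Hypothetical, simplified model; NOT Kafka's SubscriptionState.
final class SubscriptionModel {
    enum Type { NONE, AUTO_TOPICS, AUTO_PATTERN, USER_ASSIGNED }

    private Type type = Type.NONE;
    private String regex; // latest regex supplied by either pattern overload

    // Rejects a change of subscription type unless unsubscribe() was called in between.
    private void setType(Type requested) {
        if (type != Type.NONE && type != requested) {
            throw new IllegalStateException(
                "Subscription to topics, partitions and pattern are mutually exclusive");
        }
        type = requested;
    }

    // Stand-in for subscribe(Collection<String>).
    void subscribeTopics() {
        setType(Type.AUTO_TOPICS);
        regex = null;
    }

    // Stand-in for subscribe(java.util.regex.Pattern).
    void subscribeJavaPattern(Pattern pattern) {
        setType(Type.AUTO_PATTERN);
        regex = pattern.pattern();
    }

    // Stand-in for subscribe(SubscriptionPattern); a raw String is assumed here.
    void subscribeRe2jPattern(String pattern) {
        setType(Type.AUTO_PATTERN);
        regex = pattern;
    }

    void unsubscribe() {
        type = Type.NONE;
        regex = null;
    }

    Optional<String> currentRegex() {
        return Optional.ofNullable(regex);
    }
}
```

With this model, calling `subscribeJavaPattern` and then `subscribeRe2jPattern` leaves only the second regex in place, while calling `subscribeTopics` afterwards without `unsubscribe()` throws.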
@Phuc-Hong-Tran regarding this:
Just for clarification, when we were talking about "implement and test everything up to the point where the field is populated", does that mean we're not gonna implement and test the part where the client receives the assignment from the broker at this stage?
We do need to:
- implement and test everything up to the point where the field is populated (from the point the user calls subscribe with the SubscriptionPattern, to the point where the supplied regex is available in the HB builder, to be included in the HB request).
- include the field in the HB request. This is where we do need the RPC to be updated to support the new field.
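The two steps above could be sketched roughly as follows. This is only an illustration under stated assumptions: `HeartbeatSketch`, `Request`, and the "send the regex only when it changed, otherwise null" rule are hypothetical stand-ins, not the actual HeartbeatRequestManager or the generated RPC classes.

```java
import java.util.Objects;

// Hypothetical sketch; NOT Kafka's HeartbeatRequestManager.
final class HeartbeatSketch {
    // Assumed request shape: a null regex means "unchanged, field not set".
    static final class Request {
        final String subscribedRegex;

        Request(String subscribedRegex) {
            this.subscribedRegex = subscribedRegex;
        }
    }

    private String currentRegex;  // set when the user subscribes with a pattern
    private String lastSentRegex; // what was included in the last request built

    // Step 1: the regex supplied by the user becomes available to the builder.
    void onSubscribe(String regex) {
        currentRegex = Objects.requireNonNull(regex, "regex");
    }

    // Step 2: the next heartbeat carries the regex if it changed since the last one.
    Request buildRequest() {
        if (Objects.equals(currentRegex, lastSentRegex)) {
            return new Request(null);
        }
        lastSentRegex = currentRegex;
        return new Request(currentRegex);
    }
}
```

Everything up to `buildRequest()` can be unit tested without the broker; only the point where the field is actually written into the request depends on the RPC change.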
I could be missing something, but I would say we don't need any changes for the part where the client receives the assignment from the broker after subscribing to a regex. It should be exactly the same logic as when a client receives an assignment from the broker after subscribing to a list of topics. After sending the HB with the new regex, the client will receive the list of partitions assigned to it, and will reconcile them, just as it reconciles all assignments received (no matter the subscription type that led to receiving that assignment).
Just for the record, the legacy coordinator does have a bit of logic (here) for validating assignments received after subscribing to a regex, where it checks that the assignment received matches the regex. Our initial thought was not to include any assignment validation like that in the client, in an attempt to simplify it: the broker is solely responsible for computing the regex and target assignment, the client takes and reconciles whatever the broker sends, and if the subscription changes on the client side, we have common logic (not specific to regex) to make sure that the new subscription is sent to the broker (what the legacy coordinator achieved with the rejoin).
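For reference, the kind of check the legacy coordinator performs can be sketched like this. It is a simplified illustration, not the legacy coordinator's code; `AssignmentCheckSketch` and `allTopicsMatch` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.regex.Pattern;

// Hypothetical sketch of a "does the assignment match the subscribed regex" check.
final class AssignmentCheckSketch {
    // Returns true only if every assigned topic name fully matches the pattern.
    static boolean allTopicsMatch(Pattern subscribed, List<String> assignedTopics) {
        for (String topic : assignedTopics) {
            if (!subscribed.matcher(topic).matches()) {
                return false;
            }
        }
        return true;
    }

    public static void main(String[] args) {
        Pattern pattern = Pattern.compile("orders-.*");
        System.out.println(allTopicsMatch(pattern, Arrays.asList("orders-eu", "orders-us")));
        System.out.println(allTopicsMatch(pattern, Arrays.asList("orders-eu", "payments")));
    }
}
```

The new consumer would skip a check like this entirely and reconcile whatever the broker sends.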
@cadonna @lianetm, since we're supporting both subscribe methods, the one using java.util.regex.Pattern and the one using SubscriptionPattern, I think we should throw an illegal heartbeat exception when the user tries to use both methods at the same time and tell the user to use one at a time, since the field SubscribedRegex is used for java.util.regex.Pattern as well as SubscriptionPattern. What do you guys think?
IMO, we must support the deprecated pattern subscriptions with java.util.regex.Pattern to ensure backwards compatibility, but we do not need to support mixed usage of java.util.regex.Pattern and Google regex patterns. I think this is a blind spot in the KIP. I propose to throw an IllegalStateException if subscribe(java.util.regex.Pattern) is called after subscribe(SubscriptionPattern) (and vice versa) without calling unsubscribe() in between. That is similar to the restrictions between pattern, topic, and partition subscriptions @lianetm linked above. I do not think it is worth considering the edge case of mixed usage of the two pattern types.
Does this make sense to you?
\cc @dajac What do you as the original author of the KIP think? Should we update the KIP to make this clear?
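For comparison, the stricter proposal could look roughly like the sketch below. This is a hypothetical model only: `StrictPatternSubscription`, the `Flavor` enum, and the exception message are assumptions for illustration, and a plain `String` stands in for SubscriptionPattern.

```java
import java.util.regex.Pattern;

// Hypothetical model of the stricter proposal; NOT part of the Kafka client.
final class StrictPatternSubscription {
    enum Flavor { NONE, JAVA_REGEX, RE2J }

    private Flavor flavor = Flavor.NONE;
    private String regex;

    // Stand-in for subscribe(java.util.regex.Pattern).
    void subscribeJavaPattern(Pattern pattern) {
        ensureFlavor(Flavor.JAVA_REGEX);
        regex = pattern.pattern();
    }

    // Stand-in for subscribe(SubscriptionPattern); a raw String is assumed here.
    void subscribeRe2jPattern(String pattern) {
        ensureFlavor(Flavor.RE2J);
        regex = pattern;
    }

    void unsubscribe() {
        flavor = Flavor.NONE;
        regex = null;
    }

    String currentRegex() {
        return regex;
    }

    // Switching between the two pattern flavors requires unsubscribe() in between.
    private void ensureFlavor(Flavor requested) {
        if (flavor != Flavor.NONE && flavor != requested) {
            throw new IllegalStateException(
                "Cannot mix Pattern and SubscriptionPattern subscriptions without unsubscribe()");
        }
        flavor = requested;
    }
}
```

Repeated calls with the same flavor still overwrite the previous regex; only a switch of flavor without `unsubscribe()` is rejected.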
I could be missing something, but I would say we don't need any changes for the part where the client receives the assignment from the broker after subscribing to a regex. It should be exactly the same logic as when a client receives an assignment from the broker after subscribing to a list of topics. After sending the HB with the new regex, the client will receive the list of partitions assigned to it, and will reconcile them, just as it reconciles all assignments received (no matter the subscription type that led to receiving that assignment).
That is also my understanding.
@Phuc-Hong-Tran Could you please implement the changes that I requested so that we can move on?
@lianetm thanks for the reply. I was more wondering about the testing strategy for the new subscribe(SubscriptionPattern) method when it comes to receiving the assignment, since that part is not finished on the broker. But I'll follow the same testing logic as for the subscribe(Pattern) method, since, as you mentioned, the receiving part works the same for both.
@cadonna, sorry for the delay. I'll push the changes tomorrow
@cadonna that makes sense to me
@cadonna I would rather follow what we already do with subscribe today. The last one called takes precedence.
I was wondering whether we should introduce a new error code to signal to the user that the regular expression is invalid. Otherwise, we would have to use the invalid request exception and it does not seem user friendly. @cadonna @lianetm What do you think about this?
I have a question. The subscribe method that uses Pattern overrides the subscription with the topic(s) that match the Pattern. When a user chooses to use SubscriptionPattern but has already used Pattern beforehand, should we clear out the old subscription?
@dajac The javadocs of subscribe() say:
* @throws IllegalStateException If {@code subscribe()} is called previously with topics, or assign is called
* previously (without a subsequent call to {@link #unsubscribe()}), or if not
* configured at-least one partition assignment strategy
https://github.com/apache/kafka/blob/f0087ac6a8a7b1005e9588e42b3679146bd3eb13/clients/src/main/java/org/apache/kafka/clients/consumer/KafkaConsumer.java#L727
One could argue that both subscriptions are pattern subscriptions, but they are quite different internally. I am wondering how complex it is to allow mixed usage. If it is not that complex, I agree with your proposal; otherwise I doubt that it is really worth it.
In any case the KIP is not clear about the expected behavior.
I was wondering whether we should introduce a new error code to signal to the user that the regular expression is invalid. Otherwise, we would have to use the invalid request exception and it does not seem user friendly. @cadonna @lianetm What do you think about this?
@dajac Is it also possible to verify the validity of the regular expression client-side? If we assume that the clients send valid regular expressions to the brokers, I think it would be OK to return an invalid request exception and log the error broker-side. Sending invalid regular expressions should then just be a mistake that happens during development of the clients and not something that happens during usage of the clients.
The benefit would be to find the mistakes in regular expressions without a request to the brokers.
The downside is that we need some way to validate the regular expressions client-side, like the corresponding Google library in Java, and I do not know which dependencies are needed for clients in other languages.
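As a rough illustration of what a client-side pre-check could look like, here is a sketch that uses java.util.regex as a stand-in validator. This is an assumption made purely for the example: java.util.regex and RE2/J do not accept exactly the same syntax (java.util.regex allows backreferences and lookaround, for instance, which RE2 does not support), so passing this check would not guarantee that the broker accepts the expression. `RegexPrecheckSketch` and `precheck` are hypothetical names.

```java
import java.util.Optional;
import java.util.regex.Pattern;
import java.util.regex.PatternSyntaxException;

// Hypothetical client-side pre-check; java.util.regex is only an approximation of RE2/J.
final class RegexPrecheckSketch {
    // Returns a description of the syntax error, or empty if the regex compiles.
    static Optional<String> precheck(String regex) {
        try {
            Pattern.compile(regex);
            return Optional.empty();
        } catch (PatternSyntaxException e) {
            return Optional.of(e.getDescription());
        }
    }

    public static void main(String[] args) {
        System.out.println(precheck("orders-.*").isPresent()); // compiles, so no error
        System.out.println(precheck("orders-(").isPresent());  // unclosed group, so an error
    }
}
```

A check like this would catch plain syntax mistakes before any request is sent, but the broker would remain the authority on what is valid.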
I don't think we can include the Google library in the client code. I saw the comment about it on the pull request for the implementation of the regex logic on the broker. Will find it again and quote it here.
Edit: here is the comment https://github.com/apache/kafka/pull/14327#pullrequestreview-1622366023