
KAFKA-18185: remove internal.leave.group.on.close config

frankvicky opened this issue 8 months ago.

JIRA: KAFKA-18185

This is a follow-up to #17614. This patch removes the internal.leave.group.on.close config.

frankvicky, Apr 07 '25

Hi @ableegoldman, please take a look when you have a free moment. I have some questions since I'm not familiar with the Streams module. Please bear with me if the code looks like it was written while drunk.

frankvicky, Apr 10 '25

Looks like KafkaStreamsCloseOptionsIntegrationTest is failing.

ableegoldman, Apr 24 '25

@frankvicky this isn't strictly related to this PR, but there's a KIP (KIP-1153) out to fix up the API for the Streams version of CloseOptions. It would be great if you could take a look and give a +1 once it moves to voting.

ableegoldman, Apr 26 '25

Hey @frankvicky @chia7712 - what is missing from finishing this work?

Also, I see that support for CloseOptions was implemented for KIP-848 and share groups, but it is missing from KIP-1071. Do we need a separate ticket to extend the work to KIP-1071? It should work analogously.

lucasbru, Aug 26 '25

> what is missing from finishing this work?

I will take a look later :rofl:

> Also, I see that support for CloseOptions was implemented for KIP-848 and share groups, but it is missing from KIP-1071. Do we need a separate ticket to complete the work to also include KIP-1071? Should work analogous.

I’m not sure it makes sense for the share consumer not to leave the group when closing. The main motivation for introducing CloseOptions is to improve Kafka Streams.

By the way, it would be good to have a separate KIP to implement CloseOptions for the share consumer if needed. @AndrewJSchofield @apoorvmittal10 FYI

chia7712, Aug 26 '25

I was just cycling back to the KIP. There is the following example:

// Old usage
consumer.close(Duration.ofSeconds(30));
 
// New usage
consumer.close(CloseOptions.timeout(Duration.ofSeconds(30))
    .withGroupMembershipOperation(GroupMembershipOperation.DEFAULT));
 
// New usage (shorter version)
consumer.close(CloseOptions.timeout(Duration.ofSeconds(30)));

Isn't this example incorrect? The behavior of the existing close() and close(Duration) should not change, and from my understanding they never send a leave-group request (in either the dynamic or the static case). Thus, for a dynamic member, wouldn't the new usage need to pass REMAIN_IN_GROUP to get the same behavior?

Thinking about this further, we could include a change of default behavior for close() and close(Duration) in the KIP, but only apply it in AK 5.0. We cannot apply it in a minor release... Thoughts?

For KIP-1071, I would expect it to behave the same way as the old protocol. Wouldn't it be an unexpected side effect if close() started sending a leave-group request after switching to the "streams" protocol?

I agree that the share consumer, as it has its own client, would need a new KIP to add CloseOptions, but that seems orthogonal, and maybe it does not even make sense, as @chia7712 said, since there is no such thing as a static share group AFAIK?

mjsax, Aug 26 '25

@mjsax Not sure - the default for the consumer was to leave the group for dynamic members, so I think the example is correct? We just need to make sure that Kafka Streams will use REMAIN_IN_GROUP by default, which I suppose this PR is doing.

But I'm also confused. In KIP-1092, I see this:

         * {@code LEAVE_GROUP} means the consumer will leave the group.
         * {@code REMAIN_IN_GROUP} means the consumer will remain in the group.
         * {@code DEFAULT} applies the default behavior, which may depend on whether the consumer is static or dynamic.

And further down, in the test plan:

leaveGroup(DEFAULT):
 - For dynamic members: verify that the consumer leaves the group upon close, triggering a rebalance.
 - For static members: verify that the consumer remains in the group, and the group remains stable.
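To make the KIP-1092 wording concrete, here is a minimal Java model of what the Javadoc and test plan above say about leaving the group on close. The class KipCloseSemantics, its local GroupMembershipOperation enum, and leavesGroup are hypothetical names that only mirror the KIP's vocabulary; this is not the Kafka client API, and it encodes the KIP text only, not what ConsumerMembershipManager actually does.

```java
// Hypothetical model of the KIP-1092 text (Javadoc + test plan).
// The enum only mirrors the KIP's names; it is not the real CloseOptions class.
final class KipCloseSemantics {

    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    /** True if, according to the KIP wording, the member leaves the group on close. */
    static boolean leavesGroup(GroupMembershipOperation op, boolean isStatic) {
        if (op == GroupMembershipOperation.LEAVE_GROUP) {
            return true;
        }
        if (op == GroupMembershipOperation.REMAIN_IN_GROUP) {
            return false;
        }
        // DEFAULT depends on the membership type: dynamic members leave
        // (triggering a rebalance); static members stay and the group remains stable.
        return !isStatic;
    }

    public static void main(String[] args) {
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            for (boolean isStatic : new boolean[] {false, true}) {
                System.out.printf("%-15s %-7s leaves=%b%n",
                        op, isStatic ? "static" : "dynamic", leavesGroup(op, isStatic));
            }
        }
    }
}
```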

This seems clear. But the new consumer code seems to document a different behavior; see https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/clients/consumer/internals/ConsumerMembershipManager.java#L431-L436

 - Default operation: both static and dynamic consumers will send a leave heartbeat
 - Leave operation: both static and dynamic consumers will send a leave heartbeat
 - Remain in group: only static consumers will send a leave heartbeat, while dynamic members will not

This seems to be completely different from the KIP. I did not check the implementation in detail though.

lucasbru, Aug 27 '25

Oh sorry. I mixed up this work with the corresponding KS work... https://cwiki.apache.org/confluence/display/KAFKA/KIP-1153%3A+Refactor+Kafka+Streams+CloseOptions+to+Fluent+API+Style

mjsax, Aug 27 '25

@lucasbru: I have reviewed #17614. I suppose the difference you mentioned is due to https://github.com/apache/kafka/pull/17614#discussion_r2029841125.

frankvicky, Aug 28 '25

@frankvicky: This makes sense, then. So remaining in the group means sending the leave epoch in the static case. Then it is just the code comment that is misleading; it could probably be updated to state what you said in the PR comment you referenced.
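Under this reading, the code comment and the KIP stop contradicting each other once "sends a leave heartbeat" is separated from "leaves the group". The sketch below assumes the KIP-848 epoch convention: a heartbeat with member epoch -1 means the member leaves, while -2 means a static member leaves temporarily and keeps its membership until the session timeout, so no rebalance is triggered. It further assumes, without having checked ConsumerMembershipManager, that LEAVE_GROUP makes a static member send -1 and that DEFAULT makes it send -2. CloseHeartbeatModel, leaveEpochOnClose, and leavesGroup are hypothetical names; this models the semantics discussed in this thread, not the client implementation.

```java
import java.util.OptionalInt;

// Hypothetical model (not the Kafka client code) of the heartbeat a member
// sends on close, per the ConsumerMembershipManager comment plus assumed epochs.
final class CloseHeartbeatModel {

    enum GroupMembershipOperation { LEAVE_GROUP, REMAIN_IN_GROUP, DEFAULT }

    // KIP-848 convention: -1 leaves the group; -2 is a static member's temporary leave.
    static final int LEAVE_GROUP_MEMBER_EPOCH = -1;
    static final int LEAVE_GROUP_STATIC_MEMBER_EPOCH = -2;

    /** Epoch sent in the close-time heartbeat, or empty if no heartbeat is sent. */
    static OptionalInt leaveEpochOnClose(GroupMembershipOperation op, boolean isStatic) {
        switch (op) {
            case LEAVE_GROUP:
                // Assumption: a static member leaving for good must use -1.
                return OptionalInt.of(LEAVE_GROUP_MEMBER_EPOCH);
            case REMAIN_IN_GROUP:
                // Only static members send a heartbeat, and -2 keeps their membership.
                // A dynamic member sends nothing and stays until its session times out.
                return isStatic ? OptionalInt.of(LEAVE_GROUP_STATIC_MEMBER_EPOCH) : OptionalInt.empty();
            default:
                // DEFAULT: both send a leave heartbeat, with the epoch matching their type.
                return OptionalInt.of(isStatic ? LEAVE_GROUP_STATIC_MEMBER_EPOCH : LEAVE_GROUP_MEMBER_EPOCH);
        }
    }

    /** In KIP terms, only epoch -1 actually removes the member from the group. */
    static boolean leavesGroup(GroupMembershipOperation op, boolean isStatic) {
        OptionalInt epoch = leaveEpochOnClose(op, isStatic);
        return epoch.isPresent() && epoch.getAsInt() == LEAVE_GROUP_MEMBER_EPOCH;
    }

    public static void main(String[] args) {
        for (GroupMembershipOperation op : GroupMembershipOperation.values()) {
            for (boolean isStatic : new boolean[] {false, true}) {
                OptionalInt epoch = leaveEpochOnClose(op, isStatic);
                System.out.printf("%-15s %-7s heartbeat=%-5s leaves=%b%n",
                        op, isStatic ? "static" : "dynamic",
                        epoch.isPresent() ? String.valueOf(epoch.getAsInt()) : "none",
                        leavesGroup(op, isStatic));
            }
        }
    }
}
```

With these assumptions, every combination except a dynamic member with REMAIN_IN_GROUP sends a heartbeat (matching the code comment), while the leaves column matches the KIP-1092 test plan: dynamic members leave on DEFAULT, static members stay.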

Will you implement these changes for KIP-1071 as well, then? While static membership isn't fully implemented yet for KIP-1071, we should still pull through those changes.

lucasbru, Aug 28 '25

High-level comment: Is it wise to complete this PR before https://github.com/apache/kafka/pull/19955? It seems we should only remove the internal config after the public API change for KS is merged. Otherwise, even if it is not publicly user-facing, aren't we losing the ability to change KS's behavior on KafkaStreams#close()? I know that we have some users who don't like how KS behaves and actually use this internal config to change the behavior. If we merge this PR before we complete KIP-1153, we would introduce a "regression" for these users.

mjsax, Aug 29 '25

Just talked to @bbejeck about this, and he will prioritize KIP-1153 to make sure we get it merged.

mjsax, Aug 29 '25

@frankvicky What I actually meant to ask in my previous comment: Are you planning to implement these changes also for KIP-1071? While static membership isn't fully implemented yet for KIP-1071, we should still pull through those changes.

lucasbru, Sep 01 '25

@lucasbru: Sorry that I missed responding to your question. IMO, we should implement these changes for KIP-1071 as well, but I'm not sure we can do it at this moment. I'm investigating the failing test JoinWithIncompleteMetadataIntegrationTest#testShouldAutoShutdownOnJoinWithIncompleteMetadata, and the root cause appears to be that the new protocol doesn't trigger a rebalance when the topic is missing. Would you happen to have any insight?

frankvicky, Sep 01 '25

@frankvicky It looks like that integration test was broken by https://github.com/apache/kafka/pull/20284 . I can see that it passes on CI only because it is first run with the old protocol, and then run with the new protocol, and in the new protocol it shuts down only because it tries to reuse the group ID from the first run. At this time, the member from the first run is still in the group, so we get an error that the group is of the incorrect protocol type.

I created https://issues.apache.org/jira/browse/KAFKA-19660 for that.

This indeed shines a light on your (this) PR, though: it seems to change the default for Kafka Streams. It leaves the group when streams.close() is called, which wasn't the case before.

lucasbru, Sep 01 '25

@frankvicky I'm going to make a pass regarding this comment from @lucasbru - ~I want to confirm if we are changing the default behavior, which is something I don't think we want to do~
Never mind: after looking at the code more, I realize this is exactly what we want by default.

bbejeck, Oct 03 '25

Merged #19400 into trunk

bbejeck, Oct 03 '25