spring-kafka
Add test for next generation consumer-group rebalance protocol
Starting with 3.7.0, the Kafka client now supports a new consumer group rebalance protocol and categorizes the consumer-group strategy as legacy (classic) vs. consumer. Spring for Apache Kafka supports the new protocol transparently; however, we need to verify this by adding an integration test. See more details here.
I am investigating whether I can resolve this issue on my own. However, it currently seems unachievable due to the limitations of KafkaTestKit, which is used to create EmbeddedKafka.
I have created an issue with the apache/Kafka team to see if there are any other workarounds.
I will let you know if there are any updates.
@chickenchickenlove,
Thank you for looking into this!
Would you mind sharing more info about the problem you have faced?
What issue have you created on the Apache Kafka side?
Did you try to test it against @EmbeddedKafka(kraft = false), which does not use KafkaClusterTestKit?
It is a bit odd to hear that broker behavior somehow has an effect on the consumer:
    GroupProtocol groupProtocol = GroupProtocol.valueOf(config.getString(ConsumerConfig.GROUP_PROTOCOL_CONFIG).toUpperCase(Locale.ROOT));
    if (groupProtocol == GroupProtocol.CONSUMER)
        return new AsyncKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
    else
        return new LegacyKafkaConsumer<>(config, keyDeserializer, valueDeserializer);
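To make the quoted selection logic easier to follow, here is a minimal, dependency-free sketch of how a configured protocol string resolves to an enum constant. The GroupProtocolSketch class and its nested GroupProtocol enum are hypothetical stand-ins, not the Kafka client classes; only the valueOf(...toUpperCase(Locale.ROOT)) pattern is taken from the snippet above.

```java
import java.util.Locale;

public class GroupProtocolSketch {

    // Hypothetical stand-in for the client's GroupProtocol enum.
    public enum GroupProtocol { CLASSIC, CONSUMER }

    // Mirrors the quoted pattern: upper-case the configured value, then look it up.
    public static GroupProtocol resolve(String configured) {
        return GroupProtocol.valueOf(configured.toUpperCase(Locale.ROOT));
    }

    // True when the configured value would select the new consumer implementation.
    public static boolean usesNewProtocol(String configured) {
        return resolve(configured) == GroupProtocol.CONSUMER;
    }

    public static void main(String[] args) {
        System.out.println(resolve("consumer"));
        System.out.println(usesNewProtocol("classic"));
    }
}
```

As the sketch suggests, the choice between the two consumer implementations is driven by client-side configuration alone, which is why a broker-side effect looks surprising at first.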
@chickenchickenlove This is an issue we need to tackle before the GA release, so we were planning to take a look at this; glad that you started on it. Please let us know if disabling kraft, as @artembilan suggested, works.
@sobychacko, @artembilan thanks for your comments.
I didn't know about @EmbeddedKafka(kraft = false). I'm going to try to test it ASAP.
This is the issue which I reported. I tested KIP-848 rebalancing with this code (docker-compose and a simple main class), and it worked well.
However, I have a problem when I use KafkaTestKit with @EmbeddedKafka.
I think here is the thing.
NOTE: We cannot set controller.quorum.voters the way we want (it is always 0.0.0.0:0).
I may not be very knowledgeable about KRaft, so this might not be accurate.
It seems that in that state, there is no quorum available for voting.
The AsyncKafkaConsumer sends a heartbeat to the broker using the consumer group protocol, but the broker cannot assign topic partitions because there are no quorum voters available to assign.
Do you have any idea how controller.quorum.voters=0.0.0.0:0 works?
Thanks for sharing!
So, if we cannot change controller.quorum.voters for that KafkaTestKit, then it sounds like we cannot test this new consumer mode for a KRaft environment.
Back in the day, we also faced the problem that KafkaTestKit is not able to accept fixed ports for the brokers it creates.
Therefore we recommend using ZK mode (kraft = false) if you deal with fixed ports.
Might be the case for this new consumer as well: just don't use KRaft!
I wonder if we need to think about changing the default for kraft from true to false, since it is not able to cover all the use cases we have around.
At least for the time being, until Apache Kafka has those inconveniences fixed...
I think staying with kraft = false for this test will help us verify the behavior of this new consumer group faster without worrying about kraft (for the time being, given the issues we ran into).
@artembilan, @sobychacko.
Before setting kraft=false in @EmbeddedKafka, I first tested whether KIP-848 works in ZK mode with docker-compose.
IMHO, we can use the new consumer rebalancing protocol only in KRaft mode.
I have created two versions of the docker-compose files.
After setting up the consumer configurations identically, the consumer sent requests to each broker (ZK, KRaft) and received different results.
In ZK mode, the consumer received unsupported API messages from the broker. (link)
On the other hand, in KRaft mode, the consumer got metadata of the assigned topic and partitions. (link)
Thus, I think we should use KRaft mode to test the new rebalancing protocol.
If we should test it anyway, I have one workaround.
How about using testContainer until testing with EmbeddedKafka is possible?
- https://mvnrepository.com/artifact/org.testcontainers/kafka
- https://mvnrepository.com/artifact/org.testcontainers/testcontainers
I think it can be one workaround. But of course, we would have to accept longer test times and the addition of new dependencies.
What do you think?
@chickenchickenlove Thanks for these findings. Is the fact that the new consumer protocol is not working with ZK a well-known issue? Is there a corresponding Kafka Jira issue that we can link from here? If this is the case, we should mention it in our docs until it is resolved; otherwise, we will see issues related to it coming to this project, thinking that the issue is in Spring for Apache Kafka.
Regarding your question about using test containers, we discussed this as an alternative internally yesterday but opted against bringing it into the project. We don't really need a test for this feature in the framework; we thought that it would be nice to have a test to verify it since this is the first time we are exposing this via Spring for Apache Kafka. The framework didn't have to make any code changes related to this when we upgraded to the 3.7.0 Java client. Maybe you can create a standalone application that uses the new consumer group protocol + testcontainers-based Kafka using KRaft. If anyone wants to see an example of this feature, we can point to this until the test kit issues are resolved. What do you think?
Or we can opt-in for a respective sample where we can use Testcontainers to demonstrate the feature...
@artembilan That too.
@sobychacko Thanks for your comments.
> Thanks for these findings. Is the fact that the new consumer protocol is not working with ZK a well-known issue? Is there a corresponding Kafka Jira issue that we can link from here? If this is the case, we should mention it in our docs until it is resolved; otherwise, we will see issues related to it coming to this project, thinking that the issue is in Spring for Apache Kafka.
No, it is my assumption based on the test results, not a fact yet, because I have not yet searched for any issues on the Kafka Jira.
My assumption is based on the test results with docker-compose.
As you said, I will officially inquire with Apache Kafka and share the results.
> Regarding your question about using test containers, we discussed this as an alternative internally yesterday but opted against bringing it into the project. We don't really need a test for this feature in the framework; we thought that it would be nice to have a test to verify it since this is the first time we are exposing this via Spring for Apache Kafka. The framework didn't have to make any code changes related to this when we upgraded to the 3.7.0 Java client. Maybe you can create a standalone application that uses the new consumer group protocol + testcontainers-based Kafka using KRaft. If anyone wants to see an example of this feature, we can point to this until the test kit issues are resolved. What do you think?
I agree.
There are currently a few tests related to rebalancing in spring-kafka. (link)
How about modifying these tests to use TestContainer with KRaft, and storing them in my public repository?
@artembilan
Thanks for your comment.
Sorry to say, I didn't understand your comment, because I cannot find the meaning of opt-in.
When you have free time, could you give me more detailed context? Thanks in advance!
Sure!
The point is that this new consumer group functionality in Kafka does not affect anything in Spring.
Therefore having some test for this is a bit redundant for our framework since it is not going to cover any functionality in this project.
However, as @sobychacko pointed out, we are going to have more and more questions about how to make Spring for Apache Kafka work with this new consumer group mode.
It turned out that it is just the respective environment that has to be configured: KRaft mode for the broker and the ConsumerConfig.GROUP_PROTOCOL_CONFIG property for the consumer.
(Again: this is just an assumption according to your experience).
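The consumer side of that setup can be sketched without any Kafka dependency. The literal key group.protocol is the value behind ConsumerConfig.GROUP_PROTOCOL_CONFIG; the class name, bootstrap address, and group id below are placeholders chosen for illustration, and the broker side (KRaft mode) is not covered here.

```java
import java.util.HashMap;
import java.util.Map;

public class NewProtocolConsumerProps {

    // Builds consumer properties that opt in to the new rebalance protocol.
    public static Map<String, Object> consumerProps(String bootstrapServers, String groupId) {
        Map<String, Object> props = new HashMap<>();
        props.put("bootstrap.servers", bootstrapServers);
        props.put("group.id", groupId);
        // Same key as ConsumerConfig.GROUP_PROTOCOL_CONFIG; "classic" selects the legacy protocol.
        props.put("group.protocol", "consumer");
        return props;
    }

    public static void main(String[] args) {
        // Placeholder address and group id, for illustration only.
        System.out.println(consumerProps("localhost:9092", "sample-group"));
    }
}
```

A map like this could be handed to a DefaultKafkaConsumerFactory in a Spring for Apache Kafka application, together with the usual deserializer settings.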
And therefore, since we don't cover any Spring functionality, it is logical just to demonstrate this as a sample.
We prefer to have our tests as stable as possible, so we try to rely on random ports and log dirs to not interfere with other tests.
But since GroupProtocol.CONSUMER mode requires some controller.quorum.voters changes and we cannot do that with KafkaTestKit, we cannot test it against our @EmbeddedKafka.
So, the solution is Testcontainers, but I don't like to introduce a new dependency into the project which would not give much benefit, since we don't have the functionality in the project itself.
Therefore a sample for this kind of situation in our /samples dir is a natural way to reach a middle ground.
Does it make sense now?
Also, this feature is not ready for prime time just because it is available via the 3.7.0 client. The release notes for the KIP clearly recommend not using it in production, and it is still evolving. As this becomes a more mature feature post 3.7.0, we will also add corresponding support in the framework if necessary and test it accordingly. For now, let's go with the sample plan as Artem suggested. Thanks!
@artembilan Thanks for your kind explanation. I think I understand what you said completely.
@artembilan, @sobychacko May I create a new PR for this issue? If so, when you have free time, please read and review the following. I want to sync my understanding with your requirements.
- Create a new project, /samples/sample-06/.
- Create new test code in that project to serve as an example for others.
- It is okay to add the testContainer dependency to the project /samples/sample-06/.
- Is it okay to use gradle? (All of the sample code uses maven, but I'm more familiar with gradle, so I prefer it.)
Answer "yes" to all your questions. Since it is a sample and it has to be an independent project, everything is totally fine. We chose Maven because back in the day it was a fact that Gradle users know Maven as well.
Hi, @sobychacko @artembilan!
I created a new issue on Apache JIRA to inquire whether the KIP-848 protocol officially supports ZooKeeper mode as well. (link)
If there are any updates, I will let you know! You are also welcome to subscribe to it.
> Answer "yes" to all your questions. Since it is a sample and it has to be an independent project, everything is totally fine. We chose Maven because back in the day it was a fact that Gradle users know Maven as well.
@artembilan thanks for your comments! I will create a new PR. Thanks!
I got a reply. You can check it in this issue (https://issues.apache.org/jira/browse/KAFKA-16657). In short:
- Apache Kafka will support ZooKeeper until 3.8.0.
- However, the new consumer rebalancing protocol will be ready in Apache Kafka 4.0.
- Thus, the new consumer rebalancing protocol will not support ZK mode.