kafka-go
Supporting Kafka static membership
Static membership requires setting group.instance.id, which in turn requires JoinGroup API version 5 and Heartbeat API version 3. Documentation
Validation of a new member.id changed in later releases. When a client joins a consumer group for the first time with an empty memberID, the server returns a MEMBER_ID_REQUIRED error together with a generated memberID for the client. That generated memberID must be passed along when the join is retried. Learn more here
A heartbeat log has also been added, which is useful for maintenance.
All the previously failing test cases now pass in my local environment.
It looks like the tests in CI pass for Kafka version 2.3.1 and later; this is likely because the API versions you used are not supported by older versions.
One of kafka-go's goals is to offer strong backward compatibility guarantees, so we will need to figure out a way to transparently fall back to older API versions when connecting to older brokers (we usually do this with the ApiVersions API call).
Hi @achille-roussel! Can you point me to a similar example in the repo that I can learn from and follow?
The kafka.Conn type maintains a map of API versions supported by the server https://github.com/segmentio/kafka-go/blob/main/conn.go#L65-L66 which we use to determine which request and response version to use client-side.
That being said, the change might be simpler to implement once https://github.com/segmentio/kafka-go/pull/947 has been merged, otherwise we may have to do API version selection in many places instead of having it abstracted away by the kafka.Transport type.
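As a rough illustration of version selection against a map of broker-supported versions, here is a self-contained sketch. It does not reproduce the actual code in conn.go: versionRange and pickVersion are hypothetical names, and the broker ranges in main are made up for the example. The API keys 11 (JoinGroup) and 12 (Heartbeat) come from the Kafka protocol.

```go
package main

import "fmt"

// versionRange is the inclusive range of versions a broker supports for one API.
type versionRange struct{ Min, Max int16 }

// Kafka protocol API keys for the two requests discussed in this PR.
const (
	apiKeyJoinGroup int16 = 11
	apiKeyHeartbeat int16 = 12
)

// pickVersion returns the highest client-implemented version that falls
// inside the broker's supported range for apiKey. ok is false when the broker
// does not list the API or no version overlaps.
// clientVersions must be given in ascending order.
func pickVersion(broker map[int16]versionRange, apiKey int16, clientVersions ...int16) (v int16, ok bool) {
	r, found := broker[apiKey]
	if !found {
		return 0, false
	}
	for i := len(clientVersions) - 1; i >= 0; i-- {
		if c := clientVersions[i]; c >= r.Min && c <= r.Max {
			return c, true
		}
	}
	return 0, false
}

func main() {
	// Illustrative ranges only, not taken from any specific Kafka release.
	oldBroker := map[int16]versionRange{apiKeyJoinGroup: {0, 2}, apiKeyHeartbeat: {0, 1}}
	newBroker := map[int16]versionRange{apiKeyJoinGroup: {0, 5}, apiKeyHeartbeat: {0, 3}}

	v, _ := pickVersion(oldBroker, apiKeyJoinGroup, 1, 5)
	fmt.Println("old broker JoinGroup version:", v)
	v, _ = pickVersion(newBroker, apiKeyJoinGroup, 1, 5)
	fmt.Println("new broker JoinGroup version:", v)
}
```

With a helper of this shape, static membership would only be enabled when version 5 of JoinGroup (and version 3 of Heartbeat) is selected, and older brokers would keep using the older request versions.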
Right, I think I know what you mean.
I did a small investigation (sadly, I had no time to work on the PR before now). I started to think it would be nice to have a general type for each API protocol that can carry the data of all versions. But then I realized that such a general type would almost always be the latest version of the protocol. Therefore it makes sense not to introduce a general type and simply use interfaces instead.
There is version negotiation, as you said, and yes, it is implemented at the conn level and only used there. I think it is impossible to avoid, since it tells us how to serialize the request and parse the response in the binary format that the Kafka server accepts. There must be a switch or a factory pattern that lets us create the proper request version and read the corresponding response version. The caller of the coordinator functions will never know the negotiated version in advance, therefore the response must go through some sort of abstraction, for which I think an interface is the most appropriate.
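To make the idea concrete, here is a minimal sketch of an interface plus a factory switch. All names in it (joinGroupRequest, joinGroupRequestV1, joinGroupRequestV5, newJoinGroupRequest) are hypothetical and heavily simplified; they are not the types in the repo, and real requests carry more fields.

```go
package main

import "fmt"

// joinGroupRequest is a hypothetical interface that hides the wire version
// from the caller of the coordinator functions.
type joinGroupRequest interface {
	Version() int16
	// StaticMember reports whether the request carries a group.instance.id.
	StaticMember() bool
}

// joinGroupRequestV1 is a simplified older request without static membership.
type joinGroupRequestV1 struct {
	GroupID, MemberID string
}

func (joinGroupRequestV1) Version() int16     { return 1 }
func (joinGroupRequestV1) StaticMember() bool { return false }

// joinGroupRequestV5 is a simplified request that adds the instance ID.
type joinGroupRequestV5 struct {
	GroupID, MemberID, GroupInstanceID string
}

func (joinGroupRequestV5) Version() int16       { return 5 }
func (r joinGroupRequestV5) StaticMember() bool { return r.GroupInstanceID != "" }

// newJoinGroupRequest is the factory: it maps the negotiated version to a
// concrete request type. For versions below 5 the instance ID is dropped,
// because those request versions have no field for it.
func newJoinGroupRequest(version int16, groupID, memberID, instanceID string) joinGroupRequest {
	switch {
	case version >= 5:
		return joinGroupRequestV5{GroupID: groupID, MemberID: memberID, GroupInstanceID: instanceID}
	default:
		return joinGroupRequestV1{GroupID: groupID, MemberID: memberID}
	}
}

func main() {
	for _, negotiated := range []int16{1, 5} {
		req := newJoinGroupRequest(negotiated, "my-group", "", "instance-a")
		fmt.Printf("negotiated=%d request version=%d static=%v\n",
			negotiated, req.Version(), req.StaticMember())
	}
}
```

The caller only ever sees the interface, so the negotiated version stays a detail of the connection layer.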
I will push a new change as a suggestion, and we will see whether you like it or not.