Supporting kafka static membership

Open huszkacs opened this issue 3 years ago • 3 comments

Static membership requires setting group.instance.id, which in turn requires JoinGroup API version 5 and Heartbeat API version 3. Documentation

Validation of a new member.id changed in later releases. When a client tries to join a consumer group for the first time with an empty memberID, the server returns an error called MEMBER_ID_REQUIRED together with a generated memberID for the client. That newly generated memberID needs to be passed along when the join is retried. Learn more here
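To illustrate the retry flow described above, here is a minimal sketch. It is not the code from this PR: `joinGroupFunc`, `joinWithRetry`, and `fakeBroker` are hypothetical names, and the broker is simulated in memory so the example runs on its own.

```go
package main

import (
	"errors"
	"fmt"
)

// errMemberIDRequired stands in for the Kafka protocol error MEMBER_ID_REQUIRED.
var errMemberIDRequired = errors.New("MEMBER_ID_REQUIRED")

// joinGroupFunc is a hypothetical stand-in for sending one JoinGroup request.
// It returns the member ID carried in the response plus any protocol error.
type joinGroupFunc func(memberID string) (assignedID string, err error)

// joinWithRetry joins with an empty member ID first. If the broker answers
// MEMBER_ID_REQUIRED, it retries once with the member ID the broker generated.
func joinWithRetry(join joinGroupFunc) (string, error) {
	memberID, err := join("")
	if errors.Is(err, errMemberIDRequired) {
		if memberID == "" {
			return "", errors.New("broker did not return a member ID")
		}
		memberID, err = join(memberID)
	}
	if err != nil {
		return "", err
	}
	return memberID, nil
}

// fakeBroker simulates a broker that enforces the two-step join.
func fakeBroker() joinGroupFunc {
	const generated = "consumer-1-generated"
	return func(memberID string) (string, error) {
		if memberID == "" {
			// First attempt: reject, but hand back a generated member ID.
			return generated, errMemberIDRequired
		}
		if memberID != generated {
			return "", errors.New("UNKNOWN_MEMBER_ID")
		}
		return memberID, nil
	}
}

func main() {
	id, err := joinWithRetry(fakeBroker())
	fmt.Println(id, err)
}
```

The retry is limited to a single extra attempt so that a misbehaving broker cannot trap the client in a loop.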

Heartbeat logging has been added as well, as a useful change for maintenance.

huszkacs avatar Jun 28 '22 13:06 huszkacs

All the failing test cases pass in my local environment.

huszkacs avatar Jun 29 '22 11:06 huszkacs

It looks like the tests in CI pass for Kafka version 2.3.1 and later. This is likely because the API versions you used are not supported by older Kafka versions.

One of kafka-go's goals is to offer strong backward compatibility guarantees, so we will need to figure out a way to transparently fall back to older API versions when connecting to older brokers (we usually do this with the ApiVersions API call).

achille-roussel avatar Jul 01 '22 05:07 achille-roussel

Hi @achille-roussel! Can you point me to a similar example in the repo that I can learn from and use as a basis for the implementation?

huszkacs avatar Jul 04 '22 07:07 huszkacs

The kafka.Conn type maintains a map of API versions supported by the server https://github.com/segmentio/kafka-go/blob/main/conn.go#L65-L66 which we use to determine which request and response version to use client-side.

That being said, the change might be simpler to implement once https://github.com/segmentio/kafka-go/pull/947 has been merged, otherwise we may have to do API version selection in many places instead of having it abstracted away by the kafka.Transport type.
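As a rough illustration of the version selection discussed above, the sketch below picks the highest version that both client and server support for an API key, and uses that to decide whether static membership can be used. It does not reproduce kafka-go's internals: `versionRange`, `negotiate`, `pick`, and `staticMembershipAvailable` are hypothetical names, and the broker ranges in `main` are illustrative values rather than what any particular Kafka release reports.

```go
package main

import "fmt"

// API keys as defined by the Kafka protocol.
const (
	apiKeyJoinGroup int16 = 11
	apiKeyHeartbeat int16 = 12
)

// versionRange is an inclusive range of versions for one API key.
type versionRange struct{ min, max int16 }

// negotiate returns the highest version inside both ranges, if any.
func negotiate(client, server versionRange) (int16, bool) {
	lo, hi := client.min, client.max
	if server.min > lo {
		lo = server.min
	}
	if server.max < hi {
		hi = server.max
	}
	if hi < lo {
		return 0, false
	}
	return hi, true
}

// pick negotiates one API key and fails if either side does not list it.
func pick(client, server map[int16]versionRange, key int16) (int16, bool) {
	c, okC := client[key]
	s, okS := server[key]
	if !okC || !okS {
		return 0, false
	}
	return negotiate(c, s)
}

// staticMembershipAvailable reports whether JoinGroup v5 and Heartbeat v3
// (the versions that carry group.instance.id) can both be negotiated.
func staticMembershipAvailable(client, server map[int16]versionRange) bool {
	jg, ok := pick(client, server, apiKeyJoinGroup)
	if !ok || jg < 5 {
		return false
	}
	hb, ok := pick(client, server, apiKeyHeartbeat)
	return ok && hb >= 3
}

func main() {
	client := map[int16]versionRange{apiKeyJoinGroup: {0, 5}, apiKeyHeartbeat: {0, 3}}
	// Illustrative server ranges, as they might come back from ApiVersions.
	older := map[int16]versionRange{apiKeyJoinGroup: {0, 2}, apiKeyHeartbeat: {0, 1}}
	newer := map[int16]versionRange{apiKeyJoinGroup: {0, 7}, apiKeyHeartbeat: {0, 4}}

	fmt.Println("older broker:", staticMembershipAvailable(client, older))
	fmt.Println("newer broker:", staticMembershipAvailable(client, newer))
}
```

When the check fails, the client could keep using dynamic membership with the lower negotiated version instead of returning an error, which would preserve compatibility with older brokers.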

achille-roussel avatar Aug 26 '22 16:08 achille-roussel

Right, I think I know what you mean.

I did a small investigation (sadly, I did not have time to work on the PR before now). I started out thinking it would be nice to have a general type for each API protocol that can carry the data of all versions. But then I realized that such a general type would almost always be identical to the latest version of the protocol. Therefore it makes sense not to introduce a general type and to simply use interfaces instead.

There is version negotiation, as you said, and yes, it is implemented at the conn level and only used there. I think it is impossible to avoid, since it tells us how to serialize the request and parse the response in the binary format that the Kafka server accepts. There must be a switch pattern or a factory pattern that lets us create the proper request version and read the corresponding response version. The caller of the coordinator functions will never know the negotiated version in advance, therefore the response must go through some sort of abstraction, for which I think an interface is the most appropriate.

I will push a new change as a suggestion, and we will see whether you like it or not.
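A minimal sketch of the factory-plus-interface idea, to make the shape concrete. All names here (`joinParams`, `joinGroupRequest`, `joinGroupRequestV4`, `joinGroupRequestV5`, `newJoinGroupRequest`) are hypothetical and only two versions are modeled; wire encoding is left out entirely.

```go
package main

import "fmt"

// joinParams holds what the caller knows, independent of protocol version.
type joinParams struct {
	GroupID         string
	MemberID        string
	GroupInstanceID string
}

// joinGroupRequest hides the concrete request version from the caller.
type joinGroupRequest interface {
	Version() int16
	// GroupInstanceID reports the static member ID, if this version carries one.
	GroupInstanceID() (string, bool)
}

// joinGroupRequestV4 has no group.instance.id field.
type joinGroupRequestV4 struct{ p joinParams }

func (r joinGroupRequestV4) Version() int16                  { return 4 }
func (r joinGroupRequestV4) GroupInstanceID() (string, bool) { return "", false }

// joinGroupRequestV5 adds group.instance.id for static membership.
type joinGroupRequestV5 struct{ p joinParams }

func (r joinGroupRequestV5) Version() int16 { return 5 }
func (r joinGroupRequestV5) GroupInstanceID() (string, bool) {
	return r.p.GroupInstanceID, r.p.GroupInstanceID != ""
}

// newJoinGroupRequest is the factory: it maps the negotiated version to a
// concrete request type and returns it behind the interface.
func newJoinGroupRequest(version int16, p joinParams) (joinGroupRequest, error) {
	switch version {
	case 5:
		return joinGroupRequestV5{p}, nil
	case 4:
		return joinGroupRequestV4{p}, nil
	default:
		return nil, fmt.Errorf("unsupported JoinGroup version %d", version)
	}
}

func main() {
	p := joinParams{GroupID: "orders", GroupInstanceID: "worker-1"}
	for _, v := range []int16{4, 5} {
		req, err := newJoinGroupRequest(v, p)
		if err != nil {
			fmt.Println("error:", err)
			continue
		}
		id, static := req.GroupInstanceID()
		fmt.Printf("v%d static=%t id=%q\n", req.Version(), static, id)
	}
}
```

With this shape the coordinator code only deals with the interface, and the version switch lives in one place next to the negotiated version.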

huszkacs avatar Aug 26 '22 17:08 huszkacs