
feat(broker): negotiate MetadataRequest version

Open · trapped opened this issue 7 months ago · 7 comments

The latest version of github.com/IBM/sarama fails to connect to Redpanda brokers (also running the latest version):

  • Redpanda rejects the request with Unsupported version 10 for metadata API
  • sarama fails with kafka: client has run out of available brokers to talk to: EOF
Sarama logs

```
2025/05/13 18:26:29 Initializing new client
2025/05/13 18:26:29 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:29 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:29 client/metadata fetching metadata for all topics from broker localhost:9092
2025/05/13 18:26:29 Connected to broker at localhost:9092 (unregistered)
2025/05/13 18:26:29 client/metadata got error from broker -1 while fetching metadata: EOF
2025/05/13 18:26:29 Closed connection to broker localhost:9092
2025/05/13 18:26:29 client/metadata no available broker to send metadata request to
2025/05/13 18:26:29 client/brokers resurrecting 1 dead seed brokers
2025/05/13 18:26:29 Error while sending ApiVersionsRequest to broker localhost:9092: kafka: broker not connected
2025/05/13 18:26:30 client/metadata retrying after 250ms... (2 attempts remaining)
2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092
2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered)
2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF
2025/05/13 18:26:30 Closed connection to broker localhost:9092
2025/05/13 18:26:30 client/metadata no available broker to send metadata request to
2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers
2025/05/13 18:26:30 client/metadata retrying after 250ms... (1 attempts remaining)
2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092
2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered)
2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF
2025/05/13 18:26:30 Closed connection to broker localhost:9092
2025/05/13 18:26:30 client/metadata no available broker to send metadata request to
2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers
2025/05/13 18:26:30 client/metadata retrying after 250ms... (0 attempts remaining)
2025/05/13 18:26:30 ClientID is the default of 'sarama', you should consider setting it to something application-specific.
2025/05/13 18:26:30 client/metadata fetching metadata for all topics from broker localhost:9092
2025/05/13 18:26:30 Connected to broker at localhost:9092 (unregistered)
2025/05/13 18:26:30 client/metadata got error from broker -1 while fetching metadata: EOF
2025/05/13 18:26:30 Closed connection to broker localhost:9092
2025/05/13 18:26:30 client/metadata no available broker to send metadata request to
2025/05/13 18:26:30 client/brokers resurrecting 1 dead seed brokers
2025/05/13 18:26:30 Closing Client
```

The root cause is simply that:

  1. Sarama chooses request versions solely based on the configured config.Version, even though it sends an ApiVersionsRequest.
  2. MetadataRequest uses versions 8 to 10 for Kafka versions >= V2_4_0_0.
  3. Redpanda only supports versions 0 to 8, and it correctly advertises this.

This PR adds a brokerAPIVersions field to the Broker type. The field stores the ApiVersionsResponse values when conf.Version.IsAtLeast(V2_4_0_0) && conf.ApiVersionsRequest holds, and request versions are then validated against those values before requests are sent.

It also adds a NewNegotiatedMetadataRequest constructor that behaves like NewMetadataRequest but takes the broker's advertised min/max API versions into account. The client uses it when refreshing metadata.

These changes make it possible to connect to Redpanda while keeping config.Version = sarama.MaxVersion. They are not a complete implementation of version selection, but hopefully they can serve as a starting point.

trapped, May 14 '25 08:05

@puellanivis thanks for the review! I'd love to run the CI on it to see if there's anything I've missed.

trapped, May 19 '25 06:05

Hey @dnwe! Of course, I'm happy to make wider changes 🙂

Yours seems like a really nice idea. It should then be easy to add the method to every other request, at which point we'd essentially have "version negotiation" for everything, I guess.

I'll come up with the changes ASAP!

trapped, May 19 '25 15:05

@trapped 😅 well, it was just an initial idea, so don't discard what you have here. But if you have the time and motivation, feel free to give it a go on a separate branch and see how it might look.

The useful property is that it can all be done as "internal" API changes, so we don't have to worry about anyone who might be using the protocol types externally. That gives us more freedom to experiment and redesign as we go along.

dnwe, May 19 '25 15:05

@dnwe yeah, makes sense 🙂 I'll share a link to another draft PR here soon.

trapped, May 19 '25 15:05

@dnwe for now I've opened a draft and copy-pasted your snippet practically verbatim into all request/response types: https://github.com/IBM/sarama/pull/3170

trapped, May 20 '25 07:05

@trapped thanks! One thing I forgot to account for in my original snippet is that the maxVersion supported by the remote broker can (and will) exceed the highest protocol version for which we've actually implemented encoding/decoding in Sarama. I'd also incorrectly used max where we probably wanted min semantics. We probably want the existing Version field in sarama.Config to pin the maximum versions used, and people could set Version to sarama.MaxVersion to opt in to relying on negotiated API versions instead.

For example, for FetchRequest we've currently only implemented up to V11, but Kafka 2.7 and newer will advertise V12+, so we need to include that in the min.

So for MetadataRequest probably something like this:

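```go
// Meant to live in package sarama (which defines MetadataRequest and
// ErrUnsupportedVersion); needs the "fmt" import and Go 1.21+ for the built-in min.
func (m *MetadataRequest) restrictApiVersion(minVersion, maxVersion int16) error {
	maxEncodedVersion := min(10, maxVersion) // see existing isValidVersion() bound check for max supported by Sarama
	if m.Version < minVersion {
		return fmt.Errorf("%w: unsupported API version %d for %T, supported versions are %d-%d",
			ErrUnsupportedVersion, m.Version, m, minVersion, maxEncodedVersion)
	}
	// Never go above what the broker advertises or what Sarama can encode.
	m.Version = min(m.Version, maxEncodedVersion)
	return nil
}
```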

dnwe, May 20 '25 08:05

Hey @dnwe, sorry for the delay 🙂 I've just pushed a commit to https://github.com/IBM/sarama/pull/3170 that applies the change you requested to both Request and Response types. I'm not super familiar with the Sarama codebase/logic yet, so thanks for clarifying what behavior it should implement!

trapped, May 27 '25 07:05