
[QUESTION] Support for Apache Kafka 4.0

Open · ijuma opened this issue

Hi,

Apache Kafka 4.0 is removing a number of very old protocol api versions via KIP-896.

When I was looking at the code, I noticed that we seem to use V0 or V1 below:

https://github.com/ulrikjohansson/aiokafka/blob/master/aiokafka/client.py#L179

Am I reading that right? If so, this will not work with Apache Kafka 4.0 and would have to change to use at least Metadata request v4 (but ideally it would just use the newest version supported by the broker).
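Here is a minimal sketch of the "use the newest version supported by the broker" idea, done per API. It assumes the client knows the inclusive range of versions it can encode for each API, and that it learns the broker's range from an ApiVersions response. The `pick_version` helper and the example ranges are hypothetical, not aiokafka's actual API.

```python
from typing import Optional, Tuple

# Inclusive (min_version, max_version) range for one API key.
VersionRange = Tuple[int, int]


def pick_version(client: VersionRange, broker: VersionRange) -> Optional[int]:
    """Return the newest version both sides support, or None if the ranges don't overlap."""
    low = max(client[0], broker[0])
    high = min(client[1], broker[1])
    return high if low <= high else None


# Hypothetical ranges for the Metadata API (key 3).
print(pick_version((0, 1), (4, 12)))  # None: a v0/v1-only client cannot talk to this broker
print(pick_version((0, 8), (4, 12)))  # 8: newest version both sides support
```

Negotiating per API, rather than inferring everything from a single broker release number, means a broker that drops old versions only breaks clients that have no overlapping version at all.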

ijuma · Jan 14 '25

I can confirm that it doesn't work with the AK 4.0 branch: the simple_produce.py example raises this exception on the client:

Unable to request metadata from "localhost:42235": KafkaConnectionError: Connection at localhost:42235 closed
Traceback (most recent call last):
  File "/home/esabellico/workspace/aiokafka/examples/simple_produce.py", line 19, in <module>
    asyncio.run(send_one())
  File "/home/linuxbrew/.linuxbrew/opt/python@3.10/lib/python3.10/asyncio/runners.py", line 44, in run
    return loop.run_until_complete(main)
  File "/home/linuxbrew/.linuxbrew/opt/python@3.10/lib/python3.10/asyncio/base_events.py", line 649, in run_until_complete
    return future.result()
  File "/home/esabellico/workspace/aiokafka/examples/simple_produce.py", line 8, in send_one
    await producer.start()
  File "/home/esabellico/Standalone/venv3.10/lib/python3.10/site-packages/aiokafka/producer/producer.py", line 352, in start
    await self.client.bootstrap()
  File "/home/esabellico/Standalone/venv3.10/lib/python3.10/site-packages/aiokafka/client.py", line 266, in bootstrap
    raise KafkaConnectionError(f"Unable to bootstrap from {self.hosts}")
aiokafka.errors.KafkaConnectionError: KafkaConnectionError: Unable to bootstrap from [('localhost', 42235, <AddressFamily.AF_UNSPEC: 0>)]
Unclosed AIOKafkaProducer
producer: <aiokafka.producer.producer.AIOKafkaProducer object at 0x7ff528c80160>

It corresponds to this error in the broker log:

[2025-01-27 10:14:32,336] ERROR Closing socket for 127.0.0.1:42235-127.0.0.1:54344-0-1 because of error (kafka.network.Processor)
org.apache.kafka.common.errors.UnsupportedVersionException: Received request for api with key 3 (Metadata) and unsupported version 0
[2025-01-27 10:14:32,336] ERROR Exception while processing request from 127.0.0.1:42235-127.0.0.1:54344-0-1 (kafka.network.Processor)
org.apache.kafka.common.errors.UnsupportedVersionException: Received request for api with key 3 (Metadata) and unsupported version 0
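The broker rejects the Metadata v0 request and closes the socket, so all the client reports is "Connection at localhost:42235 closed". Below is a minimal sketch of failing early with a descriptive message instead. It assumes the broker's advertised version ranges are already known (for example, from an ApiVersions exchange when the connection was opened). `require_version`, `UnsupportedVersionError`, and the advertised range (4, 12) are all hypothetical and chosen for illustration.

```python
from typing import Dict, Tuple

VersionRange = Tuple[int, int]  # inclusive (min_version, max_version)

METADATA_API_KEY = 3  # "api with key 3 (Metadata)" in the broker log above


class UnsupportedVersionError(Exception):
    """Hypothetical client-side error, raised before anything is sent."""


def require_version(api_key: int, client: VersionRange,
                    advertised: Dict[int, VersionRange]) -> int:
    """Return the newest common version, or fail with a descriptive error."""
    if api_key not in advertised:
        raise UnsupportedVersionError(f"broker does not advertise api key {api_key}")
    b_min, b_max = advertised[api_key]
    low, high = max(client[0], b_min), min(client[1], b_max)
    if low > high:
        raise UnsupportedVersionError(
            f"api key {api_key}: client supports v{client[0]}-v{client[1]}, "
            f"broker supports v{b_min}-v{b_max}"
        )
    return high


# Hypothetical advertised ranges from a broker that no longer accepts Metadata v0-v3.
advertised = {METADATA_API_KEY: (4, 12)}
try:
    require_version(METADATA_API_KEY, (0, 1), advertised)
except UnsupportedVersionError as exc:
    print(exc)  # api key 3: client supports v0-v1, broker supports v4-v12
```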

emasab · Jan 27 '25

Hi @ods, are there any plans to add support for Kafka 4.0? Any update would be great.

nmashkoorStratuscent · May 02 '25

> Are there any plans regarding adding support for the 4.0 version. Any update would be great

I’m not planning to implement it myself. But if someone volunteers to do it, I’ll do my best to support bringing it to release.

ods · May 03 '25

Hello :) Do you have any updates on this? I would like to upgrade to Kafka 4.1.

mattiamatrix · Nov 11 '25

I can try to give it a shot, but I would probably go with multiple PRs.

Currently, there are several things going on:

  • In several places there is a concept of "api_version" that refers to a broker version. I think this is a mistake: the versions used should be based on what is available for each request type, regardless of the broker's absolute Kafka version. It would simplify the code to just use the best available version of each message when needed.
  • In the admin client, the code already works like this, but at the "client" level, whereas Kafka recommends doing it at the connection level: https://kafka.apache.org/protocol#api_versions
  • In the connection class, the code to fetch versions already exists, but it is only used for SASL negotiation.
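The points above recommend doing the lookup per connection. In the minimal sketch below, each connection stores the ranges from its own ApiVersions response and picks versions independently. `BrokerConnection` and its methods are hypothetical stand-ins, not aiokafka's connection class, and the advertised ranges are invented.

```python
from dataclasses import dataclass, field
from typing import Dict, Iterable, Tuple

VersionRange = Tuple[int, int]  # inclusive (min_version, max_version)


@dataclass
class BrokerConnection:
    """Hypothetical connection object that only tracks its negotiated API versions."""
    node_id: int
    api_versions: Dict[int, VersionRange] = field(default_factory=dict)

    def on_api_versions_response(self, entries: Iterable[Tuple[int, int, int]]) -> None:
        # Each entry mirrors one ApiVersions response item: (api_key, min_version, max_version).
        self.api_versions = {key: (lo, hi) for key, lo, hi in entries}

    def pick_best(self, api_key: int, client: VersionRange) -> int:
        """Newest version this particular broker and the client both support."""
        if api_key not in self.api_versions:
            raise LookupError(f"node {self.node_id} does not support api key {api_key}")
        b_min, b_max = self.api_versions[api_key]
        low, high = max(client[0], b_min), min(client[1], b_max)
        if low > high:
            raise LookupError(f"node {self.node_id}: no common version for api key {api_key}")
        return high


# Hypothetical mixed cluster: each node advertises its own Metadata (key 3) range.
old_node, new_node = BrokerConnection(1), BrokerConnection(2)
old_node.on_api_versions_response([(3, 0, 9)])
new_node.on_api_versions_response([(3, 4, 12)])

client_metadata = (0, 12)  # hypothetical range the client can encode
print(old_node.pick_best(3, client_metadata))  # 9
print(new_node.pick_best(3, client_metadata))  # 12
```

Keeping the table on the connection rather than on the client means two brokers that advertise different ranges (for example, during a rolling upgrade) each get the newest version they can actually handle.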

My plan is then the following:

  • Open a PR that relies on the versions fetched for each connection we use. This might already solve the problem, since plenty of versions are already mapped in the protocol definitions.
  • Then, if it still doesn't work with Kafka 4, add the missing versions to the protocol definitions.

To resolve the version at the connection level, we need to abstract request building behind a builder class (a bit like in the Java client https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/requests/FetchRequest.java ) instead of having it scattered everywhere in different ways, like here https://github.com/aio-libs/aiokafka/blob/master/aiokafka/consumer/fetcher.py#L438 or here https://github.com/aio-libs/aiokafka/blob/master/aiokafka/producer/sender.py#L764
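Here is a rough sketch of the builder idea, loosely modelled on the Java client's request builders. The builder keeps version-independent fields and only renders a concrete request once a version has been chosen for the connection. `MetadataRequestBuilder` is hypothetical, and a plain dict stands in for the encoded request. The version-specific rules it encodes follow the public Metadata request schema: in v0 an empty topic list means "all topics", from v1 null means "all topics", and `allow_auto_topic_creation` appears from v4.

```python
from dataclasses import dataclass
from typing import Any, ClassVar, Dict, List, Optional, Tuple


@dataclass
class MetadataRequestBuilder:
    """Hypothetical builder: version-independent fields in, version-specific request out."""
    API_KEY: ClassVar[int] = 3
    SUPPORTED: ClassVar[Tuple[int, int]] = (0, 4)  # versions this sketch can render

    topics: Optional[List[str]] = None  # None means "all topics"
    allow_auto_topic_creation: bool = True

    def build(self, version: int) -> Dict[str, Any]:
        lo, hi = self.SUPPORTED
        if not lo <= version <= hi:
            raise ValueError(f"Metadata v{version} is not supported by this builder")
        if version == 0:
            # v0 has no null topic array: an empty list means "all topics"
            # (so v0 cannot express "no topics").
            topics: Optional[List[str]] = [] if self.topics is None else self.topics
        else:
            # From v1, null means "all topics".
            topics = self.topics
        # A plain dict stands in for the encoded request here.
        request: Dict[str, Any] = {
            "api_key": self.API_KEY,
            "api_version": version,
            "topics": topics,
        }
        if version >= 4:
            # allow_auto_topic_creation only exists from Metadata v4 onwards.
            request["allow_auto_topic_creation"] = self.allow_auto_topic_creation
        return request


builder = MetadataRequestBuilder()
print(builder.build(0))  # {'api_key': 3, 'api_version': 0, 'topics': []}
print(builder.build(4))  # {'api_key': 3, 'api_version': 4, 'topics': None, 'allow_auto_topic_creation': True}
```

With this split, call sites such as the fetcher, the sender, or the admin client would only fill in a builder. The connection picks a version from the ranges its broker advertised and then calls `build(version)`.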

If that sounds good to you, @ods, I can give it a shot.

vmaurin · Nov 17 '25

I have started a draft of the "concept" here, though it is far from finished: https://github.com/aio-libs/aiokafka/pull/1136/files

  • The per-connection version handling is already there, along with the new Request class.
  • I have only addressed part of the admin protocol, plus metadata.
  • The rest will come later (I will try to work on it this week).

vmaurin · Nov 18 '25

I believe it has reached a point where it might be usable, if someone wants to give it a try.

It is quite a large PR (#1136), and @ods doesn't seem very active, so any help with code review, live testing, and so on is appreciated.

A few things the PR brings that are worth highlighting:

  • It might be compatible with Kafka 4.0 without additional changes (though the current version may already be, since @ijuma restored support for the oldest Metadata versions in https://github.com/apache/kafka/commit/e6d72c9e60f4740b72ea21a607e234081252c428 ). Any missing points can be addressed in a future PR.
  • It might fix a few things. For example, the isolation level was not passed in the offset fetch request, so #911 is probably fixed now.
  • It is probably more robust with other Kafka-protocol-compatible servers: no assumptions are made about a specific Apache Kafka version, and only the API versions advertised by the server itself are used.
  • It can properly talk to a cluster whose brokers do not all support the same API versions.
  • Dropping support for old Kafka versions (< 0.11) would allow removing a lot of code, so maybe it is something to consider. (I had to keep a configuration option, somewhat more explicit than "api_version", for that.)

(adding the above to the PR itself)

vmaurin · Nov 20 '25