[FLINK-37583] Upgrade to Kafka 4.0.0 client.

tomncooper opened this issue 8 months ago • 10 comments

Kafka client version 4.0.0 has been released. It brings many performance improvements and security fixes. The new client is backwards compatible with older Kafka versions; however, as noted in the release notes:

Old protocol API versions have been removed. Users should ensure brokers are version 2.1 or higher before upgrading Java clients (including Connect and Kafka Streams which use the clients internally) to 4.0.

Therefore, moving to this version means that the connector can no longer be used with Kafka clusters running Kafka 2.0 or older. Hopefully, users have upgraded beyond that point by now, but this will need to be stated clearly in the release notes for the version containing this commit.

This PR updates the Kafka client version and the other associated libraries to match those in the Kafka 4.0 release.
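
As a quick pre-upgrade sanity check, a cluster operator can read a broker's inter.broker.protocol.version through the Admin API. The following is a minimal sketch, not part of this PR, assuming a reachable bootstrap server at localhost:9092. It should be run with the current (pre-4.0) client, since a 4.0 client may not be able to talk to the very brokers it is meant to rule out, and the config is only meaningful on ZooKeeper-based clusters (KRaft brokers are 2.8+ by construction):

import java.util.List;
import java.util.Properties;
import org.apache.kafka.clients.admin.Admin;
import org.apache.kafka.clients.admin.AdminClientConfig;
import org.apache.kafka.clients.admin.Config;
import org.apache.kafka.common.config.ConfigResource;

public class BrokerVersionCheck {
    public static void main(String[] args) throws Exception {
        Properties props = new Properties();
        props.put(AdminClientConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        try (Admin admin = Admin.create(props)) {
            // Pick any broker in the cluster and fetch its configuration.
            String brokerId = admin.describeCluster().nodes().get()
                    .iterator().next().idString();
            ConfigResource broker =
                    new ConfigResource(ConfigResource.Type.BROKER, brokerId);
            Config config = admin.describeConfigs(List.of(broker))
                    .all().get().get(broker);
            // Anything below 2.1 here means the 4.0 client cannot be used.
            System.out.println("inter.broker.protocol.version = "
                    + config.get("inter.broker.protocol.version").value());
        }
    }
}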

tomncooper avatar Mar 28 '25 17:03 tomncooper

@tomncooper I suspect that this will fail, given that the Flink Kafka connector relies on reflection for the two-phase commit part.
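
For illustration, this is the kind of reflective access the exactly-once path performs on KafkaProducer internals; a minimal sketch, not the connector's actual code. Because the field is looked up by name at runtime, a client release that renames or removes it fails with a NoSuchFieldException when the job runs, not at compile time, which is why a client bump is risky here:

import java.lang.reflect.Field;
import org.apache.kafka.clients.producer.KafkaProducer;

final class ReflectiveAccessSketch {
    // Reads a private KafkaProducer field by name, much like the connector's
    // transaction-resuming logic. Nothing here is checked at compile time
    // against the kafka-clients version actually on the classpath.
    static Object transactionManagerOf(KafkaProducer<?, ?> producer)
            throws ReflectiveOperationException {
        Field field = KafkaProducer.class.getDeclaredField("transactionManager");
        field.setAccessible(true);
        return field.get(producer);
    }
}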

MartijnVisser avatar Mar 31 '25 12:03 MartijnVisser

@MartijnVisser It passed all the tests in my local repo (using mvn verify)? Are there other tests I should run (perhaps in the main Flink repo) to verify?

tomncooper avatar Mar 31 '25 12:03 tomncooper

Are there other tests I should run (perhaps in the main Flink repo) to verify?

Yeah, this is what CI also runs (and where it has unfortunately failed):

mvn clean install -Dflink.version=2.0.0 -Dscala-2.12 -Prun-end-to-end-tests -DdistDir=/yourpathto/flink-2.0.0 -Dflink.convergence.phase=install -Dlog4j.configurationFile=tools/ci/log4j.properties

MartijnVisser avatar Mar 31 '25 13:03 MartijnVisser

So I have run:

mvn clean install -Prun-end-to-end-tests -Dflink.version=2.0.0 -Dscala-2.12 -DdistDir=$HOME/tools/flink/flink-2.0.0 -Dflink.convergence.phase=install -Dlog4j.configurationFile=tools/ci/log4j.properties

I ran this with both Java 17 and 21 locally, and all the tests pass 🤔

tomncooper avatar Mar 31 '25 15:03 tomncooper

So I have run:

mvn clean install -Prun-end-to-end-tests -Dflink.version=2.0.0 -Dscala-2.12 -DdistDir=$HOME/tools/flink/flink-2.0.0 -Dflink.convergence.phase=install -Dlog4j.configurationFile=tools/ci/log4j.properties

I ran this with both Java 17 and 21 locally, and all the tests pass 🤔

Hmmmm, it could be a flaky test, but it's hard to pin down whether it was always flaky or whether it's just this PR.

MartijnVisser avatar Mar 31 '25 15:03 MartijnVisser

Ok, that run passed the e2e tests but failed on the licence check.

tomncooper avatar Mar 31 '25 17:03 tomncooper

Hmmm, so I assume this is the test that is failing in CI. But when I run the same command locally:

mvn exec:java@check-license -N -Dexec.args="/tmp/flink-maven-build-output $(pwd) /tmp/flink-validation-deployment" -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties

It passes?

tomncooper avatar Apr 01 '25 10:04 tomncooper

I rebased after the merge of #138 and that restarted the CI.

It seems that KafkaWriterFaultToleranceITCase is flaky, as it passes locally with Java 17 and 21 and has previously passed CI on this PR.

tomncooper avatar Apr 01 '25 13:04 tomncooper

@tomncooper @MartijnVisser @AHeise I suggest we move to the Kafka 4.0.0 client when we do Kafka connector v4 for Flink 2. WDYT? (Assuming we can sort out the tests.)

I think there is a case for not backporting Kafka client 4.0.0 support to Kafka connector v3.3 or v3.4, in case there are old Kafka clusters that we would not want to break on the v3 stream.

davidradl avatar Apr 04 '25 13:04 davidradl

@tomncooper @MartijnVisser @AHeise I suggest we move to the Kafka 4.0.0 client when we do Kafka connector v4 for Flink 2. WDYT? (Assuming we can sort out the tests.)

I think there is a case for not backporting Kafka client 4.0.0 support to Kafka connector v3.3 or v3.4, in case there are old Kafka clusters that we would not want to break on the v3 stream.

@davidradl As per the discussion on the dev mailing list, we are going to move ahead with a Connector 4.0 release with Flink 2.0 and Kafka 3.9.0. We can then do point release updates.

I still think that, given that this PR would drop support for older Kafka versions, it should be part of a further major version bump (i.e. 5.0), but we can have that discussion when the time comes to merge this.

tomncooper avatar Apr 07 '25 10:04 tomncooper

In order for the CI to pass, this PR will need to be merged first.

tomncooper avatar Jul 03 '25 15:07 tomncooper

I am not yet convinced that this PR needs to bump the major version of the connector.

I still think that, given that this PR would drop support for older Kafka versions, it should be part of a further major version bump (i.e. 5.0), but we can have that discussion when the time comes to merge this.

Is this true? AFAICT the proposed change only raises the minimum supported broker version, but the new client version is still able to talk to older Kafka clusters, since the Kafka clients are backwards compatible. Users should still be safe to upgrade to the new connector version without changing the rest of their infrastructure, e.g. without a Flink cluster or Kafka cluster upgrade.

fapaul avatar Jul 22 '25 14:07 fapaul

@fapaul Usually the clients are backwards compatible. However, Kafka 4.0 dropped support for many older protocol API versions and the older log message formats. As per the release notes:

Old protocol API versions have been removed. Users should ensure brokers are version 2.1 or higher before upgrading Java clients (including Connect and Kafka Streams which use the clients internally) to 4.0.

The 4.0 compatibility matrix further lays out the server version compatibility and shows that, ideally, users of the Kafka 4.0 clients should be running Kafka 3.0 or higher.

I am happy with the version being whatever the community decides. My personal opinion is that, by bumping the major version of the connector, we should telegraph the fact that the connector version containing this PR drops support for brokers on 2.0 or older (and limits support for 2.1-2.8).

tomncooper avatar Jul 22 '25 14:07 tomncooper

Thanks for the thorough explanation. In summary, in addition to dropping support for the 0.x versions (which has been done in the past already), we now also drop support for everything up to and including 2.0.

I am okay with the major version bump then. Can you look at the last failing tests to get this PR merged?

Also, I noticed the docs are slightly out of date: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/datastream/kafka/#dependency still states

Modern Kafka clients are backwards compatible with broker versions 0.10.0 or later. For details on Kafka compatibility, please refer to the official Kafka documentation.

Can you also update these docs to reflect that with this change, we expect the Kafka broker to be at least version 2.1?

fapaul avatar Jul 23 '25 07:07 fapaul

@fapaul This PR already updated the docs to say Kafka 2.1.0 or later. Do you think it needs a more in-depth description? I am happy to add that, but I will need to create a JIRA for the Chinese-language changes.

I will also try to figure out why the licence checker is unhappy. It would be extremely helpful if that tool actually told you what the issue was!

tomncooper avatar Jul 23 '25 09:07 tomncooper

@fapaul This PR already updated the docs to say Kafka 2.1.0 or later. Do you think it needs a more in-depth description? I am happy to add that, but I will need to create a JIRA for the Chinese-language changes.

Sorry, I missed that; all good with the docs.

The log output is definitely strange. Usually there are logs for every violation. I suspect that this commit (https://github.com/apache/flink-connector-kafka/commit/1b02ce8c16943565d0c10277c55aeb0d04620e45) changed the default logging behavior by setting the Flink log level to WARN, which also affects the LicenseChecker.

fapaul avatar Jul 23 '25 10:07 fapaul

Ok, I am stumped. Locally I run (with Maven 3.8.6 and Java 17):

mvn clean deploy -U -B --no-transfer-progress -Dflink.version=2.0.0 \
    -DaltDeploymentRepository=validation_repository::default::file:/tmp/flink-validation-deployment \
    -Dscala-2.12 \
    -Prun-end-to-end-tests -DdistDir="$HOME/tools/flink/flink-2.0.0" \
    -Dflink.convergence.phase=install -Pcheck-convergence \
    -Dhttp.keepAlive=false -Dmaven.wagon.http.pool=false -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 \
    -Dlog4j.configurationFile="file://$(pwd)/tools/ci/log4j.properties" \
    | tee /tmp/mvn_build_output.out

Then the licence checker:

mvn -U -B --no-transfer-progress -Dflink.version=2.0.0 \
    exec:java@check-license -N \
    -Dexec.args="/tmp/mvn_build_output.out $(pwd) /tmp/flink-validation-deployment" \
    -Dhttp.keepAlive=false \
    -Dmaven.wagon.http.pool=false \
    -Dmaven.wagon.httpconnectionManager.ttlSeconds=120 \
    -Dlog4j.configurationFile=file://$(pwd)/tools/ci/log4j.properties

Which, as far as I can tell, is what the CI is running, and it passes?

tomncooper avatar Jul 23 '25 10:07 tomncooper

Ok, I have no idea why, but rebasing on main has led to green tests on my fork.

I added an override to the CI log4j config to make sure the licence checker is able to output its logs.
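
For reference, such an override can be a couple of lines in the log4j2 properties file; the logger name below is an assumption about the license checker's package, not taken from this PR:

# Root logger stays at WARN to keep CI output quiet, but let the
# license checker log its violations at INFO.
logger.licensecheck.name = org.apache.flink.tools.ci.licensecheck
logger.licensecheck.level = INFO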

@fapaul Are you able to kick the CI on this PR?

tomncooper avatar Jul 23 '25 12:07 tomncooper

@fapaul I squashed and rebased.

However, before we merge this, I think it would be good to do a Kafka connector 4.1 release with the 3.9.1 client. That way, if there are any users running older Kafka versions, they could still use Flink 2.0 with a fully patched Kafka client (3.9.0 has a critical CVE).
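
(As an aside, and purely a sketch rather than anything from this PR: users who need the patched client before such a release can usually pin it themselves, assuming kafka-clients is resolved as a regular, non-shaded transitive dependency of flink-connector-kafka; the flink-sql-connector-kafka uber-jar, which bundles its client, is a different story. In a project's pom.xml:

<dependencyManagement>
  <dependencies>
    <!-- Force the patched client over the connector's transitive version. -->
    <dependency>
      <groupId>org.apache.kafka</groupId>
      <artifactId>kafka-clients</artifactId>
      <version>3.9.1</version>
    </dependency>
  </dependencies>
</dependencyManagement>

This of course only works as long as the pinned client stays API-compatible with what the connector expects.)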

I raised this on the dev mailing list a while ago, but didn't get any feedback. WDYT?

tomncooper avatar Jul 24 '25 11:07 tomncooper

@fapaul Now that the 4.0.1 release of the connector is out, are we ok to merge this PR?

tomncooper avatar Aug 27 '25 14:08 tomncooper

Sorry for the late reply. Yes, I will merge the PR now.

fapaul avatar Aug 29 '25 15:08 fapaul