flink-connector-kafka

[FLINK-35283] PoC for supporting unique Kafka producer client ids

Open: francis-a opened this pull request 9 months ago • 3 comments

This PR came out of debugging a warning we’re seeing in our Flink logs. We’re running Flink 1.18 with an application that uses Kafka topics as both a source and a sink, and we run it with several tasks. The warning in the logs is:

WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=kafka producer client id

I’ve spent some time debugging, and the root cause of this warning appears to be the Flink KafkaSink creating multiple KafkaWriters, which in turn create multiple KafkaProducers with the same Kafka producer client.id. Because the client.id value is used to build the name of the AppInfo MBean, registering a second KafkaProducer with the same client.id in the same JVM fails with the InstanceAlreadyExistsException above. Since we run several tasks and get one Kafka producer per task, this duplicate registration makes sense to me.
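The collision can be reproduced without Kafka, using only the JDK's JMX API. This is a minimal sketch, not Kafka's code: AppInfoCollisionDemo, AppInfo, and the demo.producer domain are hypothetical, and it assumes (as the warning suggests) that the MBean name is derived only from the client id. Registering the same name twice in one JVM raises InstanceAlreadyExistsException, while distinct ids register cleanly.

```java
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import javax.management.InstanceAlreadyExistsException;
import javax.management.JMException;
import javax.management.MBeanServer;
import javax.management.ObjectName;

public class AppInfoCollisionDemo {

    // Standard MBean interface: its name must be the implementation class name + "MBean".
    public interface AppInfoMBean {
        String getClientId();
    }

    // Hypothetical stand-in for the app-info MBean each producer registers.
    public static class AppInfo implements AppInfoMBean {
        private final String clientId;

        public AppInfo(String clientId) {
            this.clientId = clientId;
        }

        @Override
        public String getClientId() {
            return clientId;
        }
    }

    /** Registers one MBean per client id and returns how many registrations collided. */
    public static int countCollisions(String... clientIds) {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer(); // one per JVM
        List<ObjectName> registered = new ArrayList<>();
        int collisions = 0;
        try {
            for (String clientId : clientIds) {
                // The name depends only on the client id, as in the logged warning.
                // quote() is a simplification so any string yields a valid name.
                ObjectName name = new ObjectName(
                        "demo.producer:type=app-info,id=" + ObjectName.quote(clientId));
                try {
                    server.registerMBean(new AppInfo(clientId), name);
                    registered.add(name);
                } catch (InstanceAlreadyExistsException e) {
                    collisions++; // same failure mode as the AppInfoParser warning
                }
            }
            return collisions;
        } catch (JMException e) {
            throw new IllegalStateException(e);
        } finally {
            // Unregister everything so the demo can run repeatedly in one JVM.
            for (ObjectName name : registered) {
                try {
                    server.unregisterMBean(name);
                } catch (JMException e) {
                    throw new IllegalStateException(e);
                }
            }
        }
    }

    public static void main(String[] args) {
        System.out.println(countCollisions("producer", "producer", "producer")); // prints 2
        System.out.println(countCollisions("producer-0", "producer-1", "producer-2")); // prints 0
    }
}
```

Three "tasks" sharing one id produce two collisions; giving each its own id produces none, which is the behavior a per-producer client id would restore.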

This PR proposes a fix: add a setClientIdPrefix method to KafkaSink.builder, similar to the one KafkaSource.builder already has.
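One way a prefix could be turned into unique producer client ids is sketched below. It is a minimal sketch, not the PR's code or Flink's API: ProducerClientIds, forSubtask, and withClientId are hypothetical, and it assumes one producer per subtask with an id formed as prefix + "-" + subtask index. The format the PR actually uses may differ.

```java
import java.util.Properties;

/** Hypothetical helper (not part of Flink or the PR): derives per-subtask client ids. */
public final class ProducerClientIds {

    // Kafka's producer config key for the client id.
    private static final String CLIENT_ID_CONFIG = "client.id";

    private ProducerClientIds() {}

    /** Builds a client id such as "orders-sink-3" for subtask 3. */
    public static String forSubtask(String clientIdPrefix, int subtaskId) {
        if (clientIdPrefix == null || clientIdPrefix.isEmpty()) {
            throw new IllegalArgumentException("clientIdPrefix must be non-empty");
        }
        if (subtaskId < 0) {
            throw new IllegalArgumentException("subtaskId must be >= 0");
        }
        return clientIdPrefix + "-" + subtaskId;
    }

    /** Returns a copy of the base producer properties with a per-subtask client.id. */
    public static Properties withClientId(Properties base, String clientIdPrefix, int subtaskId) {
        Properties props = new Properties();
        // stringPropertyNames() also includes any defaults of the base Properties.
        for (String key : base.stringPropertyNames()) {
            props.setProperty(key, base.getProperty(key));
        }
        props.setProperty(CLIENT_ID_CONFIG, forSubtask(clientIdPrefix, subtaskId));
        return props;
    }
}
```

The base properties are copied rather than mutated, so each writer gets its own client.id without affecting the others. Inside a real sink the subtask index would come from the writer's runtime context; the exact accessor varies by Flink version, so it is left out here. If a writer keeps several producers alive at once, a further per-producer suffix would be needed.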

francis-a, May 02 '24 08:05

Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)

boring-cyborg[bot], May 02 '24 08:05

I think this is very necessary, since multiple parallel subtasks can run in the same JVM.

SoberChina, Sep 01 '24 03:09

QQ: do we need to sanitize/validate the prefix? I don't think we currently do that when you set the prefix manually, so it's probably not needed.
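If validation were added, a minimal version might only reject null or blank prefixes and check whether a resulting id would be a valid unquoted JMX ObjectName value. This is a hedged sketch: ClientIdPrefixCheck and its methods are hypothetical, and whether the Kafka client already quotes such ids before registering the MBean is not verified here.

```java
import javax.management.MalformedObjectNameException;
import javax.management.ObjectName;

/** Hypothetical checks a builder could apply to a client id prefix (not Flink's code). */
public final class ClientIdPrefixCheck {

    private ClientIdPrefixCheck() {}

    /** Rejects null or blank prefixes; returns the prefix unchanged otherwise. */
    public static String validate(String prefix) {
        if (prefix == null || prefix.trim().isEmpty()) {
            throw new IllegalArgumentException("client id prefix must not be null or blank");
        }
        return prefix;
    }

    /** True if the id parses as an unquoted, non-pattern ObjectName key value. */
    public static boolean isJmxSafeUnquoted(String id) {
        try {
            ObjectName name = new ObjectName("demo:type=app-info,id=" + id);
            // Values containing '*' or '?' parse as patterns, which cannot be registered.
            return !name.isPattern();
        } catch (MalformedObjectNameException e) {
            return false; // e.g. a ',' or '=' inside the value
        }
    }
}
```

An id that fails the unquoted check can still be made safe with ObjectName.quote, which is one reason explicit sanitizing on the Flink side may be unnecessary.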

AHeise, Sep 18 '24 08:09