flink-connector-kafka
[FLINK-35283] PoC for supporting unique Kafka producer client ids
This PR came out of debugging a warning we’re seeing in our Flink logs. We’re running Flink 1.18 and have an application that uses Kafka topics as both a source and a sink, with several parallel tasks. The warning we’re seeing in the logs is:
WARN org.apache.kafka.common.utils.AppInfoParser - Error registering AppInfo mbean
javax.management.InstanceAlreadyExistsException: kafka.producer:type=app-info,id=kafka producer client id
I’ve spent a bit of time debugging, and it looks like the root cause of this warning is the Flink `KafkaSink` creating multiple `KafkaWriter`s that, in turn, create multiple `KafkaProducer`s with the same Kafka producer `client.id`. Since the value of `client.id` is used when registering the `AppInfo` MBean, registering multiple `KafkaProducer`s with the same `client.id` triggers the `InstanceAlreadyExistsException` above. Since we’re running with several tasks and get one Kafka producer per task, this duplicate-registration exception makes sense to me.
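The collision described above can be reproduced without Kafka at all, since it comes from plain JMX `ObjectName` registration. The sketch below (names like `DuplicateMBeanDemo` and the client id `my-client` are hypothetical) registers the same `kafka.producer:type=app-info,id=...` name twice against the platform MBean server, which is essentially what two `KafkaProducer`s sharing one `client.id` do:

```java
import javax.management.InstanceAlreadyExistsException;
import javax.management.MBeanServer;
import javax.management.ObjectName;
import java.lang.management.ManagementFactory;

public class DuplicateMBeanDemo {
    // A minimal standard MBean: interface name must be <ClassName>MBean.
    public interface InfoMBean { String getVersion(); }
    public static class Info implements InfoMBean {
        public String getVersion() { return "demo"; }
    }

    /**
     * Registers the same app-info ObjectName twice, mimicking two producers
     * that share one client.id. Returns true if the second registration
     * fails with InstanceAlreadyExistsException.
     */
    public static boolean registerTwice(String clientId) throws Exception {
        MBeanServer server = ManagementFactory.getPlatformMBeanServer();
        ObjectName name =
                new ObjectName("kafka.producer:type=app-info,id=" + clientId);
        server.registerMBean(new Info(), name);      // first producer: succeeds
        try {
            server.registerMBean(new Info(), name);  // second producer: collides
            return false;
        } catch (InstanceAlreadyExistsException e) {
            // Kafka's AppInfoParser catches this and logs the WARN from above.
            return true;
        }
    }

    public static void main(String[] args) throws Exception {
        System.out.println("collision: " + registerTwice("my-client"));
    }
}
```

Kafka’s `AppInfoParser` catches this exception and downgrades it to the WARN we see, so the producers keep working; the cost is that per-client JMX metrics for the colliding producers are unreliable.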
This PR proposes a fix: add a `setClientIdPrefix` method to `KafkaSink.builder`, similar to the one we already have on `KafkaSource.builder`.
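With the proposed change, sink usage would look roughly like the following sketch (the broker address, topic, and prefix value are placeholders, and `setClientIdPrefix` on the sink builder is the method this PR proposes, not an existing release API):

```java
KafkaSink<String> sink = KafkaSink.<String>builder()
        .setBootstrapServers("broker:9092")
        .setRecordSerializer(KafkaRecordSerializationSchema.builder()
                .setTopic("output-topic")
                .setValueSerializationSchema(new SimpleStringSchema())
                .build())
        // Proposed by this PR: derive a unique client.id per producer
        // from this prefix, mirroring KafkaSourceBuilder#setClientIdPrefix.
        .setClientIdPrefix("my-app-sink")
        .build();
```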
Thanks for opening this pull request! Please check out our contributing guidelines. (https://flink.apache.org/contributing/how-to-contribute.html)
I think this is very necessary. We can run multiple parallel subtasks in the same JVM.
QQ: do we need to sanitize/validate the prefix? I don't think we currently do that when the prefix is set manually, so it's probably not needed.