
CamelKafkasslsinkSinkConnector not handling binary payloads well

Open olivierdeckers opened this issue 2 years ago • 1 comments

Hello! I am trying to use the CamelKafkasslsinkSinkConnector to move a binary payload from one Kafka cluster to another. Messages are being produced on the target cluster, but the payload is not completely identical. I discovered this is because the connector's Kafka producer is using StringSerializer as value.serializer. It logs: value.serializer = class org.apache.kafka.common.serialization.StringSerializer (org.apache.kafka.clients.producer.ProducerConfig:376)

I configured the connector like this:

{
		"camel.idempotency.enabled": "false",
		"camel.kamelet.kafka-ssl-sink.bootstrapServers": "localhost:9094",
		"camel.kamelet.kafka-ssl-sink.sslKeyPassword": "...",
		"camel.kamelet.kafka-ssl-sink.sslKeystoreLocation": "...",
		"camel.kamelet.kafka-ssl-sink.sslKeystorePassword": "...",
		"camel.kamelet.kafka-ssl-sink.sslTruststoreLocation": "...",
		"camel.kamelet.kafka-ssl-sink.topic": "target-topic",
		"camel.map.headers": "true",
		"camel.map.properties": "true",
		"connector.class": "org.apache.camel.kafkaconnector.kafkasslsink.CamelKafkasslsinkSinkConnector",
		"errors.deadletterqueue.context.headers.enable": "false",
		"errors.log.enable": "true",
		"errors.log.include.messages": "false",
		"header.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
		"key.converter": "org.apache.kafka.connect.converters.ByteArrayConverter",
		"name": "camel-kafka-ssl-sink-connector-test",
		"topics": "source-topic",
		"value.converter": "org.apache.kafka.connect.converters.ByteArrayConverter"
}

I cannot find anything in the documentation about how to configure the value.serializer used when creating the Kafka producer. I also tried adding a couple of things to the connector config, like producer.override.value.serializer, producer.value.serializer, value.serializer, ... but none of them seem to have any effect.

Is it possible to configure this? Shouldn't the producer default to ByteArrayConverter to make sure the connector can handle both text and binary payloads?

olivierdeckers avatar Jun 21 '23 16:06 olivierdeckers

Hi @olivierdeckers, the producer.override settings are only applicable to Source connectors, which produce data into topics. Here you're using a Sink connector, where converters are used to deserialize the ConsumerRecord into a SinkRecord, which can later be transformed before it's passed to the plugin.

If you need to configure the producer used internally by the plugin, you need to look at the configuration properties of the camel-kafka component. You can find the list of component properties here, for example. The component properties can be specified with the camel.component.$componentName prefix, in your case camel.component.kafka. As I see, by default camel.component.kafka.valueSerializer is set to org.apache.kafka.common.serialization.StringSerializer, which might explain the problem.
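As a sketch, assuming the camel.component.kafka prefix works as described above, adding something like this to your connector config should switch the internal producer to byte-array serialization (ByteArraySerializer is the standard Kafka serializer for binary payloads):

```json
{
		"camel.component.kafka.valueSerializer": "org.apache.kafka.common.serialization.ByteArraySerializer",
		"camel.component.kafka.keySerializer": "org.apache.kafka.common.serialization.ByteArraySerializer"
}
```

You can verify it took effect by checking the ProducerConfig log line you quoted, which should then show ByteArraySerializer instead of StringSerializer.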

You may also consider using the MirrorMaker2 connector, available by default in Kafka. Note that it does a little bit more than simply producing the data, as it also synchronizes Kafka topic partitions.
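For reference, a minimal MirrorMaker2 setup for copying a topic between clusters might look roughly like this (a sketch only; the cluster aliases, bootstrap servers, and topic pattern are placeholders you'd adapt, and SSL settings would still need to be added for your case):

```json
{
		"connector.class": "org.apache.kafka.connect.mirror.MirrorSourceConnector",
		"source.cluster.alias": "source",
		"target.cluster.alias": "target",
		"source.cluster.bootstrap.servers": "localhost:9092",
		"target.cluster.bootstrap.servers": "localhost:9094",
		"topics": "source-topic"
}
```

Since MirrorMaker2 copies records as raw bytes, it avoids the serializer mismatch entirely.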

jakubmalek avatar Jul 31 '23 07:07 jakubmalek