camel-kafka-connector
camel-kafka-connector copied to clipboard
Problems with CamelDDBStreamSourceConnector 3.20.3
Hi, I am struggling to use CamelDDBStreamSourceConnector for DynamoDB change capture. Could you please advice me how to get over them?
- I have tried several common converters for Kafka Connect, but only one of them is 'usable'
io.confluent.connect.avro.AvroConverter❌org.apache.kafka.connect.json.JsonConverter❌org.apache.kafka.connect.converters.ByteArrayConverter❌org.apache.kafka.connect.storage.StringConverter✅
The value of message in case of StringConverter, however, is address of InputStreamCache:
Could you provide me a name of converter I can use with this value.converter?
- The topic.key is currently blank. Could you clone it from payload of DDB Stream, for example:
ID.
Again, thanks for the work!
Here is my source connector configuration:
{
"connector.class": "org.apache.camel.kafkaconnector.awsddbstreamssource.CamelAwsddbstreamssourceSourceConnector",
"camel.kamelet.aws-ddb-streams-source.accessKey": "XXX",
"camel.kamelet.aws-ddb-streams-source.secretKey": "XXX",
"tasks.max": "1",
"camel.kamelet.aws-ddb-streams-source.uriEndpointOverride": "",
"camel.kamelet.aws-ddb-streams-source.useDefaultCredentialsProvider": "false",
"value.converter": "org.apache.kafka.connect.storage.StringConverter",
"session.timeout.ms": "60000",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"camel.kamelet.aws-ddb-streams-source.region": "XXX",
"camel.kamelet.aws-ddb-streams-source.delay": "5000",
"topics": "ddb-test",
"database.history.kafka.bootstrap.servers": "kafka-headless.kafka-banzai:29092",
"camel.kamelet.aws-ddb-streams-source.streamIteratorType": "FROM_START",
"camel.kamelet.aws-ddb-streams-source.table": "ddb_test",
"name": "ddb-consumer",
"camel.kamelet.aws-ddb-streams-source.overrideEndpoint": "false",
"snapshot.mode": "initial"
}
It looks like you have same problem as in here https://github.com/apache/camel-kafka-connector/issues/1543
The source connector is returning value as InputStream java-type, which cannot be serialized with default converters.
We're looking into the InputStream behavior. @valdar is looking at that.