
Problems with CamelDDBStreamSourceConnector 3.20.3

Open thuyhoang-gp opened this issue 2 years ago • 3 comments

Hi, I am struggling to use CamelDDBStreamSourceConnector for DynamoDB change capture and have run into two problems. Could you please advise me on how to get past them?

  1. I have tried several common converters for Kafka Connect, but only one of them is 'usable':
  • io.confluent.connect.avro.AvroConverter
  • org.apache.kafka.connect.json.JsonConverter
  • org.apache.kafka.connect.converters.ByteArrayConverter
  • org.apache.kafka.connect.storage.StringConverter

With StringConverter, however, the message value is the address of an InputStreamCache object rather than the record content.

Could you tell me which converter I can use for value.converter?

  2. The topic.key is currently blank. Could it be copied from the payload of the DDB stream, for example from the ID field?

Again, thanks for the work!

thuyhoang-gp, Jul 25 '23 10:07

Here is my source connector configuration:

{
    "connector.class": "org.apache.camel.kafkaconnector.awsddbstreamssource.CamelAwsddbstreamssourceSourceConnector",
    "camel.kamelet.aws-ddb-streams-source.accessKey": "XXX",
    "camel.kamelet.aws-ddb-streams-source.secretKey": "XXX",
    "tasks.max": "1",
    "camel.kamelet.aws-ddb-streams-source.uriEndpointOverride": "",
    "camel.kamelet.aws-ddb-streams-source.useDefaultCredentialsProvider": "false",
    "value.converter": "org.apache.kafka.connect.storage.StringConverter",
    "session.timeout.ms": "60000",
    "key.converter": "org.apache.kafka.connect.storage.StringConverter",
    "camel.kamelet.aws-ddb-streams-source.region": "XXX",
    "camel.kamelet.aws-ddb-streams-source.delay": "5000",
    "topics": "ddb-test",
    "database.history.kafka.bootstrap.servers": "kafka-headless.kafka-banzai:29092",
    "camel.kamelet.aws-ddb-streams-source.streamIteratorType": "FROM_START",
    "camel.kamelet.aws-ddb-streams-source.table": "ddb_test",
    "name": "ddb-consumer",
    "camel.kamelet.aws-ddb-streams-source.overrideEndpoint": "false",
    "snapshot.mode": "initial"
}

thuyhoang-gp, Jul 25 '23 10:07

It looks like you have the same problem as in https://github.com/apache/camel-kafka-connector/issues/1543

The source connector returns the value as a Java InputStream, which the default converters cannot serialize.
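
To illustrate why the value shows up as an object address, here is a minimal plain-Java sketch. It uses no Kafka Connect or Camel classes. The class name `InputStreamValueDemo`, the helper `readAll`, and the sample payload are hypothetical and only for illustration, and it assumes the converter ends up calling `toString()` on the value, which is what the reported output suggests.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class InputStreamValueDemo {

    // Hypothetical helper: drains the stream and decodes the bytes as UTF-8 text.
    public static String readAll(InputStream in) throws IOException {
        return new String(in.readAllBytes(), StandardCharsets.UTF_8);
    }

    public static void main(String[] args) throws IOException {
        // Made-up payload standing in for a DDB stream record.
        byte[] payload = "{\"ID\":\"42\"}".getBytes(StandardCharsets.UTF_8);

        // Converting the stream object itself to a string yields ClassName@hash,
        // not the content it wraps.
        InputStream raw = new ByteArrayInputStream(payload);
        System.out.println(String.valueOf(raw));

        // Reading the bytes first recovers the actual payload text.
        System.out.println(readAll(new ByteArrayInputStream(payload)));
    }
}
```

The first line printed is an object identifier similar to what appears in the topic, and the second is the JSON text. This only demonstrates the symptom; it is not a fix for the connector itself.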

jakubmalek, Jul 31 '23 07:07

We're looking into the InputStream behavior. @valdar is looking at that.

oscerd, Jul 31 '23 07:07