camel-kafka-connector

Does CamelAwsddbsinkSinkConnector work with AVRO data?

Open · 10elements opened this issue 6 months ago

I'm trying to use the CamelAwsddbsinkSinkConnector v3.20.3 in MSK Connect (v2.7.1) to load Avro messages from a Kafka topic into a DynamoDB table; the Avro schema is stored in a Confluent Schema Registry. No matter what I tried, the connector never seemed to find the correct fields in the message after deserialization, and so it failed to put the item into the DynamoDB table.

This is my connector config:

{
  "connector.class": "org.apache.camel.kafkaconnector.awsddbsink.CamelAwsddbsinkSinkConnector",
  "transforms.tojson.type": "org.apache.camel.kafkaconnector.transforms.SchemaAndStructToJsonTransform",
  "transforms.tojson.converter.type": "value",
  "topics": "test_orders",
  "tasks.max": "1",
  "camel.kamelet.aws-ddb-sink.useDefaultCredentialsProvider": "true",
  "transforms": "tojson",
  "camel.sink.contentLogLevel": "DEBUG",
  "value.converter.schema.registry.url": "https://svcs--schemaregistry.euc1.prvw.ktdev.io",
  "camel.kamelet.aws-ddb-sink.operation": "PutItem",
  "camel.kamelet.aws-ddb-sink.table": "etl--prvw--euc1--test-orders",
  "value.converter": "io.confluent.connect.avro.AvroConverter",
  "camel.kamelet.aws-ddb-sink.region": "eu-central-1",
  "key.converter": "org.apache.kafka.connect.storage.StringConverter"
}

I read at https://camel.apache.org/camel-kamelets/4.10.x/aws-ddb-sink.html#_expected_data_format_for_sink that the sink expects its input data to be JSON, so I added the SchemaAndStructToJsonTransform SMT (which I found in this issue) after the Avro converter, to convert the Kafka Connect struct produced by deserialization into JSON before it is handed to the sink.
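My understanding of that page is that the sink should end up receiving a flat JSON object whose top-level keys match the table's attribute names, i.e. something like this (field names taken from my schema, values just for illustration):

{
  "order_id": 256795,
  "ordered_at": "2025-05-22T01:11:12.953333",
  "product_id": 3351,
  "quantity": 9,
  "customer_id": 99025,
  "customer_name": "Metro Shipping Co"
}

Still, the connector fails with an error like this: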


[Worker-0c1825974767d8637] software.amazon.awssdk.services.dynamodb.model.DynamoDbException: One or more parameter values were invalid: Missing the key order_id in the item (Service: DynamoDb, Status Code: 400, Request ID: V2V0KK8U36SVSQ8EE9GSK75ARJVV4KQNSO5AEMVJF66Q9ASUAAJG)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleErrorResponse(CombinedResponseHandler.java:125)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handleResponse(CombinedResponseHandler.java:82)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:60)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.CombinedResponseHandler.handle(CombinedResponseHandler.java:41)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:40)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.HandleResponseStage.execute(HandleResponseStage.java:30)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:73)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptTimeoutTrackingStage.execute(ApiCallAttemptTimeoutTrackingStage.java:42)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:78)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.TimeoutExceptionHandlingStage.execute(TimeoutExceptionHandlingStage.java:40)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:50)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallAttemptMetricCollectionStage.execute(ApiCallAttemptMetricCollectionStage.java:36)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:81)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.RetryableStage.execute(RetryableStage.java:36)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:56)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.StreamManagingStage.execute(StreamManagingStage.java:36)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.executeWithTimer(ApiCallTimeoutTrackingStage.java:80)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:60)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallTimeoutTrackingStage.execute(ApiCallTimeoutTrackingStage.java:42)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:48)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ApiCallMetricCollectionStage.execute(ApiCallMetricCollectionStage.java:31)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.RequestPipelineBuilder$ComposingRequestPipelineStage.execute(RequestPipelineBuilder.java:206)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:37)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.pipeline.stages.ExecutionFailureExceptionReportingStage.execute(ExecutionFailureExceptionReportingStage.java:26)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.http.AmazonSyncHttpClient$RequestExecutionBuilderImpl.execute(AmazonSyncHttpClient.java:193)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.invoke(BaseSyncClientHandler.java:103)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.doExecute(BaseSyncClientHandler.java:171)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.lambda$execute$1(BaseSyncClientHandler.java:82)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.measureApiCallSuccess(BaseSyncClientHandler.java:179)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.internal.handler.BaseSyncClientHandler.execute(BaseSyncClientHandler.java:76)
[Worker-0c1825974767d8637] at software.amazon.awssdk.core.client.handler.SdkSyncClientHandler.execute(SdkSyncClientHandler.java:45)
[Worker-0c1825974767d8637] at software.amazon.awssdk.awscore.client.handler.AwsSyncClientHandler.execute(AwsSyncClientHandler.java:56)
[Worker-0c1825974767d8637] at software.amazon.awssdk.services.dynamodb.DefaultDynamoDbClient.putItem(DefaultDynamoDbClient.java:4243)
[Worker-0c1825974767d8637] at org.apache.camel.component.aws2.ddb.PutItemCommand.execute(PutItemCommand.java:32)
[Worker-0c1825974767d8637] at org.apache.camel.component.aws2.ddb.Ddb2Producer.process(Ddb2Producer.java:55)
[Worker-0c1825974767d8637] at org.apache.camel.support.AsyncProcessorConverterHelper$ProcessorToAsyncProcessorBridge.process(AsyncProcessorConverterHelper.java:66)
[Worker-0c1825974767d8637] at org.apache.camel.processor.SendProcessor.process(SendProcessor.java:172)
[Worker-0c1825974767d8637] at org.apache.camel.processor.errorhandler.RedeliveryErrorHandler$SimpleTask.run(RedeliveryErrorHandler.java:477)
[Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultReactiveExecutor$Worker.schedule(DefaultReactiveExecutor.java:181)
[Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultReactiveExecutor.scheduleMain(DefaultReactiveExecutor.java:59)
[Worker-0c1825974767d8637] at org.apache.camel.processor.Pipeline.process(Pipeline.java:165)
[Worker-0c1825974767d8637] at org.apache.camel.impl.engine.CamelInternalProcessor.process(CamelInternalProcessor.java:392)
[Worker-0c1825974767d8637] at org.apache.camel.component.direct.DirectProducer.process(DirectProducer.java:96)
[Worker-0c1825974767d8637] at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:214)
[Worker-0c1825974767d8637] at org.apache.camel.impl.engine.SharedCamelInternalProcessor$1.process(SharedCamelInternalProcessor.java:111)
[Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultAsyncProcessorAwaitManager.process(DefaultAsyncProcessorAwaitManager.java:83)
[Worker-0c1825974767d8637] at org.apache.camel.impl.engine.SharedCamelInternalProcessor.process(SharedCamelInternalProcessor.java:108)
[Worker-0c1825974767d8637] at org.apache.camel.support.cache.DefaultProducerCache.send(DefaultProducerCache.java:199)
[Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:176)
[Worker-0c1825974767d8637] at org.apache.camel.impl.engine.DefaultProducerTemplate.send(DefaultProducerTemplate.java:148)
[Worker-0c1825974767d8637] at org.apache.camel.kafkaconnector.CamelSinkTask.put(CamelSinkTask.java:205)
[Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerSinkTask.deliverMessages(WorkerSinkTask.java:587)
[Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerSinkTask.poll(WorkerSinkTask.java:330)
[Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerSinkTask.iteration(WorkerSinkTask.java:232)
[Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerSinkTask.execute(WorkerSinkTask.java:201)
[Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:191)
[Worker-0c1825974767d8637] at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:240)
[Worker-0c1825974767d8637] at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
[Worker-0c1825974767d8637] at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
[Worker-0c1825974767d8637] at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
[Worker-0c1825974767d8637] at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
[Worker-0c1825974767d8637] at java.base/java.lang.Thread.run(Thread.java:829)

So to me it looks like the converted JSON is malformed and does not contain the fields it is supposed to have. I then consumed the topic with a separate consumer, deserialized the messages, and printed them out, and I can confirm that the Avro messages are correct and contain all the required fields, such as order_id (a simplified sketch of that consumer follows the log output below):

2025-06-10 15:00:46,632 - __main__ - INFO - Connected to Schema Registry at http://localhost:8081
2025-06-10 15:00:47,154 - httpx - INFO - HTTP Request: GET http://localhost:8081/subjects/test_orders-value/versions/latest "HTTP/1.1 200 OK"
2025-06-10 15:00:47,156 - __main__ - INFO - Retrieved schema for subject 'test_orders-value', version 1 from registry.
2025-06-10 15:00:47,240 - __main__ - INFO - Subscribed to topic: test_orders
2025-06-10 15:00:47,240 - __main__ - INFO - Starting to consume messages. Press Ctrl+C to stop.
2025-06-10 15:00:47,285 - botocore.credentials - INFO - Found credentials in shared credentials file: ~/.aws/credentials
2025-06-10 15:00:55,138 - httpx - INFO - HTTP Request: GET http://localhost:8081/schemas/ids/399 "HTTP/1.1 200 OK"
2025-06-10 15:00:55,140 - __main__ - INFO - Message 1: Partition 0, Offset 0
{'order_id': 256795, 'ordered_at': '2025-05-22T01:11:12.953333', 'product_id': 3351, 'quantity': 9, 'customer_id': 99025, 'customer_name': 'Metro Shipping Co'}
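
For reference, this is roughly what that verification consumer does (simplified sketch; it assumes confluent-kafka-python, uses a placeholder bootstrap address, and omits the explicit schema-version lookup visible in the log above):

from confluent_kafka import DeserializingConsumer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroDeserializer
from confluent_kafka.serialization import StringDeserializer

# The deserializer fetches the writer schema from the registry using the id embedded in each message.
schema_registry = SchemaRegistryClient({"url": "http://localhost:8081"})
avro_deserializer = AvroDeserializer(schema_registry)

consumer = DeserializingConsumer({
    "bootstrap.servers": "localhost:9092",  # placeholder, not the real broker address
    "group.id": "avro-check",
    "auto.offset.reset": "earliest",
    "key.deserializer": StringDeserializer("utf_8"),
    "value.deserializer": avro_deserializer,
})
consumer.subscribe(["test_orders"])

try:
    while True:
        msg = consumer.poll(1.0)
        if msg is None or msg.error():
            continue
        # value() is a plain dict, e.g. {'order_id': 256795, 'ordered_at': '...', ...}
        print(msg.value())
finally:
    consumer.close()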

Can someone confirm whether the CamelAwsddbsinkSinkConnector is able to work with Avro data at all? If it is, am I missing anything in my connector config to get it working correctly?

P.S. I can't use the latest version of the CamelAwsddbsinkSinkConnector because I'm on MSK Connect, which is still on 2.7.1 and uses Java 11.

10elements · Jun 10 '25 22:06

Can you try using a local Kafka with a newer version of the connector? 3.20.4 is unsupported.

oscerd · Jun 13 '25 08:06

@10elements any luck with this one?

ilyas-tp · Jul 17 '25 21:07