
camel-salesforce-kafka-connector-0.11.0 : Error while publishing Avro message


Hi Everyone,

Please help. I am using the Camel Salesforce to Kafka source connector, and I am running into an issue while consuming data from a PushTopic and publishing it to Kafka in Avro format. The payload and the error are below.

======================== Payload ========================

{"Type":"Customer - Channel","Phone":"(799) 937-6773","Website":"dickenson-consulting.com","Id":"0015j000009CEd7AAG","Name":"Dickenson plc Inc"}

=========== Connector configuration ===========

key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=io.confluent.connect.avro.AvroConverter
key.converter.schemas.enable=false
value.converter.schemas.enable=false
offset.storage.file.filename=/tmp/connect.offsets
offset.flush.interval.ms=10000
name=local-camel-sf-source
connector.class=org.apache.camel.kafkaconnector.salesforce.CamelSalesforceSourceConnector
camel.source.path.topicName=/topic/AccountPush1
camel.component.salesforce.clientId=<client-id>
camel.component.salesforce.clientSecret=<client-secret>
camel.component.salesforce.instanceUrl=<instance-url>
camel.component.salesforce.loginUrl=<login-url>
[email protected]
camel.component.salesforce.password=pwdsftoekn
camel.component.salesforce.authenticationType=USERNAME_PASSWORD
camel.source.endpoint.backoffIncrement=5000

camel.source.endpoint.replayId=-2
errors.retry.timeout=-1
tasks.max=1
topics=avrotest1234
camel.source.endpoint.rawPayload=true
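
For completeness, the AvroConverter also needs to know where the Schema Registry is. A minimal sketch of those converter settings (the URL below is just an example for a local registry; auto.register.schemas is the optional Confluent converter setting that controls whether the converter registers schemas on its own):

value.converter.schema.registry.url=http://localhost:8081
# optional: set to false to stop the converter from registering schemas itself
value.converter.auto.register.schemas=false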

=============== Error ===============

Caused by: org.apache.kafka.connect.errors.DataException: Failed to serialize Avro data from topic avrotest1234 :
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:87)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:321)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    ... 11 more
Caused by: org.apache.kafka.common.errors.SerializationException: Error registering Avro schema: "string"
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: Schema being registered is incompatible with an earlier schema for subject "avrotest1234-value"; error code: 409
    at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:293)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:352)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:495)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:486)
    at io.confluent.kafka.schemaregistry.client.rest.RestService.registerSchema(RestService.java:459)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.registerAndGetId(CachedSchemaRegistryClient.java:206)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:268)
    at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.register(CachedSchemaRegistryClient.java:244)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerializer.serializeImpl(AbstractKafkaAvroSerializer.java:74)
    at io.confluent.connect.avro.AvroConverter$Serializer.serialize(AvroConverter.java:138)
    at io.confluent.connect.avro.AvroConverter.fromConnectData(AvroConverter.java:84)
    at org.apache.kafka.connect.storage.Converter.fromConnectData(Converter.java:63)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.lambda$convertTransformedRecord$3(WorkerSourceTask.java:321)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndRetry(RetryWithToleranceOperator.java:156)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execAndHandleError(RetryWithToleranceOperator.java:190)
    at org.apache.kafka.connect.runtime.errors.RetryWithToleranceOperator.execute(RetryWithToleranceOperator.java:132)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.convertTransformedRecord(WorkerSourceTask.java:321)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.sendRecords(WorkerSourceTask.java:347)
    at org.apache.kafka.connect.runtime.WorkerSourceTask.execute(WorkerSourceTask.java:261)
    at org.apache.kafka.connect.runtime.WorkerTask.doRun(WorkerTask.java:188)
    at org.apache.kafka.connect.runtime.WorkerTask.run(WorkerTask.java:237)
    at java.util.concurrent.Executors$RunnableAdapter.call(Unknown Source)
    at java.util.concurrent.FutureTask.run(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(Unknown Source)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(Unknown Source)
    at java.lang.Thread.run(Unknown Source)

================================================================================

What I observe is that the connector is registering this schema in the Schema Registry:

============= Actual schema being registered =============

{"subject":"avrotest1234-value","version":2,"id":100013,"schema":"\"string\""}
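
As a sanity check, the schema currently registered under the value subject can be read back with the Schema Registry client that is already on the Connect classpath. A minimal sketch (the registry URL is only an example, and the class name is my own):

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient;
import io.confluent.kafka.schemaregistry.client.SchemaMetadata;

public class SubjectCheck {
    public static void main(String[] args) throws Exception {
        // Example registry URL -- replace with the real Schema Registry address
        CachedSchemaRegistryClient client =
                new CachedSchemaRegistryClient("http://localhost:8081", 10);

        // Read back whatever is currently registered under the value subject
        SchemaMetadata latest = client.getLatestSchemaMetadata("avrotest1234-value");
        System.out.println("id=" + latest.getId()
                + " version=" + latest.getVersion()
                + " schema=" + latest.getSchema());
    }
}

Running this against my registry prints the bare "string" schema shown above rather than the Account record schema.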

What I expect instead is for the payload shown above to be validated against the schema below.

================== Expected schema I want to validate against ==================

{"type":"record",
"name":"Account",
"namespace":"com.camel.salesforce",
"fields":[
{"default": null, "name":"Type", "type": [ "null", "string"] }, 
{ "default": null, "name":"Phone", "type": [ "null", "string"] },
 { "default": null, "name":"Website", "type": [ "null", "string"] },
 { "default": null, "name":"Id", "type": [ "null", "string"] }, 
{ "default": null, "name":"Name", "type": [ "null", "string"] } 
] }
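
The sample payload above does appear to fit this schema. For illustration, a small standalone check with the plain Avro API (class and variable names here are my own) that builds the sample record against the schema and serializes it without errors:

import org.apache.avro.Schema;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
import org.apache.avro.io.BinaryEncoder;
import org.apache.avro.io.EncoderFactory;

import java.io.ByteArrayOutputStream;
import java.io.IOException;

public class AccountSchemaCheck {
    public static void main(String[] args) throws IOException {
        // The expected Account schema from above
        String schemaJson = "{\"type\":\"record\",\"name\":\"Account\",\"namespace\":\"com.camel.salesforce\","
                + "\"fields\":["
                + "{\"name\":\"Type\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"Phone\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"Website\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"Id\",\"type\":[\"null\",\"string\"],\"default\":null},"
                + "{\"name\":\"Name\",\"type\":[\"null\",\"string\"],\"default\":null}]}";
        Schema schema = new Schema.Parser().parse(schemaJson);

        // Build a record matching the sample payload
        GenericRecord account = new GenericData.Record(schema);
        account.put("Type", "Customer - Channel");
        account.put("Phone", "(799) 937-6773");
        account.put("Website", "dickenson-consulting.com");
        account.put("Id", "0015j000009CEd7AAG");
        account.put("Name", "Dickenson plc Inc");

        // Serialize with the schema; this throws if the record does not conform
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        BinaryEncoder encoder = EncoderFactory.get().binaryEncoder(out, null);
        new GenericDatumWriter<GenericRecord>(schema).write(account, encoder);
        encoder.flush();
        System.out.println("Serialized " + out.size() + " bytes against the Account schema");
    }
}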

Please help.

Ziauddin135 · Feb 07 '22