kafka-connect-cosmosdb
kafka-connect-cosmosdb copied to clipboard
Source connector fails to write data to topic saying schema is not matching when using avro converters.
Description
-
When trying to create a new instance of source connector with value converter set to io.confluent.connect.avro.AvroConverter. The lease container is created and a new schema is created and sometimes a few documents are written to the topic and then it fails.
-
There is no schema defined before connector instance is created. but still we get the below error.
Kafka connect env settings:
CONNECT_BOOTSTRAP_SERVERS: server-1:39094
CONNECT_GROUP_ID: "kc101-connect"
CONNECT_CONFIG_STORAGE_TOPIC: "_kc101-connect-configs"
CONNECT_OFFSET_STORAGE_TOPIC: "_kc101-connect-offsets"
CONNECT_STATUS_STORAGE_TOPIC: "_kc101-connect-status"
CONNECT_REPLICATION_FACTOR: 1
CONNECT_CONFIG_STORAGE_REPLICATION_FACTOR: 1
CONNECT_OFFSET_STORAGE_REPLICATION_FACTOR: 1
CONNECT_STATUS_STORAGE_REPLICATION_FACTOR: 1
CONNECT_KEY_CONVERTER: "org.apache.kafka.connect.storage.StringConverter"
CONNECT_KEY_CONVERTER_SCHEMA_REGISTRY_URL: schema-registry:8081
CONNECT_KEY_CONVERTER_SCHEMAS_ENABLE: "false"
CONNECT_VALUE_CONVERTER: "io.confluent.connect.avro.AvroConverter"
CONNECT_VALUE_CONVERTER_SCHEMAS_ENABLE: "true"
CONNECT_VALUE_CONVERTER_SCHEMA_REGISTRY_URL: schema-registry:8081
CONNECT_REST_ADVERTISED_HOST_NAME: "connect-1"
CONNECT_LISTENERS: http://connect-1:8083
CONNECT_PLUGIN_PATH: "/usr/share/java,/usr/share/confluent-hub-components"
CONNECT_LOG4J_APPENDER_STDOUT_LAYOUT_CONVERSIONPATTERN: "[%d] %p %X{connector.context}%m (%c:%L)%n"
CONNECT_LOG4J_ROOT_LOGLEVEL: INFO
CONNECT_LOG4J_LOGGERS: 'org.apache.kafka.connect.runtime.rest=WARN,org.reflections=ERROR'
Connect instance settings:
{
"name": "cosmosdb-source-connector-tp-legs-raw",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.storage.StringConverter",
"value.converter": "io.confluent.connect.avro.AvroConverter",
"connect.cosmos.task.poll.interval": "100",
"connect.cosmos.connection.endpoint": "{{CosmosDbUrl}}",
"connect.cosmos.master.key": "{{CosmosDnApiKey}}",
"connect.cosmos.databasename": "Preplanning",
"connect.cosmos.containers.topicmap": "transport-plan-raw1#transport-plan-raw-events",
"connect.cosmos.offset.useLatest": false,
"value.converter.schemas.enable": "true",
"key.converter.schemas.enable": "false",
"value.converter.schema.registry.url": "http://schema-registry:8081",
"key.converter.schema.registry.url": "http://schema-registry:8081"
}
}
Error Message: org.apache.kafka.common.config.ConfigException: Failed to access Avro data from topic transport-plan-raw1 : Schema being registered is incompatible with an earlier schema for subject "transport-plan-raw1-value", details:
[
{
"errorType": "NAME_MISMATCH",
"description": "The name of the schema has changed (path '/name')",
"additionalInfo": "expected: inferred_name_2092778255"
},
{
"errorType": "NAME_MISMATCH",
"description": "The name of the schema has changed (path '/fields/11/type/name')",
"additionalInfo": "expected: inferred_name__1187958375"
},
{
"errorType": "NAME_MISMATCH",
"description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
"additionalInfo": "expected: inferred_name_762902212"
},
{
"errorType": "NAME_MISMATCH",
"description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
"additionalInfo": "expected: inferred_name_762902212"
},
{
"errorType": "NAME_MISMATCH",
"description": "The name of the schema has changed (path '/fields/11/type/fields/6/type/name')",
"additionalInfo": "expected: inferred_name_762902212"
},
{
"errorType": "NAME_MISMATCH",
"description": "The name of the schema has changed (path '/fields/11/type/fields/9/type/name')",
"additionalInfo": "expected: inferred_name_762902212"
},
{
"errorType": "MISSING_UNION_BRANCH",
"description": "The new schema is missing a type inside a union field at path '/fields/11/type/fields/10/type/1' in the old schema",
"additionalInfo": "reader union lacking writer type: ARRAY"
},
{ "oldSchemaVersion": 1 },
{
"oldSchema": "{\"type\":\"record\",\"name\":\"inferred_name_2092778255\",\"fields\":[{\"name\":\"id\",\"type\":\"string\"},{\"name\":\"messageKey\",\"type\":\"string\"},{\"name\":\"shipmentNumber\",\"type\":\"string\"},{\"name\":\"shipmentIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentVersionIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentVersionCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentVersionUpdatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentStructureUpdateDatetime\",\"type\":\"string\"},{\"name\":\"containerServiceTypeLoadService\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_586916356\",\"fields\":[{\"name\":\"containerServiceTypeLoadService\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_586916356\"}},{\"name\":\"containerServiceTypeDischargeService\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1295212612\",\"fields\":[{\"name\":\"containerServiceTypeDischargeService\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_1295212612\"}},{\"name\":\"cargoServiceType\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1290169462\",\"fields\":[{\"name\":\"cargoServiceType\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_1290169462\"}},{\"name\":\"shipmentRoute\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1187958375\",\"fields\":[{\"name\":\"shipmentRouteIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentRouteTypeCode\",\"type\":\"string\"},{\"name\":\"shipmentRouteTypeName\",\"type\":\"string\"},{\"name\":\"tradeLaneName\",\"type\":\"string\"},{\"name\":\"shipmentRouteCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRouteUpdatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointPlaceOfReceipt\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_762902212\",\"fields\":[{\"name\":\"routePointIdentifier\",\"type\":\"string\"},{\"name\":\"site\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1587501840\",\"fields\":[{\"name\":\"siteIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"},{\"name\":\"definedAreaRelationshipTypeCodeParentArea\",\"type\":\"string\"},{\"name\":\"operationalFacility\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1582750289\",\"fields\":[{\"name\":\"operationalFacilityIdentifier\",\"type\":\"string\"},{\"name\":\"facilityCode\",\"type\":\"string\"},{\"name\":\"facilityName\",\"type\":\"string\"},{\"name\":\"facilityType\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1888608854\",\"fields\":[{\"name\":\"facilityTypeCode\",\"type\":\"string\"},{\"name\":\"isChildOf\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1616517423\",\"fields\":[{\"name\":\"facilityTypeCode\",\"type\":\"string\"},{\"name\":\"isChildOfL1\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__2136641715\",\"fields\":[{\"name\":\"facilityTypeCode\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__2136641715\"}}],\"connect.name\":\"inferred_name__1616517423\"}}],\"connect.name\":\"inferred_name_1888608854\"}}],\"connect.name\":\"inferred_name__1582750289\"}}],\"connect.name\":\"inferred_name_1587501840\"}},{\"name\":\"city\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__666696926\",\"fields\":[{\"name\":\"cityIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"},{\"name\":\"definedAreaRelationshipTypeCodeParentArea\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__666696926\"}},{\"name\":\"territorialUnit\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__275936916\",\"fields\":[{\"name\":\"territorialUnitIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"},{\"name\":\"isoSubdivisionCategory\",\"type\":\"string\"},{\"name\":\"definedAreaRelationshipTypeCodeParentArea\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__275936916\"}},{\"name\":\"country\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_206982570\",\"fields\":[{\"name\":\"countryIdentifier\",\"type\":\"string\"},{\"name\":\"definedAreaCode\",\"type\":\"string\"},{\"name\":\"definedAreaName\",\"type\":\"string\"},{\"name\":\"definedAreaTypeCode\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_206982570\"}},{\"name\":\"locationFunctionCode\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointUpdatedDatetime\",\"type\":\"string\"}],\"connect.name\":\"inferred_name_762902212\"}},{\"name\":\"shipmentRoutePointFirstVesselLoad\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRoutePointFinalVesselDischarge\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRoutePointPlaceOfDelivery\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRouteLinks\",\"type\":[\"null\",{\"type\":\"array\",\"items\":{\"type\":\"record\",\"name\":\"inferred_name__1469213405\",\"fields\":[{\"name\":\"shipmentRouteLinkIdentifier\",\"type\":\"string\"},{\"name\":\"shipmentRoutePointStartingPoint\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRoutePointEndingPoint\",\"type\":\"inferred_name_762902212\"},{\"name\":\"shipmentRouteLinkSequenceNumber\",\"type\":\"string\"},{\"name\":\"routeLinkProductIdentifier\",\"type\":\"long\"},{\"name\":\"shipmentRoutingType\",\"type\":\"string\"},{\"name\":\"transportModeCode\",\"type\":\"string\"},{\"name\":\"transportModeName\",\"type\":\"string\"},{\"name\":\"waterAirLand\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfDepartureLocal\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfDeparture\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfArrivalLocal\",\"type\":\"string\"},{\"name\":\"estimatedTimeOfArrival\",\"type\":\"string\"},{\"name\":\"voyageNumberStartsAt\",\"type\":\"string\"},{\"name\":\"voyageNumberEndsAt\",\"type\":\"string\"},{\"name\":\"vesselSchedule\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name_1720401109\",\"fields\":[{\"name\":\"vesselCode\",\"type\":\"string\"},{\"name\":\"vesselName\",\"type\":\"string\"},{\"name\":\"alternativeCodes\",\"type\":{\"type\":\"record\",\"name\":\"inferred_name__1505900296\",\"fields\":[{\"name\":\"rksT_CARRIER_ID\",\"type\":\"string\"},{\"name\":\"rksT_CARRIER_NAME\",\"type\":\"string\"},{\"name\":\"rksT_SERVICE_CODE\",\"type\":\"string\"},{\"name\":\"rksT_SERVICE_NAME\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__1505900296\"}}],\"connect.name\":\"inferred_name_1720401109\"}},{\"name\":\"shipmentRouteLinkCreatedDatetime\",\"type\":\"string\"},{\"name\":\"shipmentRouteLinkUpdatedDatetime\",\"type\":\"string\"}],\"connect.name\":\"inferred_name__1469213405\"},\"connect.name\":\"inferred_name_726885652\"}],\"default\":null}],\"connect.name\":\"inferred_name__1187958375\"}},{\"name\":\"_rid\",\"type\":\"string\"},{\"name\":\"_self\",\"type\":\"string\"},{\"name\":\"_etag\",\"type\":\"string\"},{\"name\":\"_attachments\",\"type\":\"string\"},{\"name\":\"_ts\",\"type\":\"long\"},{\"name\":\"_lsn\",\"type\":\"long\"}],\"connect.name\":\"inferred_name_2092778255\"}\"}, {compatibility: \"BACKWARD"
}
]
Error code: 409
Expected Behavior
- Connector instance should start pushing data into topioc
Reproduce
- Go to '...'
- Click on '....'
- Scroll down to '....'
- See error
Additional Context
- (If applicable: Add any other context about the problem here; for example: proposed solution, doc changes, screenshots, logs, links, etc)
@TheovanKraay can you please take a look at this issue, thanks!
@sujit-warrier-maersk - we will revert back soon. Because of the holiday season, it might take a week. But wanted to let you know that we have acknowledged this issue and will work on it, thanks!
@sujit-warrier-maersk do you need to use AvroConverter
? Typically JsonConverter
is used, per sample in documentation for source connector.
{
"name": "cosmosdb-source-connector",
"config": {
"connector.class": "com.azure.cosmos.kafka.connect.source.CosmosDBSourceConnector",
"tasks.max": "1",
"key.converter": "org.apache.kafka.connect.json.JsonConverter",
"value.converter": "org.apache.kafka.connect.json.JsonConverter",
"connect.cosmos.task.poll.interval": "100",
"connect.cosmos.connection.endpoint": "https://<cosmosinstance-name>.documents.azure.com:443/",
"connect.cosmos.master.key": "<cosmosdbprimarykey>",
"connect.cosmos.databasename": "kafkaconnect",
"connect.cosmos.containers.topicmap": "apparels#kafka",
"connect.cosmos.offset.useLatest": false,
"value.converter.schemas.enable": "false",
"key.converter.schemas.enable": "false"
}
}