apicurio-registry
apicurio-registry copied to clipboard
Trouble Using Confluent AvroConverter with apicurio registry
Description
Registry Version: 2.5.9.Final Persistence type: in-memory
I'm using Kafka Connect, and I'm trying to use the confluent compatible api from apicurio. I expect it should work with the confluent avro converter.
But I'm getting an error in the confluent library like: The given schema does not match any schema under the subject tst-kpn-des--reid-magic-byte-avro-key; error code: 40403
It's possible their is a bug in the confluent library, but just posting here for some direction.
I have a value and key schema on apicurio with content/global id of 1 and 2.
I was thinking maybe something canonical related is going on..? my next step was to look into the default setting on that within apicurio.
My config:
apiVersion: kafka.strimzi.io/v1beta2
kind: KafkaConnector
metadata:
name: confluent-postgres-sink
labels:
strimzi.io/cluster: my-connect-cluster
spec:
class: io.confluent.connect.jdbc.JdbcSinkConnector
# autoRestart:
# enabled: true
tasksMax: 1
config:
topics: tst-kpn-des--reid-magic-byte-avro
value.converter.schemas.enable: true
key.converter.schemas.enable: true
key.converter: io.confluent.connect.avro.AvroConverter
key.converter.schema.registry.url: myurl.org/apis/ccompat/v7
value.converter: io.confluent.connect.avro.AvroConverter
value.converter.schema.registry.url: myurl.org/apis/ccompat/v7
value.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/my-truststore/truststore.jks
value.converter.schema.registry.ssl.truststore.password: streamin
key.converter.schema.registry.ssl.truststore.location: /opt/kafka/external-configuration/my-truststore/truststore.jks
key.converter.schema.registry.ssl.truststore.password: streamin
connection.url: myconnection
connection.user: reid
connection.password: mypw
auto.create: true
I produce messages onto my topic like:
from confluent_kafka import avro
from confluent_kafka.avro import AvroProducer
import requests
import os
response = requests.get('myserver.org/apis/ccompat/v7/schemas/ids/1/', verify=False)
schema_str = response.json()["schema"]
value_schema = avro.loads(schema_str)
response = requests.get('myserver.org/apis/ccompat/v7/schemas/ids/2/', verify=False)
schema_str = response.json()["schema"]
key_schema = avro.loads(schema_str)
# Get the absolute paths for the SSL certificates
current_dir = os.path.dirname(os.path.realpath(__file__))
ca_cert_path = os.path.join(current_dir, 'certs', 'ca-cert.cert')
cert_path = os.path.join(current_dir, 'certs', 'cert.cert')
key_path = os.path.join(current_dir, 'certs', 'key.key')
# Define Avro producer
avro_producer = AvroProducer(
{
'bootstrap.servers': 'myservers',
'security.protocol': 'ssl',
'ssl.ca.location': ca_cert_path,
'ssl.certificate.location': cert_path,
'ssl.key.location': key_path,
'schema.registry.url': 'myserver.org/apis/ccompat/v7'
},
default_value_schema=value_schema,
default_key_schema=key_schema
)
# Produce Avro message
avro_producer.produce(topic='tst-kpn-des--reid-magic-byte-avro', value={'message': 'hello'}, key={'key': 'some-key'})
avro_producer.flush()
Environment
Kubernetes: v1.26.15 Kafka Connect from Strimzi: 3.7.0 confluent avro converter: 7.6.1 confluent jdbc sink: 10.7.6
Steps to Reproduce
- Create schemas through apicurio ui.
- produce 4 byte magic byte style messages onto the topic
- deploy kafka connect with connector configured as above
Expected vs Actual Behaviour
I expect it to successfully grab the schema. Totally possible I'm doing something very wrong. Hoping for some guidance.
Logs
Caused by: org.apache.kafka.common.errors.SerializationException: Error retrieving Avro key schema version for id 2
at io.confluent.kafka.serializers.AbstractKafkaSchemaSerDe.toKafkaException(AbstractKafkaSchemaSerDe.java:805)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:222)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.deserializeWithSchemaAndVersion(AbstractKafkaAvroDeserializer.java:269)
at io.confluent.connect.avro.AvroConverter$Deserializer.deserialize(AvroConverter.java:199)
at io.confluent.connect.avro.AvroConverter.toConnectData(AvroConverter.java:126)
... 17 more
Caused by: io.confluent.kafka.schemaregistry.client.rest.exceptions.RestClientException: The given schema does not match any schema under the subject tst-kpn-des--reid-magic-byte-avro-key; error code: 40403
at io.confluent.kafka.schemaregistry.client.rest.RestService.sendHttpRequest(RestService.java:336)
at io.confluent.kafka.schemaregistry.client.rest.RestService.httpRequest(RestService.java:409)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:500)
at io.confluent.kafka.schemaregistry.client.rest.RestService.lookUpSubjectVersion(RestService.java:485)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersionFromRegistry(CachedSchemaRegistryClient.java:353)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:609)
at io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient.getVersion(CachedSchemaRegistryClient.java:589)
at io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer.schemaVersion(AbstractKafkaAvroDeserializer.java:204)
... 20 more
2024-06-27 12:06:46 DEBUG <_> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-7) Selecting a single artifact (latest version) by artifactId: null tst-kpn-des--reid-magic-byte-avro-key (behavior = DEFAULT)
2024-06-27 12:06:46 DEBUG <_> [io.apicurio.registry.storage.impl.sql.AbstractSqlRegistryStorage] (executor-thread-7) Selecting artifact (latest version) meta-data: null tst-kpn-des--reid-magic-byte-avro-key (behavior = DEFAULT)
2024-06-27 12:06:46 INFO <_> [io.apicurio.common.apps.logging.audit.AuditLogService] (executor-thread-7) apicurio.audit action="request" result="failure" src_ip="xx.xx" x_forwarded_for="xx.xx" path="/apis/ccompat/v7/subjects/tst-kpn-des--reid-magic-byte-avro-key" response_code="404" method="POST" user="" ```