
Unable to parse nested schema or validate nested schema

avanish-appdirect opened this issue 3 years ago

Description

I have the nested schemas below that I need to use when pushing my message. common.avsc is registered with the subject io.test.common.event.EventData.

{
    "namespace": "io.test.common.event",
    "type": "record",
    "name": "EventData",
    "fields":[
        {
            "name":"id",
            "type": "string"
        },
        {
            "name":"type",
            "type":"string"
        }
    ]
}

Company.avsc is registered with the subject io.test.user.schema.Company.

{
    "namespace": "io.test.user.schema",
    "type": "record",
    "name": "Company",
    "fields": [
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "email",
            "type": "string"
        }
    ]
}

Payload.avsc is registered with the subject io.test.user.schema.Payload.

{
    "namespace": "io.test.user.schema",
    "type": "record",
    "name": "Payload",
    "fields": [
        {
            "name": "name",
            "type": "string"
        },
        {
            "name": "email",
            "type": "string"
        },
        {
            "name":"company",
            "type": "io.test.user.schema.Company"
        }
    ]
}

User.avsc is registered with the subject <TOPIC_NAME>-value.

{
    "namespace": "io.test.user.schema",
    "type": "record",
    "name": "User",
    "fields": [
        {
            "name": "metadata",
            "type": "io.test.user.event.EventData"
        },
        {
            "name": "payload",
            "type": "io.test.user.schema.Payload"
        }
    ]
}

I registered them via docker-compose with the subject names given above. After registration, the final <TOPIC_NAME>-value schema looks like this:

{
    "namespace": "io.test.user.schema",
    "type": "record",
    "name": "User",
    "fields": [
        {
            "name": "metadata",
            "type": "io.test.common.event.EventData"
        },
        {
            "name": "payload",
            "type": "Payload"
        }
    ],
"references": [
    {
      "name": "io.test.common.event.EventData",
      "subject": "io.test.common.event.EventData",
      "version": 5
    },
    {
      "name": "io.test.user.schema.Payload",
      "subject": "io.test.user.schema.Payload",
      "version": 2
    }
  ]
}
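
(For context, registering User.avsc with these references through the Python client looks roughly like the sketch below. This is an illustration, not the docker-compose method I used; it assumes a client version that exposes Schema and SchemaReference, and it mirrors the subjects and versions shown above.)

from confluent_kafka.schema_registry import (
    Schema,
    SchemaReference,
    SchemaRegistryClient,
)

client = SchemaRegistryClient({"url": "http://localhost:8081"})

with open("User.avsc") as f:
    user_schema_str = f.read()

# Each reference maps a fully-qualified type name to an
# already-registered subject and version.
references = [
    SchemaReference(name="io.test.common.event.EventData",
                    subject="io.test.common.event.EventData",
                    version=5),
    SchemaReference(name="io.test.user.schema.Payload",
                    subject="io.test.user.schema.Payload",
                    version=2),
]

schema = Schema(user_schema_str, "AVRO", references)
schema_id = client.register_schema("user-local-test-value", schema)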

producer.py

from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient

topic = "user-local-test"
schema_registry_url = "http://localhost:8081"
bootstrap_servers_url = "localhost:9092"

def delivery_report(err, msg):
    if err is not None:
        print("Delivery failed for User record {}: {}".format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


def main(value):
    schema_registry_conf = {'url': schema_registry_url}
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)
    schema = schema_registry_client.get_latest_version(subject_name=topic+"-value")
    schema = schema_registry_client.get_schema(schema.schema_id)
    schema_str = schema.schema_str

    pro_conf = {"auto.register.schemas": False}
    avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client,
                                     schema_str=schema_str,
                                     conf=pro_conf)

    producer_conf = {'bootstrap.servers': bootstrap_servers_url,
                     'key.serializer': StringSerializer('utf_8'),
                     'value.serializer': avro_serializer}
    producer = SerializingProducer(producer_conf)

    producer.poll(0.0)
    producer.produce(topic=topic, value=value, on_delivery=delivery_report)
    producer.flush()

if __name__ == '__main__':
    # Example payload matching the User schema; the values are illustrative.
    value = {
        "metadata": {"id": "1", "type": "user.created"},
        "payload": {
            "name": "John Doe",
            "email": "john@example.com",
            "company": {"name": "Acme", "email": "contact@acme.com"},
        },
    }
    main(value)

Error:

Traceback (most recent call last): File "producer.py", line 54, in avro_serializer = AvroSerializer(schema_registry_client, schema_str.schema_str) File "/usr/local/lib/python3.6/site-packages/confluent_kafka/schema_registry/avro.py", line 175, in init parsed_schema = parse_schema(schema_dict) File "fastavro/_schema.pyx", line 106, in fastavro._schema.parse_schema File "fastavro/_schema.pyx", line 245, in fastavro._schema._parse_schema File "fastavro/_schema.pyx", line 290, in fastavro._schema.parse_field File "fastavro/_schema.pyx", line 129, in fastavro._schema._parse_schema fastavro._schema_common.UnknownType: io.test.common.event.EventData

Even if I generate the fully resolved nested schema, either manually or with the code below:

import json

from fastavro.schema import load_schema_ordered

# Dependencies must be listed before the schemas that use them.
parsed_schema = load_schema_ordered(
    ["common.avsc", "Company.avsc", "Payload.avsc", "User.avsc"])
avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client,
                                 schema_str=json.dumps(parsed_schema))

producer.produce(topic=topic, value=value, on_delivery=delivery_report)
producer.flush()

I get the mismatch error below: with auto.register.schemas disabled, the serializer looks up the fully resolved schema string in the registry, and that string does not match any registered version of the subject.

Traceback (most recent call last): File "/usr/local/lib/python3.6/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce value = self._value_serializer(value, ctx) File "/usr/local/lib/python3.6/site-packages/confluent_kafka/schema_registry/avro.py", line 220, in call registered_schema = self._registry.lookup_schema(subject, File "/usr/local/lib/python3.6/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 412, in lookup_schema response = self._rest_client.post('subjects/{}' File "/usr/local/lib/python3.6/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 127, in post return self.send_request(url, method='POST', body=body) File "/usr/local/lib/python3.6/site-packagesa/schema_registry/schema_registry_client.py", line 174, in send_request raise SchemaRegistryError(response.status_code, confluent_kafka.schema_registry.error.SchemaRegistryError: Schema not found (HTTP status code 404, SR code 40403)

During handling of the above exception, another exception occurred:

Traceback (most recent call last): File "producer.py", line 77, in main() File "producer.py", line 71, in main producer.produce(topic=topic, value=value, on_delivery=delivery_report) File ""/usr/local/lib/python3.6/site-packages/confluent_kafka/serializing_producer.py", line 174, in produce raise ValueSerializationError(se) confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="Schema not found (HTTP status code 404, SR code 40403)"}

NOTE: I have a multi-level nested schema.

How to reproduce

Register the schemas with the subjects listed against their file names above, publishing via docker-compose. Then use the producer.py code from the Description to fetch the schema and produce a message to your Kafka topic.


Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): ('1.7.0', 17235968) and ('1.7.0', 17236223)
  • [x] Apache Kafka broker version: 6.0.0-ce (org.apache.kafka.common.utils.AppInfoParser)
  • [x] Client configuration: {"auto.register.schemas": False}
  • [x] Operating system: macOS Catalina (10.15.7), python:3.6-alpine(docker)
  • [x] Provide client logs (with 'debug': '..' as necessary)
  • [x] Provide broker log excerpts
  • [x] Critical issue

avanish-appdirect · Sep 27 '21

I don't believe we support referenced schemas for Avro with the Python client yet. Fastavro, which we use, does (https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.parse_schema), but the Avro serializer doesn't make use of it: https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/schema_registry/avro.py#L194
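
(For illustration, the fastavro named-schema support referred to above looks roughly like this sketch; the dicts simply reuse the schemas from the issue.)

from fastavro.schema import parse_schema

named_schemas = {}

# Parse the referenced record first so its name becomes known.
event_data = {
    "namespace": "io.test.common.event",
    "type": "record",
    "name": "EventData",
    "fields": [
        {"name": "id", "type": "string"},
        {"name": "type", "type": "string"},
    ],
}
parse_schema(event_data, named_schemas)

# A schema referring to EventData purely by name now parses,
# which is what the Avro serializer would need to do.
user = {
    "namespace": "io.test.user.schema",
    "type": "record",
    "name": "User",
    "fields": [{"name": "metadata", "type": "io.test.common.event.EventData"}],
}
parsed_user = parse_schema(user, named_schemas)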

mhowlett · Oct 04 '21

There is a PR, #1088, that adds this schema-object parsing to the current AvroSerializer; can you merge it? There is also a PR to support a single schema object, with more details in #1217.

Once these are merged, one more PR will be needed to support nested schema references. So if they can be merged quickly, it will solve most of the problems.

avanish-appdirect · Oct 06 '21

The client now supports schema references, and the PR mentioned, https://github.com/confluentinc/confluent-kafka-python/pull/1217, is also merged.

Closing this issue.
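
(For anyone landing here: a minimal sketch of the working pattern with reference support, assuming a recent client version in which AvroSerializer accepts the Schema object returned by get_latest_version and resolves its references against the registry.)

from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.avro import AvroSerializer

client = SchemaRegistryClient({"url": "http://localhost:8081"})

# latest.schema is a Schema object carrying the reference list;
# recent AvroSerializer versions accept it directly and fetch the
# referenced schemas from the registry while parsing.
latest = client.get_latest_version("user-local-test-value")
avro_serializer = AvroSerializer(client,
                                 latest.schema,
                                 conf={"auto.register.schemas": False})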

pranavrth · Feb 27 '24

> Client now supports schema references and the PR #1217 mentioned is also merged.
>
> Closing this issue.

#1217 still looks open.

avanish-appdirect · Feb 28 '24

Sorry, I was referring to PR https://github.com/confluentinc/confluent-kafka-python/pull/1088.

pranavrth · Mar 06 '24