confluent-kafka-python
Unable to parse nested schema or validate nested schema
Description
I have the nested schemas below, which I need to use to push my message.
commom.avsc, registered with subject io.test.common.event.EventData:
{
  "namespace": "io.test.common.event",
  "type": "record",
  "name": "EventData",
  "fields": [
    {
      "name": "id",
      "type": "string"
    },
    {
      "name": "type",
      "type": "string"
    }
  ]
}
Company.avsc, registered with subject io.test.user.schema.Company:
{
  "namespace": "io.test.user.schema",
  "type": "record",
  "name": "Company",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    }
  ]
}
Payload.avsc, registered with subject io.test.user.schema.Payload:
{
  "namespace": "io.test.user.schema",
  "type": "record",
  "name": "Payload",
  "fields": [
    {
      "name": "name",
      "type": "string"
    },
    {
      "name": "email",
      "type": "string"
    },
    {
      "name": "company",
      "type": "io.test.user.schema.Company"
    }
  ]
}
User.avsc, registered with subject <TOPIC_NAME>-value:
{
  "namespace": "io.test.user.schema",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "metadata",
      "type": "io.test.user.event.EventData"
    },
    {
      "name": "payload",
      "type": "io.test.user.schema.Payload"
    }
  ]
}
I registered these schemas using docker-compose with the subject names given above. After registration, the final <TOPIC_NAME>-value schema looks like this:
{
  "namespace": "io.test.user.schema",
  "type": "record",
  "name": "User",
  "fields": [
    {
      "name": "metadata",
      "type": "io.test.common.event.EventData"
    },
    {
      "name": "payload",
      "type": "Payload"
    }
  ],
  "references": [
    {
      "name": "io.test.common.event.EventData",
      "subject": "io.test.common.event.EventData",
      "version": 5
    },
    {
      "name": "io.test.user.schema.Payload",
      "subject": "io.test.user.schema.Payload",
      "version": 2
    }
  ]
}
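For reference, the same registration could also be sketched with the Python client itself. This is a minimal sketch assuming a client version that supports schema references through Schema and SchemaReference (newer than the 1.7.0 used here); the subject and version values are the ones shown above, and User.avsc is the file from this issue:

from confluent_kafka.schema_registry import (
    Schema,
    SchemaReference,
    SchemaRegistryClient,
)

client = SchemaRegistryClient({"url": "http://localhost:8081"})

# Top-level schema: it only names EventData and Payload instead of inlining them.
with open("User.avsc") as f:
    user_schema_str = f.read()

# Point the registry at the already-registered subjects/versions for those names.
references = [
    SchemaReference(name="io.test.common.event.EventData",
                    subject="io.test.common.event.EventData",
                    version=5),
    SchemaReference(name="io.test.user.schema.Payload",
                    subject="io.test.user.schema.Payload",
                    version=2),
]

schema_id = client.register_schema(
    "user-local-test-value",
    Schema(user_schema_str, "AVRO", references),
)
print("registered schema id:", schema_id)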
producer.py
from confluent_kafka import SerializingProducer
from confluent_kafka.serialization import StringSerializer
from confluent_kafka.schema_registry.avro import AvroSerializer
from confluent_kafka.schema_registry import SchemaRegistryClient

topic = "user-local-test"
schema_registry_url = "http://localhost:8081"
bootstrap_servers_url = "localhost:9092"


def delivery_report(err, msg):
    if err is not None:
        print("Delivery failed for User record {}: {}".format(msg.key(), err))
        return
    print('User record {} successfully produced to {} [{}] at offset {}'.format(
        msg.key(), msg.topic(), msg.partition(), msg.offset()))


def main(value):
    schema_registry_conf = {'url': schema_registry_url}
    schema_registry_client = SchemaRegistryClient(schema_registry_conf)

    # Fetch the registered <topic>-value schema from Schema Registry.
    registered = schema_registry_client.get_latest_version(subject_name=topic + "-value")
    schema = schema_registry_client.get_schema(registered.schema_id)
    schema_str = schema.schema_str

    pro_conf = {"auto.register.schemas": False}
    avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client,
                                     schema_str=schema_str,
                                     conf=pro_conf)

    producer_conf = {'bootstrap.servers': bootstrap_servers_url,
                     'key.serializer': StringSerializer('utf_8'),
                     'value.serializer': avro_serializer}
    producer = SerializingProducer(producer_conf)
    producer.poll(0.0)
    producer.produce(topic=topic, value=value, on_delivery=delivery_report)
    producer.flush()


if __name__ == '__main__':
    # Example record matching the schemas above (placeholder values).
    value = {
        "metadata": {"id": "1", "type": "created"},
        "payload": {"name": "Jane", "email": "jane@example.com",
                    "company": {"name": "Acme", "email": "info@acme.com"}},
    }
    main(value)
Error:
Traceback (most recent call last):
  File "producer.py", line 54, in <module>
    avro_serializer = AvroSerializer(schema_registry_client, schema_str.schema_str)
  File "/usr/local/lib/python3.6/site-packages/confluent_kafka/schema_registry/avro.py", line 175, in __init__
    parsed_schema = parse_schema(schema_dict)
  File "fastavro/_schema.pyx", line 106, in fastavro._schema.parse_schema
  File "fastavro/_schema.pyx", line 245, in fastavro._schema._parse_schema
  File "fastavro/_schema.pyx", line 290, in fastavro._schema.parse_field
  File "fastavro/_schema.pyx", line 129, in fastavro._schema._parse_schema
fastavro._schema_common.UnknownType: io.test.common.event.EventData
Even if I generate the nested schema using the code below (or manually):
import json

from fastavro.schema import load_schema_ordered

parsed_schema = load_schema_ordered(["./commom.avsc", "./Payload.avsc", "./User.avsc"])
avro_serializer = AvroSerializer(schema_registry_client=schema_registry_client,
                                 schema_str=json.dumps(parsed_schema))
producer.produce(topic=topic, value=value, on_delivery=delivery_report)
producer.flush()
I get the following mismatch error:
Traceback (most recent call last):
  File "/usr/local/lib/python3.6/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce
    value = self._value_serializer(value, ctx)
  File "/usr/local/lib/python3.6/site-packages/confluent_kafka/schema_registry/avro.py", line 220, in __call__
    registered_schema = self._registry.lookup_schema(subject,
  File "/usr/local/lib/python3.6/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 412, in lookup_schema
    response = self._rest_client.post('subjects/{}'
  File "/usr/local/lib/python3.6/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 127, in post
    return self.send_request(url, method='POST', body=body)
  File "/usr/local/lib/python3.6/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 174, in send_request
    raise SchemaRegistryError(response.status_code,
confluent_kafka.schema_registry.error.SchemaRegistryError: Schema not found (HTTP status code 404, SR code 40403)

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "producer.py", line 77, in <module>
    main()
  File "producer.py", line 71, in main
    producer.produce(topic=topic, value=value, on_delivery=delivery_report)
  File "/usr/local/lib/python3.6/site-packages/confluent_kafka/serializing_producer.py", line 174, in produce
    raise ValueSerializationError(se)
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="Schema not found (HTTP status code 404, SR code 40403)"}
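For what it's worth, the mismatch behind the 404 can be made visible by comparing the schema string registered under the subject with the locally expanded one. A minimal sketch using the existing SchemaRegistryClient (Company.avsc is added to the file list here so the local expansion parses):

import json

from confluent_kafka.schema_registry import SchemaRegistryClient
from fastavro.schema import load_schema_ordered

client = SchemaRegistryClient({"url": "http://localhost:8081"})

# Schema string registered under the topic's value subject: it names the
# referenced records and carries a "references" list.
registered = client.get_latest_version("user-local-test-value")
print(registered.schema.schema_str)

# Schema built locally from the .avsc files; passing this to AvroSerializer
# leads to the lookup_schema call above, which returns 404 because no
# identical schema is registered under the subject.
expanded = load_schema_ordered(["./commom.avsc", "./Company.avsc",
                                "./Payload.avsc", "./User.avsc"])
print(json.dumps(expanded, indent=2))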
NOTE: I have a multi-level nested schema.
How to reproduce
Register the schemas with the subjects listed against their file names above, using docker-compose.
Use the producer.py code shown above to fetch the schema and produce a message to your Kafka topic.
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): ('1.7.0', 17235968) and ('1.7.0', 17236223)
- [x] Apache Kafka broker version: 6.0.0-ce (org.apache.kafka.common.utils.AppInfoParser)
- [x] Client configuration: `{"auto.register.schemas": False}`
- [x] Operating system: macOS Catalina (10.15.7), python:3.6-alpine (docker)
- [x] Provide client logs (with `'debug': '..'` as necessary)
- [x] Provide broker log excerpts
- [x] Critical issue
I don't believe we support referenced schemas for Avro with the Python client yet. fastavro, which we use, does support them (https://fastavro.readthedocs.io/en/latest/schema.html#fastavro._schema_py.parse_schema), but the Avro serializer doesn't make use of this: https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/schema_registry/avro.py#L194
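For anyone hitting this in the meantime, fastavro itself can resolve the referenced types if they are parsed into a shared named_schemas dict first. Below is a rough sketch (not what AvroSerializer does internally) that pulls each referenced subject from the registry by hand, assuming a recent fastavro and the subject names from this issue:

import json

from confluent_kafka.schema_registry import SchemaRegistryClient
from fastavro.schema import parse_schema

client = SchemaRegistryClient({"url": "http://localhost:8081"})
named_schemas = {}

# Parse the referenced schemas first so their fully-qualified names become
# known types for fastavro.
for subject in ("io.test.common.event.EventData",
                "io.test.user.schema.Company",
                "io.test.user.schema.Payload"):
    referenced = client.get_latest_version(subject)
    parse_schema(json.loads(referenced.schema.schema_str), named_schemas)

# The top-level User schema now parses even though it only names the
# referenced records instead of inlining them.
registered = client.get_latest_version("user-local-test-value")
parsed_user = parse_schema(json.loads(registered.schema.schema_str), named_schemas)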
There is a PR that adds support for passing a parsed schema object to the current AvroSerializer: #1088. Can you merge it, along with the PR to support a single schema object (more details in PR #1217)? One more PR will be needed after these are merged to support nested schema references, so if they can be merged quickly it will solve most of the problems.
The client now supports schema references, and the PR https://github.com/confluentinc/confluent-kafka-python/pull/1217 mentioned above is also merged.
Closing this issue.
#1217 looks still open.
Sorry, I was referring to the https://github.com/confluentinc/confluent-kafka-python/pull/1088 PR.