confluent-kafka-python
confluent-kafka-python copied to clipboard
Schema registry API response don't match docs for schemas with references
Description
This issue occurs when a schema is registered with schema references. Several of the schema registry APIs return schemas that do not correctly construct their list of references.
The SchemaRegistryClient get_schema() method](https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/schema_registry/schema_registry_client.py#L372) returns a Schema object with a correctly built list of references. Each item is a SchemaReference object per the documentation.
The lookup_schema(), get_latest_version(), and get_version() methods return a RegisteredSchema. Per the documentation, the schema property is a Schema object. However, the Schema object's list of references is a list of dictionaries rather than a list of SchemaReference objects.
The RegisteredSchema.schema.references property should return a list of SchemaReference objects to be consistent with the Schema returned by the get_schema() method and match the documentation.
How to reproduce
If you run the following script, it will create two schemas. The second schema will reference the first. It will then call all 4 APIs that return Schema objects or RegisteredSchema objects that have a Schema property. The script will show that the references in the Schema from 3 of the APIs are dictionaries instead of SchemaReference objects.
from confluent_kafka.schema_registry import (
SchemaRegistryClient,
Schema,
SchemaReference,
)
schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})
def register_schemas():
role_schema_str = """
{
"name": "Role",
"type": "record",
"fields": [
{"name": "title", "type": "string"},
{"name": "department", "type": "string"}
]
}
"""
role_schema = Schema(schema_str=role_schema_str, schema_type="AVRO")
schema_registry_client.register_schema(subject_name="role", schema=role_schema)
employee_schema_str = """
{
"name": "Employee",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "role", "type": "Role"}
]
}
"""
references = [
SchemaReference(name="Role", subject="role", version=1),
]
employee_schema = Schema(
schema_str=employee_schema_str,
schema_type="AVRO",
references=references,
)
schema_registry_client.register_schema(
subject_name="employee",
schema=employee_schema,
)
def apis():
registered_employee_schema = schema_registry_client.get_latest_version("employee")
print(
f"get_latest_version() returned: {registered_employee_schema.schema.references}"
)
schema_id = registered_employee_schema.schema_id
schema = schema_registry_client.get_schema(schema_id)
print(f"get_schema() returned: {schema.references}")
registered_schema = schema_registry_client.get_version("employee", 1)
print(f"get_version() returned: {registered_schema.schema.references}")
employee_schema_str = """
{
"name": "Employee",
"type": "record",
"fields": [
{"name": "name", "type": "string"},
{"name": "role", "type": "Role"}
]
}
"""
references = [
SchemaReference(name="Role", subject="role", version=1),
]
employee_schema = Schema(
schema_str=employee_schema_str,
schema_type="AVRO",
references=references,
)
registered_schema = schema_registry_client.lookup_schema(
"employee", employee_schema
)
print(f"lookup_schema() returned: {registered_schema.schema.references}")
if __name__ == "__main__":
register_schemas()
apis()
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (
confluent_kafka.version()andconfluent_kafka.libversion()): confluent-kafka-python = ('1.8.2', 17302016), librdkafka = ('1.8.2', 17302271) - [x] Apache Kafka broker version: 6.1.0
- [ ] Client configuration:
{...} - [x] Operating system: MacOS
- [ ] Provide client logs (with
'debug': '..'as necessary) - [ ] Provide broker log excerpts
- [ ] Critical issue
Thanks for reporting this and appreciate for the PR, we will review it. But please note that the schema references is not supported yet.
combining with #1302