confluent-kafka-python

Schema Registry API responses don't match docs for schemas with references

Status: Open. RickTalken opened this issue 3 years ago (2 comments).

Description

This issue occurs when a schema is registered with schema references. Several of the Schema Registry client methods return schemas whose list of references is not built correctly.

The SchemaRegistryClient get_schema() method (https://github.com/confluentinc/confluent-kafka-python/blob/master/src/confluent_kafka/schema_registry/schema_registry_client.py#L372) returns a Schema object with a correctly built list of references: each item is a SchemaReference object, as the documentation describes.

The lookup_schema(), get_latest_version(), and get_version() methods return a RegisteredSchema. Per the documentation, the schema property is a Schema object. However, the Schema object's list of references is a list of dictionaries rather than a list of SchemaReference objects.

The RegisteredSchema.schema.references property should return a list of SchemaReference objects to be consistent with the Schema returned by the get_schema() method and match the documentation.
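Until the client returns SchemaReference objects consistently, callers can normalize the reference list themselves. This is a minimal sketch, assuming the dictionaries carry the "name", "subject" and "version" keys that the Schema Registry REST API uses for references. `normalize_references` is a hypothetical helper, not part of confluent-kafka-python. If the library is not installed, a minimal stand-in class is defined so the sketch still runs.

```python
# Workaround sketch: make every entry in a Schema's references a SchemaReference.
try:
    from confluent_kafka.schema_registry import SchemaReference
except ImportError:
    # Hypothetical stand-in mirroring SchemaReference(name, subject, version),
    # used only when confluent-kafka is not installed.
    class SchemaReference:
        def __init__(self, name, subject, version):
            self.name = name
            self.subject = subject
            self.version = version


def normalize_references(references):
    """Hypothetical helper: return a list of SchemaReference objects.

    Entries that are already SchemaReference objects are kept unchanged.
    Plain dicts (as reported for lookup_schema(), get_latest_version() and
    get_version()) are converted using their "name", "subject" and
    "version" keys. A missing (None) list yields an empty list.
    """
    normalized = []
    for ref in references or []:
        if isinstance(ref, dict):
            normalized.append(
                SchemaReference(
                    name=ref["name"],
                    subject=ref["subject"],
                    version=ref["version"],
                )
            )
        else:
            normalized.append(ref)
    return normalized
```

For example, applying `normalize_references()` to `schema_registry_client.get_latest_version("employee").schema.references` should yield SchemaReference objects, matching what get_schema() returns. Converting at the call site keeps the workaround easy to remove once the client itself is fixed.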

How to reproduce

The following script creates two schemas, the second of which references the first. It then calls all four APIs that return either a Schema object or a RegisteredSchema object with a schema property, and prints the references from each. The output shows that the references from three of the APIs are dictionaries instead of SchemaReference objects.

from confluent_kafka.schema_registry import (
    SchemaRegistryClient,
    Schema,
    SchemaReference,
)

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})


def register_schemas():
    role_schema_str = """
    {
        "name": "Role",
        "type": "record",
        "fields": [
            {"name": "title", "type": "string"},
            {"name": "department", "type": "string"}
        ]
    }
    """
    role_schema = Schema(schema_str=role_schema_str, schema_type="AVRO")
    schema_registry_client.register_schema(subject_name="role", schema=role_schema)
    employee_schema_str = """
    {
        "name": "Employee",
        "type": "record",
        "fields": [
            {"name": "name", "type": "string"},
            {"name": "role", "type": "Role"}
        ]
    }
    """
    references = [
        SchemaReference(name="Role", subject="role", version=1),
    ]
    employee_schema = Schema(
        schema_str=employee_schema_str,
        schema_type="AVRO",
        references=references,
    )
    schema_registry_client.register_schema(
        subject_name="employee",
        schema=employee_schema,
    )


def apis():
    registered_employee_schema = schema_registry_client.get_latest_version("employee")
    print(
        f"get_latest_version() returned: {registered_employee_schema.schema.references}"
    )

    schema_id = registered_employee_schema.schema_id
    schema = schema_registry_client.get_schema(schema_id)
    print(f"get_schema() returned: {schema.references}")

    registered_schema = schema_registry_client.get_version("employee", 1)
    print(f"get_version() returned: {registered_schema.schema.references}")

    employee_schema_str = """
        {
            "name": "Employee",
            "type": "record",
            "fields": [
                {"name": "name", "type": "string"},
                {"name": "role", "type": "Role"}
            ]
        }
        """
    references = [
        SchemaReference(name="Role", subject="role", version=1),
    ]
    employee_schema = Schema(
        schema_str=employee_schema_str,
        schema_type="AVRO",
        references=references,
    )
    registered_schema = schema_registry_client.lookup_schema(
        "employee", employee_schema
    )
    print(f"lookup_schema() returned: {registered_schema.schema.references}")


if __name__ == "__main__":
    register_schemas()
    apis()
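Comparing printed reprs works, but a small check makes the inconsistency explicit. This is a minimal sketch, assuming apis() is changed to collect each call's `references` into a dict keyed by method name; `apis_returning_dicts` is a hypothetical helper, not part of the library.

```python
def apis_returning_dicts(results):
    """Hypothetical helper: given {api_name: references}, return the sorted
    API names whose reference lists contain plain dicts rather than
    SchemaReference objects. None is treated as an empty list."""
    return sorted(
        api
        for api, refs in results.items()
        if any(isinstance(ref, dict) for ref in refs or [])
    )
```

Per the behavior reported above, passing the four results from the script should list get_latest_version(), get_version() and lookup_schema(). An empty list would indicate that all four methods agree.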

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): confluent-kafka-python = ('1.8.2', 17302016), librdkafka = ('1.8.2', 17302271)
  • [x] Apache Kafka broker version: 6.1.0
  • [ ] Client configuration: {...}
  • [x] Operating system: macOS
  • [ ] Provide client logs (with 'debug': '..' as necessary)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

RickTalken, Mar 18 '22

Thanks for reporting this, and we appreciate the PR; we will review it. Please note, however, that schema references are not supported yet.

jliunyu, Mar 22 '22

Combining this with #1302.

mhowlett, Oct 31 '22