
Bug when Serializing/Deserializing JSON Schemas as Draft-07 using JSONSerializer/JSONDeserializer

Open masosky opened this issue 2 years ago • 3 comments

Description

Classes:

  • JSONSerializer
  • JSONDeserializer

Both contain a short piece of code that validates the instance against the schema using the "jsonschema" package:
from confluent_kafka.serialization import SerializationError
from jsonschema import validate
from jsonschema.exceptions import ValidationError

try:
    validate(instance=obj_dict, schema=self._parsed_schema)
except ValidationError as ve:
    raise SerializationError(ve.message)

This is not fully correct if you are using JSON Schemas of a specific version such as "draft-07": you must use the corresponding versioned validator. See the documentation here: https://python-jsonschema.readthedocs.io/en/stable/validate/#versioned-validators

For instance, if you are using a JSON Schema Draft-07 schema you must use:

from jsonschema import Draft7Validator

schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",

    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "email": {"type": "string"},
    },
    "required": ["email"]
}
Draft7Validator.check_schema(schema)
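
Alternatively, a minimal sketch (assuming only the jsonschema package) that lets jsonschema pick the validator class from the schema's "$schema" keyword, so no draft version is hard-coded:

from jsonschema import validators

schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "email": {"type": "string"},
    },
    "required": ["email"],
}

# validator_for() inspects "$schema" and returns the matching validator
# class (Draft7Validator in this case).
validator_cls = validators.validator_for(schema)
validator_cls.check_schema(schema)                     # validate the schema itself
validator_cls(schema).validate({"email": "a@b.com"})   # validate an instance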

or you can keep using "validate", passing the matching format checker so that the "format" keyword is actually enforced:

from jsonschema import validate, draft7_format_checker

validate(
    instance="-12",
    schema={"format": "ipv4"},
    format_checker=draft7_format_checker)
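
The two approaches can also be combined. A minimal sketch, assuming a jsonschema version that still exports draft7_format_checker (as the versions current at the time of this issue do), that runs the Draft-07 validator together with its format checker:

from jsonschema import Draft7Validator, draft7_format_checker

schema = {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "type": "object",
    "properties": {"address": {"type": "string", "format": "ipv4"}},
}

# Without a format checker the "format" keyword is only an annotation;
# attaching one turns it into an assertion.
validator = Draft7Validator(schema, format_checker=draft7_format_checker)
validator.validate({"address": "192.168.0.1"})    # passes
# validator.validate({"address": "-12"})          # would raise ValidationError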

How to reproduce

Try to produce data to Kafka using the JSONSerializer for the value (a Schema Registry is required). For instance, if you are using the topic "test", you must have the schema registered under the "<topic>-value" subject. You can use this schema, for instance (a minimal producer sketch follows it):

{
  "$schema": "http://json-schema.org/draft-07/schema#",
  "definitions": {
    "quality_type": {
      "type": "string",
      "enum": [
        "RTSP_SD",
        "RTSP_HD",
        "HLS_4K",
        "ABR"
      ]
    }
  },
  "$id": "urn://Product.schema.json",
  "title": "Product",
  "version" : "0.0.1",
  "description": "This event represents a Product",
  "type": "object",
  "properties": {
    "enviID": {
      "type": "string",
      "minLength": 10
    },
    "javaType": {
      "type": "string"
    },
    "sSystem": {
      "type": "string",
      "enum": [
        "N",
        "C"
      ]
    },
    "pID": {
      "type": "string",
      "minLength": 5,
      "maxLength": 32
    },
    "ipID": {
      "type": "integer",
      "Minimum": 0
    },
    "cat": {
      "type": "string",
      "minLength": 1,
      "maxLength": 64
    },
    "cID": {
      "type": "string",
      "minLength": 0,
      "maxLength": 32
    },
    "tdt": {
      "type": "array",
      "items": {
        "type": "string",
        "minLength": 1,
        "maxLength": 64
      },
      "minItems": 1,
      "uniqueItems": true
    },
    "tgs": {
      "type": "array",
      "items": {
        "type": "string",
        "minLength": 1,
        "maxLength": 64
      },
      "minItems": 1,
      "uniqueItems": true
    },
    "imq": {
      "type": "array",
      "items": {
        "$ref": "#/definitions/quality_type"
      },
      "minItems": 1,
      "uniqueItems": true
    },
    "comingUpPublishDate": {
      "type": "string",
      "format": "date-time"
    },
    "csd": {
      "type": "string",
      "format": "date-time"
    },
    "ced": {
      "type": "string",
      "format": "date-time"
    },
    "deletionDate": {
      "type": "string",
      "format": "date-time"
    }
  },
  "required": [
    "enviID",
    "sSystem",
    "pID",
    "cat",
    "cID",
    "csd",
    "ced"
  ],
  "additionalProperties": false,
  "allOf": [
    {
      "anyOf": [
        {
          "not": {
            "properties": {
              "sSystem": {
                "const": "C"
              }
            }
          }
        },
        {
          "required": [
            "ipID",
            "tgs"
          ]
        }
      ]
    }
  ]
}
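
For reference, a minimal producer sketch against confluent-kafka-python 1.7.0; the file name, Schema Registry URL, topic, and record values below are placeholders for illustration:

from confluent_kafka import SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.json_schema import JSONSerializer

# The Draft-07 schema above, stored in a file (hypothetical file name).
with open("product-schema.json") as f:
    schema_str = f.read()

schema_registry_client = SchemaRegistryClient({"url": "http://localhost:8081"})
json_serializer = JSONSerializer(schema_str, schema_registry_client,
                                 conf={"auto.register.schemas": False})

producer = SerializingProducer({
    "bootstrap.servers": "localhost:9094",
    "value.serializer": json_serializer,
})

# Placeholder record that satisfies the schema's required fields.
record = {
    "enviID": "env-0000000001",
    "sSystem": "N",
    "pID": "PROD-12345",
    "cat": "sample-category",
    "cID": "cid-123",
    "csd": "2021-10-06T00:00:00Z",
    "ced": "2021-12-31T00:00:00Z",
}

# The serializer validates the value with jsonschema before producing;
# with this Draft-07 schema the call fails with the traceback shown below.
producer.produce(topic="test", key=None, value=record)
producer.flush()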

Checklist

Please provide the following information:

  • [ ] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.7.0 (installed via "pip install confluent-kafka")
  • [ ] Apache Kafka broker version: confluentinc/cp-kafka:6.0.4, confluentinc/cp-zookeeper:5.5.0, confluentinc/cp-schema-registry:6.1.1
  • [ ] Client configuration: {...}, --bootstrap_servers "localhost:9094"; Schema Registry conf: schema_registry_conf: dict = {"auto.register.schemas": False}
  • [ ] Operating system: MacOS Big Sur 11.6
  • [ ] Provide client logs (with 'debug': '..' as necessary)
Error when producing:

Traceback (most recent call last):
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 794, in resolve_from_url
   document = self.store[url]
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/_utils.py", line 22, in __getitem__
   return self.store[self.normalize(uri)]
KeyError: ''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 797, in resolve_from_url
   document = self.resolve_remote(url)
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 896, in resolve_remote
   with urlopen(uri) as url:
 File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 214, in urlopen
   return opener.open(url, data, timeout)
 File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 501, in open
   req = Request(fullurl, data)
 File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 320, in __init__
   self.full_url = url
 File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 346, in full_url
   self._parse()
 File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 375, in _parse
   raise ValueError("unknown url type: %r" % self.full_url)
ValueError: unknown url type: ''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce
   value = self._value_serializer(value, ctx)
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/confluent_kafka/schema_registry/json_schema.py", line 205, in __call__
   validate(instance=value, schema=self._parsed_schema)
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 965, in validate
   error = exceptions.best_match(validator.iter_errors(instance))
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/exceptions.py", line 354, in best_match
   best = next(errors, None)
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 224, in iter_errors
   for error in errors:
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/_validators.py", line 329, in properties
   yield from validator.descend(
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 240, in descend
   for error in self.evolve(schema=schema).iter_errors(instance):
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 224, in iter_errors
   for error in errors:
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/_legacy_validators.py", line 113, in items_draft6_draft7_draft201909
   yield from validator.descend(item, items, path=index)
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 240, in descend
   for error in self.evolve(schema=schema).iter_errors(instance):
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 224, in iter_errors
   for error in errors:
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/_validators.py", line 291, in ref
   scope, resolved = validator.resolver.resolve(ref)
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 786, in resolve
   return url, self._remote_cache(url)
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 799, in resolve_from_url
   raise exceptions.RefResolutionError(exc)
jsonschema.exceptions.RefResolutionError: unknown url type: ''

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/kafka-generator.py", line 190, in <module>
   random_generate()
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/kafka-generator.py", line 42, in random_generate
   produce_records(debug, dict_list, args.json_schema_file)
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/kafka-generator.py", line 113, in produce_records
   produce_json_confluent_serializer(args.topic, dict_list, args.debug, args.chunk_size)
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/producer.py", line 57, in produce_json_confluent_serializer
   raise e
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/producer.py", line 43, in produce_json_confluent_serializer
   serializing_producer.produce(topic=topic, key=None, value=raw_dict,
 File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/confluent_kafka/serializing_producer.py", line 174, in produce
   raise ValueSerializationError(se)
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="unknown url type: ''"}

  • [ ] Provide broker log excerpts
  • [ ] Critical issue

masosky avatar Oct 06 '21 11:10 masosky

Hello?

masosky avatar Dec 30 '21 14:12 masosky

Hi @masosky, thanks for asking.

From the logs, the error is url related: confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="unknown url type: ''"}

Please provide the right URL.

jliunyu avatar Mar 23 '22 22:03 jliunyu

I have faced the same issue, and as far as I understand the problem was that I was not sending the schema type along with the schema passed to the producer. To fix this issue we have to pass the schema type ("schemaType": "JSON"). Note the extra closing curly bracket after the "properties" section, which closes the nested "schema" object:

schema_str = """
{
  "schema": {
    "$schema": "http://json-schema.org/draft-07/schema#",
    "title": "User",
    "description": "A Confluent Kafka Python User",
    "type": "object",
    "properties": {
      "name": {
        "description": "User's name",
        "type": "string"
      },
      "favorite_number": {
        "description": "User's favorite number",
        "type": "number",
        "exclusiveMinimum": 0
      },
      "favorite_color": {
        "description": "User's favorite color",
        "type": "string"
      }
    }
  },
  "schemaType": "JSON",
  "required": ["name", "favorite_number", "favorite_color"]
}
"""

abhimanyukumar0007 avatar Jul 12 '22 11:07 abhimanyukumar0007

There seems to be no issue from the client side on this. Closing this ticket. Feel free to open it again if required.

pranavrth avatar Feb 27 '24 13:02 pranavrth