confluent-kafka-python
confluent-kafka-python copied to clipboard
Bug when Serializing/Deserializing JSON Schemas as Draf-07 using JSONSerializer/JSONDeserializer
Description
Classes:
- JSONSerializer
- JSONDeserializer Contain a short code where they are validating the information of the schema related to the instance using "jsonschema" package.
from jsonschema import validate
try:
validate(instance=obj_dict, schema=self._parsed_schema)
except ValidationError as ve:
raise SerializationError(ve.message)
This is not 100% correct if you are using JSON Schemas that for instance are version "draft-07" You must use another another validator. See documentation here: https://python-jsonschema.readthedocs.io/en/stable/validate/#versioned-validators
For instance, if you are using JSON Draft-7 Schema you must use:
from jsonschema import Draft7Validator
schema = {
"$schema": "http://json-schema.org/draft-07/schema#",
"type": "object",
"properties": {
"name": {"type": "string"},
"email": {"type": "string"},
},
"required": ["email"]
}
Draft7Validator.check_schema(schema)
or you can use the "validate" but using the following property: "format"
validate(
instance="-12",
schema={"format": "ipv4"},
format_checker=draft7_format_checker)
How to reproduce
You must try to produce data to Kafka using the JSONSerializer for the value (You must have Schema Registry) For instance, if you are using topic "test", you must have the schema registered as "topic-value" You can use this schema for instance:
{
"$schema": "http://json-schema.org/draft-07/schema#",
"definitions": {
"quality_type": {
"type": "string",
"enum": [
"RTSP_SD",
"RTSP_HD",
"HLS_4K",
"ABR"
]
}
},
"$id": "urn://Product.schema.json",
"title": "Product",
"version" : "0.0.1",
"description": "This event represents a Product",
"type": "object",
"properties": {
"enviID": {
"type": "string",
"minLength": 10
},
"javaType": {
"type": "string"
},
"sSystem": {
"type": "string",
"enum": [
"N",
"C"
]
},
"pID": {
"type": "string",
"minLength": 5,
"maxLength": 32
},
"ipID": {
"type": "integer",
"Minimum": 0
},
"cat": {
"type": "string",
"minLength": 1,
"maxLength": 64
},
"cID": {
"type": "string",
"minLength": 0,
"maxLength": 32
},
"tdt": {
"type": "array",
"items": {
"type": "string",
"minLength": 1,
"maxLength": 64
},
"minItems": 1,
"uniqueItems": true
},
"tgs": {
"type": "array",
"items": {
"type": "string",
"minLength": 1,
"maxLength": 64
},
"minItems": 1,
"uniqueItems": true
},
"imq": {
"type": "array",
"items": {
"$ref": "#/definitions/quality_type"
},
"minItems": 1,
"uniqueItems": true
},
"comingUpPublishDate": {
"type": "string",
"format": "date-time"
},
"csd": {
"type": "string",
"format": "date-time"
},
"ced": {
"type": "string",
"format": "date-time"
},
"deletionDate": {
"type": "string",
"format": "date-time"
}
},
"required": [
"enviID",
"sSystem",
"pID",
"cat",
"cID",
"csd",
"ced"
],
"additionalProperties": false,
"allOf": [
{
"anyOf": [
{
"not": {
"properties": {
"sSystem": {
"const": "C"
}
}
}
},
{
"required": [
"ipID",
"tgs"
]
}
]
}
]
}
Checklist
Please provide the following information:
- [ ] confluent-kafka-python and librdkafka version (
confluent_kafka.version()
andconfluent_kafka.libversion()
): 1.7.0 "pip install confluent-kafka" - [ ] Apache Kafka broker version: confluentinc/cp-kafka:6.0.4 confluentinc/cp-zookeeper:5.5.0 confluentinc/cp-schema-registry:6.1.1
- [ ] Client configuration:
{...}
--bootstrap_servers "localhost:9094" Schema Registry conf:schema_registry_conf: dict = {"auto.register.schemas": False}
- [ ] Operating system: MacOS Big Sur 11.6
- [ ] Provide client logs (with
'debug': '..'
as necessary)
Error when producing.Traceback (most recent call last):
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 794, in resolve_from_url
document = self.store[url]
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/_utils.py", line 22, in __getitem__
return self.store[self.normalize(uri)]
KeyError: ''
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 797, in resolve_from_url
document = self.resolve_remote(url)
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 896, in resolve_remote
with urlopen(uri) as url:
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 214, in urlopen
return opener.open(url, data, timeout)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 501, in open
req = Request(fullurl, data)
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 320, in __init__
self.full_url = url
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 346, in full_url
self._parse()
File "/Library/Frameworks/Python.framework/Versions/3.9/lib/python3.9/urllib/request.py", line 375, in _parse
raise ValueError("unknown url type: %r" % self.full_url)
ValueError: unknown url type: ''
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce
value = self._value_serializer(value, ctx)
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/confluent_kafka/schema_registry/json_schema.py", line 205, in __call__
validate(instance=value, schema=self._parsed_schema)
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 965, in validate
error = exceptions.best_match(validator.iter_errors(instance))
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/exceptions.py", line 354, in best_match
best = next(errors, None)
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 224, in iter_errors
for error in errors:
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/_validators.py", line 329, in properties
yield from validator.descend(
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 240, in descend
for error in self.evolve(schema=schema).iter_errors(instance):
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 224, in iter_errors
for error in errors:
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/_legacy_validators.py", line 113, in items_draft6_draft7_draft201909
yield from validator.descend(item, items, path=index)
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 240, in descend
for error in self.evolve(schema=schema).iter_errors(instance):
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 224, in iter_errors
for error in errors:
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/_validators.py", line 291, in ref
scope, resolved = validator.resolver.resolve(ref)
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 786, in resolve
return url, self._remote_cache(url)
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/jsonschema/validators.py", line 799, in resolve_from_url
raise exceptions.RefResolutionError(exc)
jsonschema.exceptions.RefResolutionError: unknown url type: ''
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/kafka-generator.py", line 190, in <module>
random_generate()
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/kafka-generator.py", line 42, in random_generate
produce_records(debug, dict_list, args.json_schema_file)
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/kafka-generator.py", line 113, in produce_records
produce_json_confluent_serializer(args.topic, dict_list, args.debug, args.chunk_size)
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/producer.py", line 57, in produce_json_confluent_serializer
raise e
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/src/producer.py", line 43, in produce_json_confluent_serializer
serializing_producer.produce(topic=topic, key=None, value=raw_dict,
File "/Users/xaviermas/repositories/gvp-datahub-mock-kakfa-generator/venv/lib/python3.9/site-packages/confluent_kafka/serializing_producer.py", line 174, in produce
raise ValueSerializationError(se)
confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="unknown url type: ''"}
- [ ] Provide broker log excerpts
- [ ] Critical issue
Hello?
Hi @masosky, thanks for asking.
From the logs, the error is url related: confluent_kafka.error.ValueSerializationError: KafkaError{code=_VALUE_SERIALIZATION,val=-161,str="unknown url type: ''"}
Please provide the right URL.
I have faced the same issue and as per my understanding I found out that I was not sending the type of schema format we are passing to the produced. To fix this issue we have to pass schema type.
schema_str = """ "schema":{ "$schema": "http://json-schema.org/draft-07/schema#", "title": "User", "description": "A Confluent Kafka Python User", "type": "object", "properties": { "name": { "description": "User's name", "type": "string" }, "favorite_number": { "description": "User's favorite number", "type": "number", "exclusiveMinimum": 0 }, "favorite_color": { "description": "User's favorite color", "type": "string" } } } // we have to add this curly bracket at the end of properties section to end "schema" , "schemaType": "JSON", "required": [ "name", "favorite_number", "favorite_color" ] } }
There seems to be no issue from the client side on this. Closing this ticket. Feel free to open it again if required.