confluent-kafka-python
AvroDeserializer reader schema cannot ignore values required in writer schema
Description
I'm using AvroDeserializer to deserialize records from a topic and I'm running into issues when confluent-kafka-python calls fastavro in the deserialization function:
```python
obj_dict = schemaless_reader(payload,
                             writer_schema,
                             self._reader_schema)
```
(link to the source code line)
The reader_schema that I pass in omits a field that is present in the writer_schema because I'm not interested in reading it. However, this results in fastavro raising the following exception:
```
File "fastavro/_read.pyx", line 835, in fastavro._read.schemaless_reader
File "fastavro/_read.pyx", line 846, in fastavro._read.schemaless_reader
File "fastavro/_read.pyx", line 561, in fastavro._read._read_data
File "fastavro/_read.pyx", line 472, in fastavro._read.read_record
File "fastavro/_read.pyx", line 559, in fastavro._read._read_data
File "fastavro/_read.pyx", line 413, in fastavro._read.read_union
File "fastavro/_read.pyx", line 73, in fastavro._read.match_types
File "fastavro/_read.pyx", line 127, in fastavro._read.match_schemas
fastavro._read_common.SchemaResolutionError: Schema mismatch: {'avro.java.string': 'String', 'type': 'string'} is not null
```
I suppose this makes sense from fastavro's point of view, because the purpose of the `schemaless_reader` `reader_schema` is "schema migration" (see here), and a required field cannot simply be dropped. From a Kafka consumer's point of view, however, it doesn't make sense: a reader may only be interested in a subset of the data.
Personally, my issue would be solved if AvroDeserializer accepted a None value for the reader_schema and, in that case, skipped schema resolution entirely by calling fastavro like this:
```python
obj_dict = schemaless_reader(payload,
                             writer_schema,
                             None)
```
Or do you feel this should be addressed with fastavro?
How to reproduce
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): latest (1.4.2)
- [x] Apache Kafka broker version: Confluent Cloud
- [ ] Client configuration: `{...}`
- [x] Operating system: Ubuntu 18.04 (under WSL1)
- [ ] Provide client logs (with `'debug': '..'` as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue