confluent-kafka-python
AvroConsumer — access to key & value schemas
Description
AvroConsumer happily returns decoded Avro messages but drops all reference to the original Avro schema/identifier. The schema is super-useful in some cases (e.g. Debezium, where it contains the source database table schema).
One (relatively straightforward) solution is to:
1. update `avro.MessageSerializer.decode_message()` to return a `(schema_id, payload)` tuple
2. have `AvroConsumer.poll()` wrap the `Message` in a Python `AvroMessage` subclass which has additional `.key_schema_id` and `.value_schema_id` attributes (could also do this in C)
3. expose the schema registry client via an `AvroConsumer.get_schema(schema_id)` method
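A minimal pure-Python sketch of points 1 and 3, assuming the Confluent wire format (a zero magic byte, then a 4-byte big-endian schema ID, then the Avro-encoded body). The `decode_avro` and `fetch_schema` callables are hypothetical stand-ins for the real Avro decoder and schema registry client; this is an illustration of the proposal, not the library's actual `MessageSerializer` code.

```python
import struct
from typing import Any, Callable, Dict, Tuple

MAGIC_BYTE = 0
HEADER = struct.Struct(">bI")  # magic byte + 4-byte big-endian schema ID


def decode_message(data: bytes, decode_avro: Callable[[int, bytes], Any]) -> Tuple[int, Any]:
    """Return (schema_id, payload) instead of only the payload (point 1).

    `decode_avro(schema_id, body)` is a hypothetical hook for the real Avro decoder.
    """
    if len(data) < HEADER.size:
        raise ValueError("message too short to contain a schema ID header")
    magic, schema_id = HEADER.unpack_from(data)
    if magic != MAGIC_BYTE:
        raise ValueError(f"unexpected magic byte {magic}")
    return schema_id, decode_avro(schema_id, data[HEADER.size:])


class SchemaLookup:
    """Cached schema-ID -> schema lookup, the shape of a get_schema(schema_id) method (point 3)."""

    def __init__(self, fetch_schema: Callable[[int], str]):
        self._fetch = fetch_schema  # hypothetical registry call
        self._cache: Dict[int, str] = {}

    def get_schema(self, schema_id: int) -> str:
        # Hit the registry only once per schema ID.
        if schema_id not in self._cache:
            self._cache[schema_id] = self._fetch(schema_id)
        return self._cache[schema_id]
```

The decoder already has to read the schema ID to find the right reader schema, so returning it alongside the payload costs nothing extra; the cache keeps repeated `get_schema()` calls from hitting the registry for every message.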
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (`confluent_kafka.version()` and `confluent_kafka.libversion()`): 0.11.0
- [ ] Apache Kafka broker version:
- [ ] Client configuration: `{...}`
- [ ] Operating system:
- [ ] Provide client logs (with `'debug': '..'` as necessary)
- [ ] Provide broker log excerpts
- [ ] Critical issue
That makes sense.
Since 1 and 2 would be breaking changes for existing applications, I guess we are stuck with 3 for the time being.
Would you care to provide a PR?
It was all one proposed solution, not three options. (1) is the only API change; I assumed it was internal, but I could always add a different method to MessageSerializer. And (2) is backwards compatible, isn't it?
Which bits are considered internal vs published APIs?
Sorry, misinterpreted your suggestion.
AvroConsumer is the only public interface to worry about here; changing poll() to return an AvroMessage may be a breaking change for existing applications.
What if we start out with your suggested implementation, but prior to the next release decide to break poll() or add a second method for polling AvroMessages?
Would you care to submit a PR for this?
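One way to limit breakage if `poll()` starts returning an `AvroMessage` is delegation: the wrapper forwards every attribute it does not define to the original message, so code that only calls `.key()`, `.value()`, `.error()` and so on keeps working. This is a sketch under assumptions: it swaps the proposal's subclass for a delegating wrapper, `StubMessage` is a hypothetical stand-in for a consumed message, and code that checks `type(msg)` or `isinstance(msg, Message)` would still notice the change.

```python
from typing import Any, Optional


class AvroMessage:
    """Delegating wrapper: adds schema IDs, forwards everything else to the wrapped message."""

    def __init__(self, msg: Any, key_schema_id: Optional[int], value_schema_id: Optional[int]):
        self._msg = msg
        self.key_schema_id = key_schema_id
        self.value_schema_id = value_schema_id

    def __getattr__(self, name: str) -> Any:
        # Only called when normal lookup fails, i.e. for attributes the wrapper doesn't define.
        return getattr(self._msg, name)


class StubMessage:
    """Hypothetical stand-in for a consumed message with an already-decoded key/value."""

    def __init__(self, key: Any, value: Any):
        self._key, self._value = key, value

    def key(self) -> Any:
        return self._key

    def value(self) -> Any:
        return self._value

    def error(self) -> None:
        return None
```

Existing callers of `msg.value()` see no difference, while new callers can read `msg.value_schema_id`; a second polling method returning the wrapper would avoid even the `isinstance` edge case.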
I'm looking for a way to get the schema (or the schema ID) when using AvroConsumer. Is there a way to do this without changing the source? Thanks.
Is there any update on this?
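A possible workaround without patching the library, sketched under assumptions: consume with a plain `Consumer` (so `msg.value()` still holds the raw, Confluent-framed bytes), read the schema ID from the first five bytes, and fetch the schema from the Schema Registry REST endpoint `GET /schemas/ids/{id}`, which returns JSON with a `schema` field. The registry URL used below is a placeholder, and the Avro body still has to be decoded separately.

```python
import json
import struct
import urllib.request


def schema_id_from_raw(raw: bytes) -> int:
    """Read the schema ID from a Confluent-framed Avro message (magic byte 0 + 4-byte big-endian ID)."""
    if len(raw) < 5:
        raise ValueError("message too short")
    magic, schema_id = struct.unpack(">bI", raw[:5])
    if magic != 0:
        raise ValueError(f"not Confluent-framed Avro (magic byte {magic})")
    return schema_id


def schema_url(registry_url: str, schema_id: int) -> str:
    return f"{registry_url.rstrip('/')}/schemas/ids/{schema_id}"


def fetch_schema(registry_url: str, schema_id: int) -> str:
    """Fetch the schema string for an ID from the Schema Registry REST API (network call)."""
    with urllib.request.urlopen(schema_url(registry_url, schema_id)) as resp:
        return json.loads(resp.read().decode("utf-8"))["schema"]
```

Usage would look like `sid = schema_id_from_raw(msg.value())` followed by `fetch_schema("http://localhost:8081", sid)`; caching the result per ID avoids one HTTP request per message.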