confluent-kafka-python
Does Confluent kafka python lib support different registry?
Description
Versions: Python 3.8.8
Does the Confluent Kafka Python library support schema registries other than Confluent Schema Registry? We plan to use Apicurio Registry (Red Hat Integration Service Registry):
https://www.apicur.io/registry/docs/apicurio-registry/2.0.0.Final/index.html
https://developers.redhat.com/blog/2019/12/17/replacing-confluent-schema-registry-with-red-hat-integration-service-registry/
How to reproduce
================
Sample code:
import asyncio
from threading import Thread

from confluent_kafka import KafkaException, SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import StringSerializer


class ProtoBufAIOProducer:
    def __init__(self, configs, value_ser=None, loop=None):
        # value_ser is the generated Protobuf message class for the value.
        self._loop = loop or asyncio.get_event_loop()
        # self._producer = AvroProducer(configs, default_key_schema=schema_key, default_value_schema=schema_value)
        schema_registry_client = SchemaRegistryClient({
            'url': configs['schema.registry.url'],
            'ssl.ca.location': configs['ssl.ca.location'],
        })
        protobuf_serializer = ProtobufSerializer(value_ser, schema_registry_client)
        self.producer_conf = {
            'bootstrap.servers': configs['bootstrap.servers'],
            'key.serializer': StringSerializer('utf_8'),
            'value.serializer': protobuf_serializer,
        }
        self._producer = SerializingProducer(self.producer_conf)
        self._cancelled = False
        # Background thread that serves delivery callbacks.
        self._poll_thread = Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, key, value, on_delivery):
        """
        A produce method in which delivery notifications are made available
        via both the returned future and on_delivery callback (if specified).
        """
        result = self._loop.create_future()

        def ack(err, msg):
            # Runs on the thread that serves callbacks, so results are
            # handed back to the event loop thread-safely.
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(
                    result.set_result, msg)
            if on_delivery:
                self._loop.call_soon_threadsafe(
                    on_delivery, err, msg)

        # Serialization (and schema registration) happens inside produce().
        self._producer.produce(topic=topic, key=key, value=value, on_delivery=ack)
        # Note: flush() blocks until outstanding messages are delivered.
        self._producer.flush()
        return result
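For anyone reading the sample without a broker at hand, the future-plus-callback pattern it relies on can be tried in isolation. The following is only a minimal stdlib sketch of that pattern: `fake_send` is a hypothetical stand-in for the producer, and `RuntimeError` stands in for `KafkaException`; neither is part of confluent-kafka-python.

```python
import asyncio
from threading import Thread


def fake_send(value, on_delivery):
    """Hypothetical stand-in for a producer: reports delivery from another thread."""
    def worker():
        # Mimic a delivery report (err, msg), with err set to None on success.
        if value is None:
            on_delivery("empty value", None)
        else:
            on_delivery(None, value)
    Thread(target=worker).start()


def produce(loop, value):
    """Return a future that is resolved on the event loop's own thread."""
    result = loop.create_future()

    def ack(err, msg):
        # asyncio futures are not thread-safe, so the worker thread hands
        # the outcome back to the loop instead of setting it directly.
        if err:
            loop.call_soon_threadsafe(result.set_exception, RuntimeError(err))
        else:
            loop.call_soon_threadsafe(result.set_result, msg)

    fake_send(value, ack)
    return result


async def main():
    loop = asyncio.get_running_loop()
    delivered = await produce(loop, "hello")
    try:
        await produce(loop, None)
        failed = None
    except RuntimeError as exc:
        failed = str(exc)
    return delivered, failed


outcome = asyncio.run(main())
```

The error reported below is raised during serialization, before any delivery callback fires, so this part of the sample is probably not related to the failure.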
Checklist
Please provide the following information:
- [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.6.1
- [x] Apache Kafka broker version: 2.7.0 (Commit: 448719dc99a19793)
- [x] Client configuration:
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081/api/ccompat
cdc.input.topic=pbf-meal
group.id=pbf-meal
- [x] Operating system: macOS Big Sur
- [x] Provide client logs (with 'debug': '..' as necessary)
Traceback (most recent call last):
File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce
value = self._value_serializer(value, ctx)
File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/protobuf.py", line 320, in __call__
self._schema_id = self._registry.register_schema(subject,
File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 336, in register_schema
response = self._rest_client.post(
File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 127, in post
return self.send_request(url, method='POST', body=body)
File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 179, in send_request
raise SchemaRegistryError(response.status_code,
confluent_kafka.schema_registry.error.SchemaRegistryError: Unknown Schema Registry Error: b'Unrecognized field "references" (class io.apicurio.registry.ccompat.dto.SchemaInfo), not marked as ignorable' (HTTP status code 400, SR code -1)
- [ ] Provide broker log excerpts
- [ ] Critical issue
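The 400 in the traceback reads like a strict JSON deserializer on the registry side rejecting a field it does not declare. As a rough illustration only, the sketch below assumes the registration request body carries a `references` key (inferred from the error message) and models the server DTO with a hypothetical `KNOWN_FIELDS` set; the schema string and both parse helpers are placeholders, not code from either project.

```python
import json

# Assumed shape of the registration request body. The "references" key is
# inferred from the error message; the schema itself is a placeholder.
request_body = json.dumps({
    "schema": 'syntax = "proto3"; message Meal { string name = 1; }',
    "schemaType": "PROTOBUF",
    "references": [],
})

# Hypothetical field set of a server-side DTO that does not know "references".
KNOWN_FIELDS = {"schema", "schemaType"}


def strict_parse(body):
    """Reject any field the DTO does not declare (mimics the 400 response)."""
    data = json.loads(body)
    unknown = sorted(set(data) - KNOWN_FIELDS)
    if unknown:
        raise ValueError('Unrecognized field "%s"' % unknown[0])
    return data


def lenient_parse(body):
    """Ignore undeclared fields instead of failing."""
    data = json.loads(body)
    return {k: v for k, v in data.items() if k in KNOWN_FIELDS}


try:
    strict_parse(request_body)
    strict_error = None
except ValueError as exc:
    strict_error = str(exc)

accepted = lenient_parse(request_body)
```

If that reading is right, the request would only go through once the registry ignores or accepts the extra field, or the client stops sending it.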
Hi @Akhilj786, thanks for asking. We don't support Schema Registry schema references yet, but there is already a PR for it.
For the other registry you mentioned, we haven't looked at that yet.
The question posed has been answered, so I am closing this issue. Please raise another issue for any follow-up questions.