confluent-kafka-python

Does the Confluent Kafka Python library support a different schema registry?

Open Akhilj786 opened this issue 4 years ago • 1 comments

Description

Versions: python 3.8.8

Does the Confluent Kafka Python library support a schema registry other than Confluent Schema Registry? We plan to use the following registry:

https://www.apicur.io/registry/docs/apicurio-registry/2.0.0.Final/index.html
https://developers.redhat.com/blog/2019/12/17/replacing-confluent-schema-registry-with-red-hat-integration-service-registry/

How to reproduce


Sample code:


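import asyncio
from threading import Thread

from confluent_kafka import KafkaException, SerializingProducer
from confluent_kafka.schema_registry import SchemaRegistryClient
from confluent_kafka.schema_registry.protobuf import ProtobufSerializer
from confluent_kafka.serialization import StringSerializer


class ProtoBufAIOProducer:
    def __init__(self, configs, value_ser=None, loop=None):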
        self._loop = loop or asyncio.get_event_loop()
        # self._producer = AvroProducer(configs, default_key_schema=schema_key, default_value_schema=schema_value)
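        # The registry URL points at the registry's Confluent-compatible API.
        schema_registry_client = SchemaRegistryClient({
            'url': configs['schema.registry.url'],
            'ssl.ca.location': configs['ssl.ca.location'],
        })
        # value_ser is the generated Protobuf message class to serialize.
        protobuf_serializer = ProtobufSerializer(value_ser, schema_registry_client)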

        self.producer_conf = {'bootstrap.servers': configs['bootstrap.servers'],
                              'key.serializer': StringSerializer('utf_8'),
                              'value.serializer': protobuf_serializer
                              }

        self._producer = SerializingProducer(self.producer_conf)
        self._cancelled = False
        self._poll_thread = Thread(target=self._poll_loop)
        self._poll_thread.start()

    def _poll_loop(self):
        while not self._cancelled:
            self._producer.poll(0.1)

    def close(self):
        self._cancelled = True
        self._poll_thread.join()

    def produce(self, topic, key, value, on_delivery=None):
        """
        A produce method in which delivery notifications are made available
        via both the returned future and on_delivery callback (if specified).
        """
        result = self._loop.create_future()

        def ack(err, msg):
            if err:
                self._loop.call_soon_threadsafe(
                    result.set_exception, KafkaException(err))
            else:
                self._loop.call_soon_threadsafe(
                    result.set_result, msg)
            if on_delivery:
                self._loop.call_soon_threadsafe(
                    on_delivery, err, msg)

        self._producer.produce(topic=topic, key=key, value=value, on_delivery=ack)
        # No flush() here: it would block the event loop until delivery.
        # The poll thread already serves the delivery callbacks.
        return result
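
The thread-to-event-loop handoff used in `produce` above can be tried without a broker. The following is a minimal sketch of that pattern only: `FakeProducer`, `produce_and_wait`, and `run_demo` are hypothetical names introduced for illustration and are not part of confluent-kafka-python. The fake producer simply fires each delivery callback the next time `poll()` is called from a background thread.

```python
import asyncio
import threading


class FakeProducer:
    """Hypothetical stand-in for a Kafka producer (not a real client)."""

    def __init__(self):
        self._pending = []
        self._lock = threading.Lock()

    def produce(self, value, on_delivery):
        # Queue the message; the callback fires on the next poll().
        with self._lock:
            self._pending.append((value, on_delivery))

    def poll(self):
        # Take the queued messages and report each one as delivered.
        with self._lock:
            pending, self._pending = self._pending, []
        for value, callback in pending:
            callback(None, value)  # (err, msg): err is None on success
        return len(pending)


async def produce_and_wait(producer, value):
    loop = asyncio.get_running_loop()
    result = loop.create_future()

    def ack(err, msg):
        # Runs on the polling thread, so the future must be resolved
        # through call_soon_threadsafe rather than directly.
        if err:
            loop.call_soon_threadsafe(result.set_exception, RuntimeError(err))
        else:
            loop.call_soon_threadsafe(result.set_result, msg)

    producer.produce(value, ack)
    return await result


def run_demo():
    producer = FakeProducer()
    stop = threading.Event()

    def poll_loop():
        # Background thread that serves delivery callbacks.
        while not stop.is_set():
            producer.poll()
            stop.wait(0.01)

    thread = threading.Thread(target=poll_loop)
    thread.start()
    try:
        return asyncio.run(produce_and_wait(producer, "hello"))
    finally:
        stop.set()
        thread.join()
```

Calling `run_demo()` resolves the future from the polling thread while the event loop stays free to run other coroutines, which is the reason for using a separate poll thread in the first place.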

Checklist

Please provide the following information:

  • [x] confluent-kafka-python and librdkafka version (confluent_kafka.version() and confluent_kafka.libversion()): 1.6.1
  • [x] Apache Kafka broker version: 2.7.0 (Commit:448719dc99a19793)
  • [x] Client configuration:
bootstrap.servers=localhost:9092
schema.registry.url=http://localhost:8081/api/ccompat
cdc.input.topic=pbf-meal
group.id=pbf-meal 
  • [x] Operating system: macOS Big Sur
  • [x] Provide client logs (with 'debug': '..' as necessary)
Traceback (most recent call last):
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/serializing_producer.py", line 172, in produce
   value = self._value_serializer(value, ctx)
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/protobuf.py", line 320, in __call__
   self._schema_id = self._registry.register_schema(subject,
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 336, in register_schema
   response = self._rest_client.post(
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 127, in post
   return self.send_request(url, method='POST', body=body)
 File "/opt/anaconda3/envs/fast_ksql_ccloud/lib/python3.8/site-packages/confluent_kafka/schema_registry/schema_registry_client.py", line 179, in send_request
   raise SchemaRegistryError(response.status_code,
confluent_kafka.schema_registry.error.SchemaRegistryError: Unknown Schema Registry Error: b'Unrecognized field "references" (class io.apicurio.registry.ccompat.dto.SchemaInfo), not marked as ignorable' (HTTP status code 400, SR code -1)
  • [ ] Provide broker log excerpts
  • [ ] Critical issue

Akhilj786, May 11 '21 21:05

Hi @Akhilj786, thanks for asking. We don't support schema references in the schema registry client yet, but there is already a PR for it.

For the other registry you mentioned, we haven't looked at that yet.
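
To make the 400 error in the traceback concrete: the registry rejected a `references` field in the registration request. The sketch below models that situation under stated assumptions. The body shape is inferred from the error message, not taken from the client source, and `build_register_body` and `strip_references` are hypothetical helpers that do not exist in confluent-kafka-python. Whether the registry accepts the stripped body has not been verified here.

```python
import json


def build_register_body(schema_str, schema_type="PROTOBUF", references=()):
    """Assumed shape of the JSON body sent when registering a schema."""
    body = {"schema": schema_str}
    if schema_type != "AVRO" or references:
        body["schemaType"] = schema_type
        # Assumption: an empty list is still sent for non-Avro schemas,
        # which is the field the registry reported as unrecognized.
        body["references"] = [dict(ref) for ref in references]
    return body


def strip_references(body):
    """Hypothetical workaround: drop 'references' only when it is empty."""
    if body.get("references"):
        raise ValueError("schema has real references; they cannot be dropped")
    return {key: value for key, value in body.items() if key != "references"}


if __name__ == "__main__":
    body = build_register_body('syntax = "proto3"; message Meal {}')
    print(json.dumps(strip_references(body), indent=2))
```

A schema that really does import other registered schemas needs reference support on both the client and the registry, so dropping the field is only meaningful for the empty case shown here.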

jliunyu, Mar 24 '22 06:03

The question has been answered, so I am closing this issue. Please open a new issue for any follow-up questions.

nhaq-confluent, Mar 06 '24 23:03