[FEATURE] Implement SchemaRegistry

Open eolivelli opened this issue 4 years ago • 6 comments

Is your feature request related to a problem? Please describe.
When running KOP, there is no Schema Registry.

Using a third-party schema registry is possible, but it won't support two critical features:

  • security: authentication/authorization
  • multi-tenant: you cannot isolate data between your tenants

Describe the solution you'd like
I would like KOP to support a SchemaRegistry compatible with the most common Kafka Schema Registry, especially to support AVRO users.

Integrating this with the Pulsar Schema registry would be super helpful, but I am not sure it is possible, and I am not sure it is worth the effort.

eolivelli avatar Sep 30 '21 06:09 eolivelli

I've tried it a long time ago. It's nearly impossible because Kafka uses a globally unique integer schema id, which Pulsar doesn't support.

BewareMyPower avatar Sep 30 '21 07:09 BewareMyPower

"I've tried it a long time ago."

Do you mean using the Pulsar Schema registry? I guess so.

But we can implement a REST endpoint that mimics the API and stores its data in a Pulsar topic under the __kafka namespace of the tenant. I haven't looked at the Registry API yet.
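A minimal sketch of that idea, under stated assumptions: the class name `TenantSchemaStore` and its methods are hypothetical, ids are unique per tenant rather than globally, and state is kept in memory. A real implementation would persist each registration to a Pulsar topic in the tenant's __kafka namespace and replay it on startup.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch: a per-tenant schema store that hands out integer ids.
// In-memory only; persistence to a Pulsar topic is deliberately left out.
class TenantSchemaStore {
    // tenant -> (schema id -> schema text)
    private final Map<String, Map<Integer, String>> schemasById = new HashMap<>();
    // tenant -> (schema text -> schema id)
    private final Map<String, Map<String, Integer>> idsBySchema = new HashMap<>();

    // Registers a schema for a tenant and returns its integer id.
    // Registering the same schema text again returns the existing id.
    synchronized int register(String tenant, String schema) {
        Map<String, Integer> ids = idsBySchema.computeIfAbsent(tenant, t -> new HashMap<>());
        Integer existing = ids.get(schema);
        if (existing != null) {
            return existing;
        }
        int id = ids.size() + 1; // ids start at 1 within each tenant
        ids.put(schema, id);
        schemasById.computeIfAbsent(tenant, t -> new HashMap<>()).put(id, schema);
        return id;
    }

    // Would back a GET /schemas/ids/{id} style lookup.
    // Returns null when the id is unknown for that tenant.
    synchronized String getById(String tenant, int id) {
        Map<Integer, String> byId = schemasById.get(tenant);
        return byId == null ? null : byId.get(id);
    }
}
```

Because every lookup is keyed by tenant first, one tenant can never read a schema registered by another, which is the isolation a third-party registry would not give us.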

I will be happy to work on this in the short term.

eolivelli avatar Sep 30 '21 08:09 eolivelli

Take a look at KafkaAvroDeserializer#deserialize: one important step sends a REST request to fetch the schema string. See RestService#getId in the Confluent Schema Registry project:

  public SchemaString getId(Map<String, String> requestProperties,
                            int id) throws IOException, RestClientException {
    String path = String.format("/schemas/ids/%d", id);

    SchemaString response = httpRequest(path, "GET", null, requestProperties,
                                        GET_SCHEMA_BY_ID_RESPONSE_TYPE);
    return response;
  }

However, it provides an integer schema id while there's no way to find a schema string by an integer id in Pulsar.
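For context, the integer id travels inside each record: Confluent's serializers prefix the payload with a magic byte (0) followed by the schema id as a 4-byte big-endian integer. The sketch below only illustrates that framing; the class name `ConfluentWireFormat` and the method `schemaId` are hypothetical, not part of any library.

```java
import java.nio.ByteBuffer;

// Hypothetical helper that reads the schema id from a record value
// framed as: [magic byte 0][4-byte big-endian schema id][payload].
class ConfluentWireFormat {
    static final byte MAGIC_BYTE = 0x0;

    static int schemaId(byte[] value) {
        if (value == null || value.length < 5) {
            throw new IllegalArgumentException("Value too short to contain a schema id");
        }
        ByteBuffer buffer = ByteBuffer.wrap(value); // ByteBuffer is big-endian by default
        if (buffer.get() != MAGIC_BYTE) {
            throw new IllegalArgumentException("Unknown magic byte");
        }
        return buffer.getInt();
    }
}
```

This is the id the deserializer then passes to the `/schemas/ids/{id}` request, so whatever serves that endpoint has to resolve an integer to a schema string.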

BewareMyPower avatar Sep 30 '21 09:09 BewareMyPower

We can implement such an API in KOP as a separate endpoint (a new Channel Initializer).

For multitenancy and security we can leverage the username, as we are doing now. The username is the name of the tenant.

eolivelli avatar Sep 30 '21 09:09 eolivelli

Oh, you're right. In this case, Kafka client should configure our provided serializer/deserializer.

My previous work on schema support aimed to reuse the serializer/deserializer provided by Confluent, so the situation was a little different.

BewareMyPower avatar Sep 30 '21 09:09 BewareMyPower

"Kafka client should configure our provided serializer/deserializer"

Why? Doesn't the Confluent SerDe support authentication? Then we can tell users that they must pass the tenant name as the username, as they do with username/password auth.

Then we can map the requests correctly to a topic inside the __kafka namespace for the tenant.

If there is no auth, then we go to the default tenant.
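A rough sketch of that mapping, assuming the client sends HTTP Basic credentials with the tenant name as the username. The class `TenantResolver`, the default tenant `public`, and the topic name `__schema-registry` are all assumptions made for illustration, not existing KOP names.

```java
import java.nio.charset.StandardCharsets;
import java.util.Base64;

// Hypothetical sketch: derive the tenant from an HTTP Authorization header
// and map it to a schema storage topic in that tenant's __kafka namespace.
class TenantResolver {
    static final String DEFAULT_TENANT = "public"; // assumed default tenant

    // Returns the username from a "Basic <base64(user:password)>" header,
    // or the default tenant when no usable credentials are present.
    static String tenantFromAuthorization(String header) {
        if (header == null || !header.startsWith("Basic ")) {
            return DEFAULT_TENANT;
        }
        byte[] raw = Base64.getDecoder().decode(header.substring("Basic ".length()).trim());
        String decoded = new String(raw, StandardCharsets.UTF_8);
        int colon = decoded.indexOf(':');
        String user = colon >= 0 ? decoded.substring(0, colon) : decoded;
        return user.isEmpty() ? DEFAULT_TENANT : user;
    }

    // Topic name is illustrative only; the real name would be a design decision.
    static String schemaTopic(String tenant) {
        return "persistent://" + tenant + "/__kafka/__schema-registry";
    }
}
```

The password would still need to be verified by the same authentication provider KOP already uses; this sketch only shows how the username selects the tenant and its topic.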

eolivelli avatar Oct 01 '21 13:10 eolivelli