[FEATURE] Implement SchemaRegistry
Is your feature request related to a problem? Please describe.
When running KOP there is no Schema Registry available.
Using a third-party schema registry is possible, but it won't support two critical features:
- security: authentication/authorization
- multi-tenant: you cannot isolate data between your tenants
Describe the solution you'd like
I would like to see KOP support a Schema Registry compatible with the common Kafka Schema Registry API, especially to support AVRO users.
Integrating this with the Pulsar Schema registry would be super helpful, but I am not sure it is possible, and I am not sure if it is worth it.
I've tried it a long time ago. It's nearly impossible because Kafka uses a globally unique schema id (an integer) while Pulsar doesn't support that.
"I've tried it a long time ago."
Do you mean using the Pulsar Schema registry? I guess so.
But we can implement a REST endpoint that mimics the API and stores data in some Pulsar topic under __kafka namespace of the tenant.
I haven't looked at the API for the Registry.
I will be happy to work on this topic in the short term
You can take a look at KafkaAvroDeserializer#deserialize; there's an important step where a REST request is sent to fetch the schema string. See RestService#getId in the Confluent schema registry project:
public SchemaString getId(Map<String, String> requestProperties,
                          int id) throws IOException, RestClientException {
    String path = String.format("/schemas/ids/%d", id);
    SchemaString response = httpRequest(path, "GET", null, requestProperties,
            GET_SCHEMA_BY_ID_RESPONSE_TYPE);
    return response;
}
However, it provides an integer schema id while there's no way to find a schema string by an integer id in Pulsar.
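For context on where that integer comes from: Confluent's serializers prepend a 5-byte header to every record, a magic byte (0) followed by a big-endian 4-byte schema id, and the deserializer resolves that id via the REST call above. A minimal sketch of the client-side decoding (class and method names are illustrative):

```java
import java.nio.ByteBuffer;

public class ConfluentWireFormat {
    // Confluent wire format: [magic byte 0][4-byte big-endian schema id][payload].
    // The deserializer reads the id from this header, then issues
    // GET /schemas/ids/{id} against the registry to fetch the schema string.
    public static int extractSchemaId(byte[] record) {
        ByteBuffer buf = ByteBuffer.wrap(record);
        byte magic = buf.get();
        if (magic != 0x0) {
            throw new IllegalArgumentException("Unknown magic byte: " + magic);
        }
        return buf.getInt();
    }
}
```

This is why any KOP-side registry has to hand out integer ids: they are baked into every serialized record.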
We can implement such an API in KOP as a separate endpoint (a new Channel Initializer).
For multitenancy and security we can leverage the username, as we are doing now: the username is the name of the tenant.
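Such an endpoint would mostly be HTTP framing plus routing of the Confluent-style paths shown above. A sketch of the routing piece (the class name and -1 convention are only illustrative):

```java
public class SchemaRegistryRoutes {
    private static final String SCHEMA_BY_ID_PREFIX = "/schemas/ids/";

    // Extract the schema id from a Confluent-style request path such as
    // "/schemas/ids/42". Returns -1 for any path that doesn't match,
    // so the caller can answer 404.
    public static int parseSchemaId(String path) {
        if (path == null || !path.startsWith(SCHEMA_BY_ID_PREFIX)) {
            return -1;
        }
        try {
            return Integer.parseInt(path.substring(SCHEMA_BY_ID_PREFIX.length()));
        } catch (NumberFormatException e) {
            return -1;
        }
    }
}
```

In a Netty-based implementation this would sit in a handler behind an HttpServerCodec in the new ChannelInitializer's pipeline.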
Oh, you're right. In that case the Kafka client would have to be configured with our provided serializer/deserializer.
The background of my previous attempt at schema support was to reuse Confluent's provided serializer/deserializer, so it's a little different.
"Kafka client should configure our provided serializer/deserializer"
Why? Doesn't the Confluent SerDe support authentication?
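For what it's worth, the stock Confluent SerDe only needs schema.registry.url plus the usual auth settings: SASL for the broker connection, and HTTP basic auth for the registry client. A hypothetical client configuration against a KOP-hosted registry (hostname, port, and credentials are made up, not KOP defaults):

```java
import java.util.Properties;

public class KopSchemaRegistryClientConfig {
    public static Properties consumerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", "broker.example.com:9092");
        // Confluent's stock deserializer, pointed at a KOP-hosted endpoint.
        props.put("value.deserializer",
                "io.confluent.kafka.serializers.KafkaAvroDeserializer");
        props.put("schema.registry.url", "http://broker.example.com:8001");
        // The tenant travels in the SASL/PLAIN username, as KOP already expects.
        props.put("security.protocol", "SASL_PLAINTEXT");
        props.put("sasl.mechanism", "PLAIN");
        props.put("sasl.jaas.config",
                "org.apache.kafka.common.security.plain.PlainLoginModule required "
                        + "username=\"my-tenant\" password=\"token:xxx\";");
        // The registry client itself supports HTTP basic auth, so the tenant
        // name could be carried on the REST side as well.
        props.put("basic.auth.credentials.source", "USER_INFO");
        props.put("basic.auth.user.info", "my-tenant:token-xxx");
        return props;
    }
}
```

So an unmodified Confluent client can authenticate; the open question is only how KOP maps those credentials to a tenant.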
Then we can tell users that they must pass the tenant name as the username, as they do with username/password auth.
Then we can map the requests correctly to a topic inside the __kafka namespace for the tenant.
If there is no auth, we fall back to the default tenant.
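Putting that mapping into code, a minimal sketch; the fallback tenant name and the storage topic name here are assumptions for illustration, not existing KOP names:

```java
public class SchemaRegistryTenantMapper {
    private static final String DEFAULT_TENANT = "public"; // assumed fallback tenant
    private static final String NAMESPACE = "__kafka";

    // Resolve the Pulsar topic that backs schema storage for a request,
    // using the authenticated username as the tenant. Unauthenticated
    // connections fall back to the default tenant.
    public static String schemaStorageTopic(String username) {
        String tenant = (username == null || username.isEmpty())
                ? DEFAULT_TENANT : username;
        return "persistent://" + tenant + "/" + NAMESPACE + "/__schema-registry";
    }
}
```

Because the topic lives under the tenant's own namespace, Pulsar's existing authorization rules keep each tenant's schemas isolated.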