
Key-Value Pair Constraints with Composite Keys

Open slominskir opened this issue 3 years ago • 7 comments

It would be useful to support constraints on key-value pairs with composite keys. This is important since we can define multiple types in the same topic. For example, using the default TopicNameStrategy subject-topic constraint with AVRO, a composite key can be defined like so:

athlete-exercise-key.avsc

{
    "type": "record",
    "name": "AthleteExerciseKey",
    "namespace": "org.test",
    "fields": [
        {
            "name": "AthleteName",
            "type": "string"
        },
        {
            "name": "type",
            "type": {
                "type": "enum",
                "name": "ExerciseType",
                "symbols": [
                    "Cardio",
                    "Weights"
                ]
            }
        }
    ]
}

athlete-exercise-value.avsc

{
    "type": "record",
    "name": "AthleteExerciseValue",
    "namespace": "org.test",
    "fields": [
        {
            "name": "union",
            "type": [
                {
                    "type": "record",
                    "name": "CardioExerciseType",
                    "fields": [ ]
                },
                {
                    "type": "record",
                    "name": "WeightsExerciseType",
                    "fields": [ ]
                }
            ]
        }
    ]
}

In this example, it would be possible to produce a message with a mismatched key-value pair - for example a key indicating "Cardio", but a value indicating "Weights". If separate topics were used, the schema-to-message constraint would prevent the mismatch.
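To make the gap concrete, here is a minimal producer sketch (hypothetical application code, assuming SpecificRecord classes generated from the two schemas above, Confluent's KafkaAvroSerializer for both key and value, and placeholder topic/broker/registry addresses). Each half validates against its own subject, so nothing rejects the mismatched pair:

import java.util.Properties;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerRecord;
import org.test.AthleteExerciseKey;
import org.test.AthleteExerciseValue;
import org.test.ExerciseType;
import org.test.WeightsExerciseType;

public class MismatchedPairDemo {
    public static void main(String[] args) {
        Properties props = new Properties();
        props.put("bootstrap.servers", "localhost:9092");          // placeholder broker
        props.put("schema.registry.url", "http://localhost:8081"); // placeholder registry
        props.put("key.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");
        props.put("value.serializer", "io.confluent.kafka.serializers.KafkaAvroSerializer");

        // Key says Cardio...
        AthleteExerciseKey key = AthleteExerciseKey.newBuilder()
                .setAthleteName("Alice")
                .setType(ExerciseType.Cardio)
                .build();

        // ...but the value's union branch is the Weights record.
        AthleteExerciseValue value = AthleteExerciseValue.newBuilder()
                .setUnion(new WeightsExerciseType())
                .build();

        // Key and value each validate against their own subject schema
        // (athlete-exercise-key / athlete-exercise-value under TopicNameStrategy),
        // so neither the serializer nor the registry rejects the mismatched pair.
        try (KafkaProducer<AthleteExerciseKey, AthleteExerciseValue> producer = new KafkaProducer<>(props)) {
            producer.send(new ProducerRecord<>("athlete-exercise", key, value));
        }
    }
}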

slominskir · Apr 19 '21 13:04

Note: this enhancement may apply mainly to topics with compaction enabled, since compaction operates on the message key. With the delete cleanup policy, simply dropping the type from the key would probably be reasonable. My understanding is that partitioning is determined by the producer and does not require use of the key, so it would not necessarily be a factor.

Also note that the union in the value subject schema example above is modeled as a nested field of a record, though perhaps the union should be at the root (using schema references). Since unions are encoded with a type indicator, they actually duplicate the meaning of the type field of the composite key.

slominskir · Apr 19 '21 15:04

This seems like very much application specific business logic... For the given example, why should the serializer be responsible for inspecting and ensuring that the enum string in the key matches the record type in the value?

OneCricketeer · Apr 24 '21 12:04

Given that the Schema Registry is already responsible for, or participates in, enforcing many database-like constraints for Kafka, it seems reasonable to support composite key constraints, which are not an application-specific thing - a composite key is a general database concept. In order to have messages of different types shared on the same topic, and compacted such that only the most recent of each type is kept, you currently must use a composite key that contains a field corresponding to the union type. The fact that an AVRO union serializes a type identifier (even if the union is in the message value) and that Kafka compaction expects keys to be in the Kafka message key are implementation details of Kafka and AVRO, not of some specific application.

It is pretty clear the Kafka community supports the idea of Kafka as a database, with Kafka Streams KTable and KSQL, and since the Schema Registry already supports multiple types in the same topic, composite key constraints seem like the next step.

slominskir · Apr 26 '21 16:04

Kafka is not the database, though, RocksDB is.

Regardless, my response is to your first comment, not the key itself

it would be possible to produce a message with a mismatched key-value pair - for example a key indicating "Cardio", but a value indicating "Weights"

Yes, it would be possible, and you can prevent that from happening with producer or serializer logic, as I don't think the registry would be the one responsible for decoding and inspecting the payloads for application-specific validation rules.
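For example, a small guard in the producing application (a sketch only, reusing the generated classes from the schemas in the description; the class and method names are illustrative) could reject the mismatch before the record is ever sent:

import org.test.AthleteExerciseKey;
import org.test.AthleteExerciseValue;
import org.test.CardioExerciseType;
import org.test.ExerciseType;
import org.test.WeightsExerciseType;

public class KeyValueGuard {

    // Throws if the key's ExerciseType does not correspond to the value's union branch.
    // Call this immediately before producer.send(...).
    public static void validatePair(AthleteExerciseKey key, AthleteExerciseValue value) {
        Object branch = value.getUnion();
        boolean matches =
                (key.getType() == ExerciseType.Cardio && branch instanceof CardioExerciseType)
             || (key.getType() == ExerciseType.Weights && branch instanceof WeightsExerciseType);
        if (!matches) {
            throw new IllegalArgumentException("Key type " + key.getType()
                    + " does not match value union branch " + branch.getClass().getSimpleName());
        }
    }
}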

OneCricketeer · Apr 26 '21 20:04

Kafka is not the database, though, RocksDB is.

Kafka is the database - it is the persistent, permanent store and the source of truth when using the compact cleanup policy [1], [2], [3]. RocksDB is the default cache provider used by the Kafka Streams KTable (caching potentially large data in memory simply isn't scalable).

Regardless, my response is to your first comment, not the key itself

My example is just that - an example to aid in understanding. I'm not actually creating a fitness app; I just made that up (I'm actually working on an industrial alarm system), and a composite key constraint should apply generally.

It is true you can handle this constraint in your application, but it is also true you can handle everything in your application and/or in build/install tools and cut out the Schema Registry completely. In fact, there is a strong argument to be made that, since many applications need to know the schema a priori to create classes, validation and constraints on a schema are more appropriately enforced at compile/install time, with schemas tied to a versioned artifact (a library that contains the schema and the classes generated from it). The "registry" in that case is the artifact repos (Maven Central, PyPI, etc.). In practice it seems you often end up with both. I believe the Schema Registry's purpose is to store/enforce/manage constraints on data (at runtime) - and that is what this enhancement is about.

slominskir · Apr 27 '21 16:04

Related: https://github.com/confluentinc/ksql/issues/5091.

slominskir · Aug 16 '21 15:08

One workaround here could be to use two topics and Kafka Streams:

topic 1 (key = null, value = compound which includes AthleteName, type, AthleteExerciseValue)

^ This topic's value schema can make use of a union so that one is forced to supply CardioExerciseType if type = Cardio, etc.

and then use Kafka Streams to pipe that into:

topic 2 (key = uses key schema from this issue's description, value = uses value schema from this issue's description)

Provided writes to topic 2 occur only via Kafka Streams (i.e. provided all the data flows in via topic 1), you'd wind up with the key/value constraints you want on the data.
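A rough sketch of that re-keying topology (illustrative only - it assumes Avro SerDes are configured as the Streams defaults and uses a hypothetical AthleteExercise class for the compound value on topic 1; topic and class names are placeholders):

import org.apache.kafka.streams.KeyValue;
import org.apache.kafka.streams.StreamsBuilder;
import org.test.AthleteExercise;      // hypothetical compound value record on topic 1
import org.test.AthleteExerciseKey;
import org.test.AthleteExerciseValue;

public class RekeyTopology {
    public static StreamsBuilder build() {
        StreamsBuilder builder = new StreamsBuilder();
        builder.<Void, AthleteExercise>stream("athlete-exercise-input")   // topic 1: key = null
               .map((nullKey, v) -> KeyValue.pair(
                       // Derive the composite key from the value itself, so the key
                       // can never disagree with the value about the exercise type.
                       AthleteExerciseKey.newBuilder()
                               .setAthleteName(v.getAthleteName())
                               .setType(v.getType())
                               .build(),
                       v.getExerciseValue()))                             // the embedded AthleteExerciseValue
               .to("athlete-exercise");                                   // topic 2: compacted, composite key
        return builder;
    }
}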

Note: I say Kafka Streams and not KSQL as, last time I checked, KSQL doesn't play nice with union types.

benissimo · Sep 16 '21 14:09