
SNIP 42: Kafka Avro Serializer and Deserializer

Open BewareMyPower opened this issue 3 years ago • 27 comments

Table of Contents

  • Motivation
  • Approach
    • API Design
    • Schema Registry Client
    • Schema Version Header
      • Serializer
      • Deserializer
    • Interaction with Pulsar clients
      • Kafka Producer to Pulsar Consumer
      • Pulsar Producer to Kafka Consumer
    • Overhead Analysis
  • Documentation Changes
  • Test Plan

Motivation

KoP supports producing and consuming via Kafka clients. The Kafka API differs from the Pulsar API. Let's focus on the producer first. Using a Kafka producer to produce messages requires configuring a key serializer and a value serializer.

final Properties props = new Properties();
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
final KafkaProducer<String, String> producer = new KafkaProducer<>(props);
// Here we can just use ProducerRecord<>, I added the generic types here
// to indicate it accepts two generic parameters.
producer.send(new ProducerRecord<String, String>("my-topic", "key", "value")).get();

In the code example above, two StringSerializer objects are created inside the KafkaProducer, and they are responsible for serializing the key and value into bytes. See the serialize method in the Serializer interface.

public interface Serializer<T> extends Closeable {

    byte[] serialize(String topic, T data);

    /* ... */
}

StringSerializer implements Serializer<String>, so the String value in a ProducerRecord can be serialized to bytes.

Similarly, the Kafka consumer requires a Deserializer implementation to decode the bytes into the generic type.

public interface Deserializer<T> extends Closeable {

    T deserialize(String topic, byte[] data);

    /* ... */
}

As we can see, Kafka already provides default serializers and deserializers for the primitive types. But for generic types like Object or Avro's GenericRecord/SpecificRecord, Kafka users have to use Confluent's Avro serializer and deserializer (SerDes for short). Confluent's Avro SerDes leverages a default method that exists in both interfaces,

    default void configure(Map<String, ?> configs, boolean isKey) {
        // intentionally left blank
    }

which allows users to configure the URL of the Schema Registry, which is also provided by Confluent. The SerDes is then responsible for registering or fetching schemas on the Confluent Schema Registry.

To support users who need Avro schemas, there are two options:

  1. Implement the same REST APIs of Confluent Schema Registry.
  2. Implement our own Avro SerDes.

We have discussed these two solutions before. In summary, here is the comparison from my perspective.

For the 1st solution:

  • PROs
    • No changes needed for Kafka clients of all languages that support communication with Confluent Schema Registry.
  • CONs
    • We must use the Confluent REST API to manage these schemas. It means we need to implement or reuse the admin tools and learn these admin APIs.
    • Confluent's schema has a unique integer ID and can be shared by multiple topics, so it cannot be mapped to a Pulsar schema. We would need to store the additional Confluent schema metadata somewhere (like system topics or ZooKeeper).

For the 2nd solution:

  • PROs
    • We can reuse the Pulsar schema.
    • The same topic could be accessed by both Pulsar clients and Kafka clients.
    • It's much easier than the 1st solution.
  • CONs
    • We need to develop the SerDes for Kafka clients of all languages.
    • For those applications that already use Confluent's Avro SerDes, we need to change the dependency to our own SerDes.

This proposal focuses on the 2nd solution and on the SerDes for Java clients.

Approach

API Design

Just use Object as the generic parameter to be consistent with the Confluent Avro SerDes.

package io.streamnative.kafka.serializers;

public class KafkaAvroSerializer implements Serializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        /* ... */
    }

    @Override
    public byte[] serialize(String topic, Object value) {
        /* ... */
    }
}

package io.streamnative.kafka.serializers;

public class KafkaAvroDeserializer implements Deserializer<Object> {

    @Override
    public void configure(Map<String, ?> configs, boolean isKey) {
        /* ... */
    }

    @Override
    public Object deserialize(String topic, byte[] bytes) {
        /* ... */
    }
}

Schema Registry Client

We need to register the schema when serializing and fetch the schema when deserializing. We could use Pulsar's REST API directly, but for simplicity the initial implementation can leverage the Schemas class from the pulsar-client-admin dependency.

Then a SchemaRegistryClient is responsible for these admin operations.

Register schema (in serializer):

// It should be noted that the side effect of this method is counter-intuitive.
// 1. It tries to update the schema if the schema exists.
// 2. No exception would be thrown if it failed.
// 3. It doesn't return a schema version.
void createSchema(String topic, SchemaInfo schemaInfo) throws PulsarAdminException;

// Get the schema version after `createSchema` is called.
// NOTE: We can also compare the schema JSON to see if the `createSchema` has
//  updated the schema. However, to be consistent with Pulsar client's behavior,
//  even if the producer's schema is incompatible with the topic's schema, the
//  creation won't fail and messages of older schemas can be sent.
SchemaInfoWithVersion getSchemaInfoWithVersion(String topic) throws PulsarAdminException;

Get schema (in deserializer):

// Note that if there's no schema associated with the version, this method
// returns null. In this case, fall back to the other overload to get the
// latest schema.
SchemaInfo getSchemaInfo(String topic, long version) throws PulsarAdminException;

// Use this method if there is no schema version (*) or as a fallback.
// (*) See the next section for when to call this method.
SchemaInfo getSchemaInfo(String topic) throws PulsarAdminException;

However, if these operations were performed each time a message arrived, performance would degrade sharply. To avoid that, we can maintain a cache in the Schema Registry Client.

// topic -> { schema -> schema version }
private final Map<String, Map<Schema, Long>> versionCache = new ConcurrentHashMap<>();
// topic -> { schema version -> schema }
private final Map<String, Map<Long, Schema>> schemaCache = new ConcurrentHashMap<>();
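
A minimal sketch of how the version cache could avoid repeated admin calls. It assumes a hypothetical `remoteRegister` callback that stands in for the `createSchema` plus `getSchemaInfoWithVersion` round trip, and it represents schemas as plain strings for brevity. The class name `CachedVersionLookup` is also an assumption, not part of the proposal.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.BiFunction;

final class CachedVersionLookup {
    // topic -> { schema definition -> schema version }
    private final Map<String, Map<String, Long>> versionCache = new ConcurrentHashMap<>();
    // Hypothetical stand-in for the remote registration call: (topic, schema) -> version.
    private final BiFunction<String, String, Long> remoteRegister;

    CachedVersionLookup(BiFunction<String, String, Long> remoteRegister) {
        this.remoteRegister = remoteRegister;
    }

    // Calls the remote side only the first time a (topic, schema) pair is seen.
    long getVersion(String topic, String schemaDefinition) {
        return versionCache
                .computeIfAbsent(topic, t -> new ConcurrentHashMap<>())
                .computeIfAbsent(schemaDefinition, s -> remoteRegister.apply(topic, s));
    }
}
```

A real implementation would probably also bound the cache size, which is the kind of implementation-specific setting the configuration section mentions.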

Schema Version Header

In Pulsar, producers set the schema version in the message metadata.

message MessageMetadata {
    /* ... */
    optional bytes schema_version = 16;
}

Then Message#getValue can read the schema version and fetch the corresponding schema from the broker.

However, for Kafka clients, the message metadata is added on the server side (KoP). There is no way for KoP to know the schema version from Kafka clients without additional information.

In this proposal, the producer adds 10 bytes at the head of each message value.

| MARKER (2 bytes) | Schema Version (8 bytes), example value  |
| :--------------- | :-------------------------------------- |
| 0x03 0x04        | 0x00 0x00 0x00 0x00 0x00 0x00 0x00 0x00 |

The byte array [0x03 0x04] is a fixed marker indicating that the following 8 bytes represent the schema version. We can then use the same encoding as Schema.INT64 to encode or decode it. (To avoid introducing a dependency on pulsar-client, we can port that encoding logic manually.)
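
A minimal sketch of the 10-byte header using only the JDK. It assumes the version is written as an 8-byte big-endian long (the default byte order of `ByteBuffer`), and it uses -1 as a "no header" sentinel. The class and method names here are illustrative assumptions, not part of the proposal.

```java
import java.nio.ByteBuffer;

final class SchemaVersionHeader {
    static final byte[] MARKER = {0x03, 0x04};
    static final int LENGTH = MARKER.length + Long.BYTES; // 2 + 8 = 10 bytes

    // Prepends the marker and the big-endian schema version to the payload.
    static byte[] prepend(long schemaVersion, byte[] payload) {
        return ByteBuffer.allocate(LENGTH + payload.length)
                .put(MARKER)
                .putLong(schemaVersion)
                .put(payload)
                .array();
    }

    // Returns the schema version, or -1 if the value does not start with the marker.
    static long parseVersion(byte[] value) {
        if (value == null || value.length < LENGTH
                || value[0] != MARKER[0] || value[1] != MARKER[1]) {
            return -1L;
        }
        return ByteBuffer.wrap(value, MARKER.length, Long.BYTES).getLong();
    }
}
```

Note that a raw payload that happens to start with 0x03 0x04 would be misread as a header, so a real implementation has to decide how to handle that ambiguity.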

Serializer

  1. Parse the schema from the class of the user provided object.
  2. Register the schema via the Schema Registry Client.
  3. Serialize the Schema Version Header.
  4. Serialize the object to bytes and append after the header.

Deserializer

  1. Parse the schema version from the Schema Version Header.
  2. If the schema version exists, get the schema associated with the version via the Schema Registry Client.
  3. Otherwise, get the latest schema via the Schema Registry Client.
  4. Deserialize the bytes via the schema.
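
The deserializer steps above can be sketched as follows. `SchemaLookup` is a hypothetical stand-in for the Schema Registry Client, schemas are plain strings for brevity, and the actual Avro decoding (step 4) is left out; only the schema selection and the payload offset are shown.

```java
import java.nio.ByteBuffer;

final class SchemaResolver {
    // Hypothetical stand-in for the Schema Registry Client.
    interface SchemaLookup {
        String byVersion(String topic, long version); // may return null
        String latest(String topic);
    }

    private static final int HEADER_LENGTH = 10;

    static boolean hasHeader(byte[] value) {
        return value.length >= HEADER_LENGTH && value[0] == 0x03 && value[1] == 0x04;
    }

    // Steps 1-3: choose the schema for this record value.
    static String resolve(SchemaLookup lookup, String topic, byte[] value) {
        if (hasHeader(value)) {
            long version = ByteBuffer.wrap(value, 2, Long.BYTES).getLong();
            String schema = lookup.byVersion(topic, version);
            if (schema != null) {
                return schema;
            }
        }
        // No header, or no schema for that version: fall back to the latest schema.
        return lookup.latest(topic);
    }

    // Offset where the Avro-encoded bytes start (the input to step 4).
    static int payloadOffset(byte[] value) {
        return hasHeader(value) ? HEADER_LENGTH : 0;
    }
}
```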

Interaction with Pulsar clients

Here we only discuss how to deal with the schema version when entryFormat is not pulsar because when entryFormat=pulsar, the message format conversion must be performed each time.

Kafka Producer to Pulsar Consumer

KafkaPayloadProcessor is a plugin configured in the Pulsar client to convert messages from Kafka format to Pulsar format (Message); see PIP 96 for details.

When the processor converts a Kafka record (MemoryRecords) to a Pulsar message (Message):

  1. Deserialize the bytes and try to get the schema version from the Schema Version Header.
  2. If the schema version exists, set the schema version in message metadata.

Pulsar Producer to Kafka Consumer

When KoP reads entries from the managed cursor:

  1. Get the schema version from the message metadata.
  2. If the schema version exists, serialize it into the Schema Version Header and prepend the header to each entry.

Overhead Analysis

Since KafkaPayloadProcessor runs only in the Pulsar consumer, it doesn't affect the performance of KoP. It may cost some performance when a Pulsar consumer consumes messages from a Kafka producer.

However, KafkaPayloadProcessor already needs to convert each single message (in the batch) from Kafka format to Pulsar format, so the only additional overhead is parsing the first 10 bytes of each record. Here is the current code:

for (Record record : records.records()) {
    // TODO: parse the 10 bytes at the head of record.value(), in
    //  `newByteBufFromRecord`, a MessageMetadata will be created so we can
    //  set the schema version in this method.
    final MessagePayload singlePayload = newByteBufFromRecord(record);

The overhead in KoP occurs when KoP handles a FETCH request from a Kafka consumer. Let's look at the current workflow:

  1. Read some entries from the managed ledger.
  2. Set the offset field in each entry.
  3. Merge these entries into the buffer to client.

We can see that copying bytes cannot be avoided anyway. If the entries were sent by a Pulsar producer, the only additional overhead is copying 10 extra bytes per message.

Documentation Changes

  • Describe how to configure the SerDes.
  • Explain the possible configurations related to SerDes.

For producers, configure the serializer:

props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaAvroSerializer.class);

For consumers, configure the deserializer:

props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, KafkaAvroDeserializer.class);

Currently, we only target the serializer for the value, not for the key.

In addition, we must add a configuration to specify Pulsar's HTTP server URL for both serializer and deserializer.

props.put("schema.registry.url", "http://localhost:8080");

We can also add some optional configurations for SerDes:

// Whether to allow the field to be null
props.put("allow.null", true);
// Some configs specific to the implementation, like the cache size limit...

Test Plan

Add tests for the various Pulsar schema compatibility strategies (Forward/Backward/Full).

Since serialize is only called when KafkaProducer#send is called and deserialize is only called when KafkaConsumer receives a message, we must send and receive at least 1 message.

Take the Forward compatibility strategy as an example.

  1. Create Producer-A with Schema-1 and send at least 1 message.
  2. Validate the schema is Schema-1.
  3. Create a consumer with Schema-1 and receive it.
  4. Create Producer-B with a compatible Schema-2 and send some messages.
  5. Validate the schema is Schema-2.
  6. Send messages via Producer-A. Sending messages of older schemas should be allowed.
  7. Receive all these messages and validate.

The procedure is similar for the other schema compatibility strategies, with some differences in the details.

We should also test the interaction between Pulsar clients and Kafka clients.

BewareMyPower avatar May 20 '22 02:05 BewareMyPower

IIUC in this proposal we are going to provide a Kafka Serializer and a Kafka Deserializer for Kafka Clients. We have to send to the Kafka Client the id of the Schema in the Pulsar Schema Registry.

Currently, if you write AVRO using the Pulsar client and read using the Kafka client, on the Kafka side you receive the raw AVRO-encoded message. Do you want to encode the payload in another way, like adding the id of the Pulsar schema? How do you configure this behaviour? It cannot be a global KoP configuration parameter because it probably depends on the topic. Should we use topic properties to configure the expected encoding while serving fetches?

eolivelli avatar May 20 '22 07:05 eolivelli

we should also support KeyValue<AVRO, AVRO> in order to fully cover the AVRO support in Kafka.

eolivelli avatar May 20 '22 07:05 eolivelli

> Do you want to encode the payload in another way, like adding the id of the Pulsar schema?

Yes. See the Interaction with Pulsar clients - Pulsar Producer to Kafka Consumer section. Add the extra schema version header before the raw bytes from Pulsar producer. Just modify the following code

https://github.com/streamnative/kop/blob/ae29ee300612ea1df5a2efda3ef8c7916b7fc622/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/format/AbstractEntryFormatter.java#L56-L58

like

final byte[] schemaVersion = (metadata.hasSchemaVersion() ? metadata.getSchemaVersion() : null);
if (isKafkaEntryFormat(metadata)) {
    if (schemaVersion != null && schemaVersion.length > 0) {
        // NOTE: just an example
        batchedByteBuf.writeBytes(new byte[]{ 0x03, 0x04 });
        batchedByteBuf.writeBytes(schemaVersion);
    }
    byte batchMagic = byteBuf.getByte(byteBuf.readerIndex() + MAGIC_OFFSET);

> Should we use topic properties to configure the expected encoding while serving fetches?

I have a question: in which case do we need to keep the original bytes from the Pulsar client?

The original behavior of KoP doesn't add the schema version before each message. For messages served by a previous KoP version, the deserializer will fetch the latest schema of the topic to decode them. See the Schema Version Header - Deserializer section.

> we should also support KeyValue<AVRO, AVRO> in order to fully cover the AVRO support in Kafka.

Yeah, this proposal mainly focuses on the first step, i.e. supporting the SerDes for the value first and making sure it works. We can support key-value schemas later.

BewareMyPower avatar May 20 '22 08:05 BewareMyPower

> The original behavior of KoP doesn't add the schema version before each message. For messages served by a previous KoP version, the deserializer will fetch the latest schema of the topic to decode them. See the Schema Version Header - Deserializer section.

What happens if I use the latest version of KoP with a Kafka client that doesn't use our new SerDes?

eolivelli avatar May 20 '22 09:05 eolivelli

> with a Kafka client that doesn't use our new SerDes?

What deserializer do you use? If you don't use the new SerDes, you must implement your own deserializer to deserialize the message value from the Pulsar producer. Otherwise, you can only use BytesDeserializer and deserialize the byte[] manually.

BewareMyPower avatar May 20 '22 09:05 BewareMyPower

I think what you mentioned is a compatibility issue. But it looks like there is no existing behavior here that we need to stay compatible with.

BewareMyPower avatar May 20 '22 09:05 BewareMyPower

Currently, if I produce an AVRO message (or a KeyValue SEPARATED encoding value) with a Pulsar producer, with entryFormat=kafka, and I consume with a BytesDeserializer, I receive the AVRO-encoded payload (without the schema) and I can decode the payload manually.

Is this change supposed to make KoP ALWAYS prepend the "magic + Pulsar schema version" header to the Kafka payload received by the Kafka client?

eolivelli avatar May 20 '22 09:05 eolivelli

> Is this change supposed to make KoP ALWAYS prepend the "magic + Pulsar schema version" header to the Kafka payload received by the Kafka client?

Yes.

BewareMyPower avatar May 20 '22 09:05 BewareMyPower

> Is this change supposed to make KoP ALWAYS prepend the "magic + Pulsar schema version" header to the Kafka payload received by the Kafka client?

@eolivelli The "magic" doesn't need to be prepended to the Kafka payload. The Pulsar schema version is enough.

wenbingshen avatar May 20 '22 12:05 wenbingshen

> @eolivelli The "magic" doesn't need to be prepended to the Kafka payload. The Pulsar schema version is enough.

Sorry, I think your "magic" means the "MARKER".

wenbingshen avatar May 20 '22 12:05 wenbingshen

@BewareMyPower Can we use the header field of Record to carry the Pulsar schema version, without needing to embed this information in the payload? Note that headers are only supported with magic=2: https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/record/DefaultRecord.java#L48-L50

wenbingshen avatar May 20 '22 12:05 wenbingshen

IIUC the Header is not available to the Deserializer, but only the payload

eolivelli avatar May 20 '22 12:05 eolivelli

> IIUC the Header is not available to the Deserializer, but only the payload

Yes, it seems that the Deserializer cannot access the Header in the native Kafka client.

wenbingshen avatar May 20 '22 12:05 wenbingshen

https://github.com/apache/kafka/blob/trunk/clients/src/main/java/org/apache/kafka/common/serialization/Deserializer.java#L52-L61

Header can be obtained from the Deserializer interface.

wenbingshen avatar May 20 '22 12:05 wenbingshen

that's great. maybe it has been added recently.

this way we can inject the pulsarSchema version inside a header !

eolivelli avatar May 20 '22 12:05 eolivelli

> that's great. maybe it has been added recently.

This new method was introduced by KIP-336, so at least kafka-client-2.1.0 is required to support accessing the Header.

wenbingshen avatar May 20 '22 13:05 wenbingshen

I believe it is good to support Kafka 2.1+ for this feature

eolivelli avatar May 20 '22 13:05 eolivelli

We need an internal discussion in StreamNative on whether to drop the Avro SerDes support for Kafka clients <= 2.0.

For Kafka clients 0.11.0.0 to 2.0, we need to implement ExtendedSerializer and ExtendedDeserializer, introduced in https://issues.apache.org/jira/browse/KAFKA-4208.


BTW, I found there's something wrong with my original thought.

final byte[] schemaVersion = (metadata.hasSchemaVersion() ? metadata.getSchemaVersion() : null);
if (isKafkaEntryFormat(metadata)) {
    if (schemaVersion != null && schemaVersion.length > 0) {
        // NOTE: just an example
        batchedByteBuf.writeBytes(new byte[]{ 0x03, 0x04 });
        batchedByteBuf.writeBytes(schemaVersion);
    }

If isKafkaEntryFormat(metadata) is true, it means the message was sent by Kafka clients. Actually, we need to inject the schema version in decodePulsarEntryToKafkaRecords method. See https://github.com/streamnative/kop/blob/ae29ee300612ea1df5a2efda3ef8c7916b7fc622/kafka-impl/src/main/java/io/streamnative/pulsar/handlers/kop/utils/ByteBufUtils.java#L99-L102

Let's go back to the approach of injecting the schema version into the header. It will break compatibility, because after that there is always at least one header for each message produced by Pulsar clients. Encoding the schema version into the value doesn't break anything.

Let's look at this issue at a high level:

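| Serializer  | Deserializer | Result                          |
| :---------- | :----------- | :------------------------------ |
| AVRO        | AVRO         | Success                         |
| AVRO        | Bytes        | Depends on the application side |
| AVRO        | Other        | Failure                         |
| Pulsar AVRO | AVRO         | Success                         |
| Pulsar AVRO | Bytes        | Depends on the application side |
| Pulsar AVRO | Other        | Failure                         |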

What's the disadvantage of encoding the schema version into the bytes of record's value?

BewareMyPower avatar May 20 '22 16:05 BewareMyPower

We can also see that Confluent's Avro serializer adds some extra bytes before the bytes serialized from the value object.

| MAGIC_BYTE (1 byte) | Schema ID (4 bytes)         | Value (N bytes)                               |
| :------------------ | :-------------------------- | :-------------------------------------------- |
| 0x00                | Serialized from the integer | Serialized from the value via the Avro schema |

BewareMyPower avatar May 20 '22 17:05 BewareMyPower

> Let's go back to the approach of injecting the schema version into the header. It will break compatibility, because after that there is always at least one header for each message produced by Pulsar clients. Encoding the schema version into the value doesn't break anything.

Why would it break compatibility? Messages produced by a Pulsar producer do not need to set headers for each message. This depends on how the Pulsar producer is implemented. Even if the schema version is added in front of the value, all messages in a single entry also share the schema version of one metadata.

like

if (magic >= RecordBatch.MAGIC_VALUE_V2) {
    List<KeyValue> singleMessageMetadataPropertiesList = singleMessageMetadata.getPropertiesList();
    if (metadata.hasSchemaVersion() && metadata.getSchemaVersion().length > 0) {
        singleMessageMetadataPropertiesList
                .add(new KeyValue().setKey("schema.version").setValue(
                        String.valueOf(metadata.getSchemaVersion())));
    }
    final Header[] headers = getHeadersFromMetadata(singleMessageMetadataPropertiesList);
    builder.appendWithOffset(baseOffset + i,
            timestamp,
            getKeyByteBuffer(singleMessageMetadata),
            value,
            headers);
}

wenbingshen avatar May 21 '22 05:05 wenbingshen

> For Kafka clients 0.11.0.0 to 2.0, we need to implement ExtendedSerializer and ExtendedDeserializer, introduced in https://issues.apache.org/jira/browse/KAFKA-4208.

It's not easy to solve header compatibility from 0.11.0 to 2.0 with ExtendedSerializer and ExtendedDeserializer alone.

> We can also see that Confluent's Avro serializer adds some extra bytes before the bytes serialized from the value object.

I'm guessing this might be a workaround for Confluent's Avro serializer for multi-version compatibility.

Maybe the questions we need to think about are:

  1. If only 2.1+ is supported, using headers may be a good approach;
  2. If you need to support versions 0.11.0 to 2.0, adding the schema version to the payload is the best way.

wenbingshen avatar May 21 '22 05:05 wenbingshen

> It's not easy to solve header compatibility from 0.11.0 to 2.0 with ExtendedSerializer and ExtendedDeserializer alone.

I was wrong here. I just checked KAFKA-4208 and found that 0.11.0 to 2.0 are compatible by implementing the ExtendedSerializer and ExtendedDeserializer interfaces.

Now the questions are: 1. Do we need to be compatible with clients below 0.11.0? 2. Should we implement two versions of the Kafka Avro Serializer and Deserializer for this?

wenbingshen avatar May 21 '22 06:05 wenbingshen

> Why would it break compatibility?

If a record was produced by a Pulsar producer with schema version configured,

  • Before: record.headers() is empty.
  • After: record.headers() has one header whose key is "schema.version".

> 2. Should we implement two versions of the Kafka Avro Serializer and Deserializer for this?

It increases the cost of maintaining these SerDes. If it's worth doing that, please answer my previous question:

What's the disadvantage of encoding the schema version into the bytes of record's value? And what's the advantage of encoding the schema version into the Kafka header?

I cannot see the value of saving the schema version in the Header.

BewareMyPower avatar May 21 '22 09:05 BewareMyPower

BTW, see https://github.com/confluentinc/schema-registry/blob/master/avro-serializer/src/main/java/io/confluent/kafka/serializers/KafkaAvroDeserializer.java.

Confluent's KafkaAvroDeserializer also doesn't implement the deserialize API with the Headers parameter. It expects the schema ID to be prepended to the value, as https://github.com/streamnative/kop/issues/1290#issuecomment-1133132666 describes.

BewareMyPower avatar May 21 '22 09:05 BewareMyPower

> If a record was produced by a Pulsar producer with schema version configured,
>
>   • Before: record.headers() is empty.
>   • After: record.headers() has one header whose key is "schema.version".

The header with the key "schema.version" will be returned to the Kafka client. Is there any problem with that?

> It increases the cost of maintaining these SerDes. If it's worth doing that, please answer my previous question:

My initial thought was that if the extra fields were not encoded into the payload, users of BytesDeserializer would not need to pay attention to them when decoding the payload manually. Now I know that Confluent's Avro serializer also adds some extra bytes before the bytes serialized from the value object, so that's not a problem.

This proposal makes sense to me. Thanks.

wenbingshen avatar May 21 '22 10:05 wenbingshen

> The header with the key "schema.version" will be returned to the Kafka client. Is there any problem with that?

Not a big problem. But some logic on the application side might rely on the count of the headers.

> when users use the BytesDeserializer,

They must know the encoding format in advance. Even if there are no extra bytes before the AVRO serialized bytes, they still need to know the details. For example, in Pulsar, fields are allowed to be null with the default AVRO schema. If users don't know that, the deserialization might fail.

BewareMyPower avatar May 21 '22 13:05 BewareMyPower

After the internal discussion in StreamNative, this task might be delayed for a while.

BewareMyPower avatar Jun 06 '22 05:06 BewareMyPower