
[Bug Report]: Unable to produce tombstone records

Open adimoraret opened this issue 1 year ago • 11 comments

Prerequisites

  • [X] I have searched issues to ensure it has not already been reported

Description

I am unable to produce tombstone records. It looks like the message value is sent as byte[0] instead of null.

Steps to reproduce

services.AddKafka(kafka => kafka
    .AddCluster(cluster => cluster
        .WithBrokers(new[] { kafkaBrokers })
        .AddProducer("test", producer => producer
            .AddMiddlewares(middlewares => middlewares
                .AddSerializer<JsonCoreSerializer>(_ => new JsonCoreSerializer(options))
            )
            .WithAcks(Acks.All)
        )
    )
);

await producer.ProduceAsync("test", Guid.NewGuid().ToString(), null);

Expected behavior

Message value should be sent as null

Actual behavior

Message value is sent as byte[0]

KafkaFlow version

3.0.3

adimoraret avatar Feb 01 '24 09:02 adimoraret

Do you have any error while producing the message?

tomaszprasolek avatar Feb 16 '24 11:02 tomaszprasolek

No, there is no error. But it won't produce tombstone records when using a Kafka upsert source: https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/ .

adimoraret avatar Feb 19 '24 07:02 adimoraret

As we discussed on the KafkaFlow Slack, you can bypass this limitation with a custom serializer that checks whether the message is null before serializing the message value.

This should produce a message whose value is an empty byte[], which is the tombstone.
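Such a wrapper might look like the following sketch. The class name `NullSkippingJsonSerializer` is illustrative, and the `SerializeAsync` signature follows the snippet quoted further down in this thread; verify both against your KafkaFlow version.

```csharp
// Illustrative workaround: skip serialization for null messages so the
// produced value payload is empty (0 bytes). Delegates non-null messages
// to the regular JsonCoreSerializer.
public class NullSkippingJsonSerializer : ISerializer
{
    private readonly JsonCoreSerializer inner;

    public NullSkippingJsonSerializer(JsonSerializerOptions options)
    {
        this.inner = new JsonCoreSerializer(options);
    }

    public Task SerializeAsync(object? message, Stream output, ISerializerContext context)
    {
        if (message is null)
        {
            // Write nothing: the record value goes out with 0 bytes.
            return Task.CompletedTask;
        }

        return this.inner.SerializeAsync(message, output, context);
    }
}
```

It would be registered in place of the `JsonCoreSerializer` registration shown in the reproduction steps, e.g. `.AddSerializer<NullSkippingJsonSerializer>(_ => new NullSkippingJsonSerializer(options))`.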

Meanwhile, we will address this issue by opening a pull request to fix the JsonCoreSerializer.

Regarding the Kafka upsert source, we wouldn't expect Flink to behave differently from the Kafka client, as they both produce a null record.

JoaoRodriguesGithub avatar Feb 20 '24 16:02 JoaoRodriguesGithub

I have tried that approach end to end, using the kafka-upsert source and an Iceberg table destination in AWS. It does indeed send an empty byte array, but that did not translate into a tombstone record. If I read https://nightlies.apache.org/flink/flink-docs-master/docs/connectors/table/upsert-kafka/ correctly, it would only work if we send null for the message value. Maybe I missed something, but I'm happy to test your PR once it's ready.

adrianmoraret-mdsol avatar Feb 20 '24 17:02 adrianmoraret-mdsol

After fixing the issue in JsonCoreSerializer, producing a message with a null value results in a tombstone record where the message value is an empty byte[].

"A null payload is a payload with 0 bytes" reference here

public Task SerializeAsync(object? message, Stream output, ISerializerContext context)
{
    if (message == null)
    {
        return Task.CompletedTask;
    }

    return SerializeNonNullMessageAsync(message, output);
}

As an alternative, you can always use the native Kafka client to confirm that it, like KafkaFlow, produces a tombstone record with an empty byte[].
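For comparison, a minimal sketch with the native Confluent.Kafka client (the broker address and topic name are placeholders):

```csharp
// Sketch: produce a tombstone with the native Confluent.Kafka client.
// A null Message<TKey, TValue>.Value is sent as a null payload, which is
// the tombstone marker on a compacted topic.
using Confluent.Kafka;

var config = new ProducerConfig { BootstrapServers = "localhost:9092" };
using var producer = new ProducerBuilder<string, byte[]>(config).Build();

await producer.ProduceAsync("test", new Message<string, byte[]>
{
    Key = Guid.NewGuid().ToString(),
    Value = null, // null value => tombstone record
});
```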

JoaoRodriguesGithub avatar Feb 21 '24 17:02 JoaoRodriguesGithub

If it is fixed in the serializer, then every serializer has to be changed. Sending null for tombstoning should not be part of serialization, imho.

great library by the way. :)

esskar avatar Feb 26 '24 21:02 esskar

> After fixing the issue in JsonCoreSerializer, producing a message with a null value results in a tombstone record where the message value is an empty byte[]. […]

I resolved it the same way for the ProtobufSerializer. Is there a plan to fix it in general?

esskar avatar Mar 25 '24 09:03 esskar