Support for tombstones (null value in messages) does not work

Open fmiguelez opened this issue 5 years ago • 11 comments

Describe the bug

The solution provided by #7139 for bug #4803 does not work.

  • When trying to read a message with a null value, a NullPointerException is thrown in another part of the code:
        msg = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);

	java.lang.NullPointerException
		at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.updateNumMsgsReceived(ConsumerStatsRecorderImpl.java:169)
		at org.apache.pulsar.client.impl.ConsumerImpl.messageProcessed(ConsumerImpl.java:1423)
		at org.apache.pulsar.client.impl.ConsumerImpl.internalReceive(ConsumerImpl.java:431)
		at org.apache.pulsar.client.impl.ConsumerBase.receive(ConsumerBase.java:175)
                ...
  • It should not be necessary to explicitly set a null value on the producer (key-only messages should work just fine). With implicit null values, the exception thrown is an EOFException, the same as before the fix was provided.

To Reproduce

I have created a test project to reproduce these issues (null values set implicitly and explicitly, with both a schema and a schemaless consumer): pulsar-tombstone-test

See its README.md for the steps to reproduce.

Expected behavior

Tombstones (null values in messages that have a key, with or without a schema) should be supported whether or not you specify a schema, and whether you set the null value explicitly or leave it implicit. All tests in the example project should pass.

// Case 1: without schema
Producer<byte[]> schemalessProducer = client.newProducer(Schema.BYTES).topic(TOPIC).messageRoutingMode(MessageRoutingMode.SinglePartition).create();

Consumer<byte[]> schemalessConsumer = client.newConsumer(Schema.BYTES).topic(TOPIC).subscriptionName("test")
				.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

// Implicit tombstone without schema (only the key is set)
schemalessProducer.newMessage().key("1").send();

// Explicit tombstone (the one that is supposed to work)
schemalessProducer.newMessage().key("2").value(null).send();

Message<byte[]> implicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);
Message<byte[]> explicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);

System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}", explicitTombstone.getKey(), explicitTombstone.getValue()));

// Case 2: with an Avro schema
Producer<DummyObject> schemaProducer = client.newProducer(Schema.AVRO(DummyObject.class)).topic(TOPIC).messageRoutingMode(MessageRoutingMode.SinglePartition).create();

Consumer<DummyObject> schemaConsumer = client.newConsumer(Schema.AVRO(DummyObject.class)).topic(TOPIC).subscriptionName("test")
				.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();

// Implicit tombstone with schema (only the key is set)
schemaProducer.newMessage().key("1").send();

// Explicit tombstone (the one that is supposed to work)
schemaProducer.newMessage().key("2").value(null).send();

Message<DummyObject> implicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);
Message<DummyObject> explicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);

System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}", explicitTombstone.getKey(), explicitTombstone.getValue()));

Desktop (please complete the following information):

  • Windows 10 with Docker Desktop to run Pulsar containers

fmiguelez avatar Jul 01 '20 07:07 fmiguelez

Thanks @fmiguelez for opening the issue and the related PR. @gaoran10, please help review the PR.

jiazhai avatar Jul 02 '20 02:07 jiazhai

I do not think this ticket should be closed, as the pull request only solves one of the cases described by the tests.

fmiguelez avatar Jul 07 '20 11:07 fmiguelez

It was closed because of the "Fixes" comment in the description. I have reopened it.

sijie avatar Jul 07 '20 17:07 sijie

@fmiguelez Sorry for the late response.

  1. producer.newMessage().key("1").send()

Currently, if we send a message as above, the message payload is ByteBuffer.allocate(0). If the topic uses an Avro schema, the decoder cannot decode an empty byte array, which causes the java.io.EOFException below.

org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException

	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
	at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
	at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
	at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
	at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
	at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
	at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
	at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
	at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
	at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
	at org.testng.TestRunner.privateRun(TestRunner.java:648)
	at org.testng.TestRunner.run(TestRunner.java:505)
	at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
	at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
	at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
	at org.testng.SuiteRunner.run(SuiteRunner.java:364)
	at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
	at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
	at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
	at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
	at org.testng.TestNG.runSuites(TestNG.java:1049)
	at org.testng.TestNG.run(TestNG.java:1017)
	at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
	at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
Caused by: java.io.EOFException
	at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
	at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
	at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
	at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
	at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
	at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
	at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
	at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
	at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
	at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
	... 28 more
  2. producer.newMessage().key("1").value(null).send()

If we explicitly set the value to null, the message metadata records this with the flag msgMetadataBuilder.setNullValue(true). The consumer client checks the nullValue flag before the schema decoder decodes the payload and, if the flag is true, returns a null value directly.

  3. Currently, the message value has three forms: normal value, empty byte array, and null value. If a topic uses an Avro schema, the consumer cannot handle the empty byte array payload; maybe the consumer should return a null value for an empty byte array.
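
To make the three forms concrete, here is a minimal, self-contained Java sketch. It does not use the Pulsar client at all: Envelope, strictDecode and lenientDecode are hypothetical names, DataInputStream.readInt stands in for the Avro decoder, and UncheckedIOException stands in for SchemaSerializationException. It only illustrates why an empty payload fails while a flagged null does not, and what returning null for an empty payload could look like.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;

final class TombstoneSketch {

    /** Hypothetical stand-in for a message: payload bytes plus the null-value flag. */
    static final class Envelope {
        final byte[] payload;
        final boolean nullValue;

        Envelope(byte[] payload, boolean nullValue) {
            this.payload = payload;
            this.nullValue = nullValue;
        }
    }

    /** Normal value: the payload carries the encoded data (here a 4-byte int). */
    static Envelope withValue(int value) {
        return new Envelope(ByteBuffer.allocate(4).putInt(value).array(), false);
    }

    /** Implicit tombstone: no value was set, so the payload is empty and the flag is unset. */
    static Envelope implicitTombstone() {
        return new Envelope(new byte[0], false);
    }

    /** Explicit tombstone: value(null) was called, so the flag is set. */
    static Envelope explicitTombstone() {
        return new Envelope(new byte[0], true);
    }

    /** Checks only the flag, then decodes. An empty payload fails with an EOFException cause. */
    static Integer strictDecode(Envelope m) {
        if (m.nullValue) {
            return null; // flag checked before the decoder ever sees the payload
        }
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(m.payload))) {
            return in.readInt(); // needs 4 bytes; throws EOFException on an empty payload
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }

    /** Assumed alternative: also treat an empty payload as a null value. */
    static Integer lenientDecode(Envelope m) {
        if (m.nullValue || m.payload.length == 0) {
            return null;
        }
        return strictDecode(m);
    }

    public static void main(String[] args) {
        System.out.println("explicit, strict:  " + strictDecode(explicitTombstone()));
        try {
            strictDecode(implicitTombstone());
        } catch (UncheckedIOException e) {
            System.out.println("implicit, strict:  failed with " + e.getCause().getClass().getName());
        }
        System.out.println("implicit, lenient: " + lenientDecode(implicitTombstone()));
        System.out.println("value, lenient:    " + lenientDecode(withValue(42)));
    }
}
```

One caveat with the lenient variant: it can no longer distinguish a deliberately empty payload from a tombstone, which may matter for schemas where zero bytes is a valid encoding.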

gaoran10 avatar Sep 24 '20 03:09 gaoran10

@sijie, @gaoran10 Any update on this?

fmiguelez avatar Feb 22 '21 15:02 fmiguelez

@fmiguelez Is anything blocking you on this issue? Could you provide more details?

gaoran10 avatar Feb 25 '21 01:02 gaoran10

I think Pulsar SQL can't deal with empty-payload messages because of the same issue described here. See my issue #13127.

longtengz avatar Dec 06 '21 08:12 longtengz

@gaoran10 I have taken another look at this issue and upgraded pulsar-tombstone-test to use Pulsar 2.8.4. The current status is:

  • ✔ Explicit tombstones (setting the message value to null on publication) are consumed without error
  • ❌ Implicit tombstones (only the key is set on publication) throw an EOFException on consumption
  • ❌ Functions/sinks fail to receive a null as input on consumption, throwing an exception instead (with both explicit and implicit tombstones)

The third use case is not covered by the provided sample test.

fmiguelez avatar Feb 02 '23 09:02 fmiguelez

@fmiguelez Sorry for the late response, and thanks for your tests.

❌ Implicit tombstones (only key is set on publication) throw an EOFException on consumption

Currently, if we don't specify a value when producing a message, the payload is an empty byte array, which causes an EOFException when consumers try to deserialize it. I'll handle this problem.

❌ Functions/sinks fail to receive a null as input on consumption throwing an exception instead (with both explicit and implicit tombstones)

Could you provide the error logs? Could this be caused by the method PulsarRecord#getValue?

gaoran10 avatar Feb 09 '23 04:02 gaoran10

This issue looks fixed by https://github.com/apache/pulsar/pull/9046, closing.

dao-jun avatar May 19 '24 15:05 dao-jun

@dao-jun Please reopen this issue. As others clearly stated in their comments (posted after the PR you linked was merged), some of the problems are still not fixed.

longtengz avatar Jun 24 '24 10:06 longtengz