pulsar
Support for tombstones (null value in messages) does not work
Describe the bug
The fix provided by #7139 for bug #4803 does not work.
- When trying to read a message with a null value, a NullPointerException is thrown in another part of the code.
msg = consumer.receive(timeoutMillis, TimeUnit.MILLISECONDS);
java.lang.NullPointerException
at org.apache.pulsar.client.impl.ConsumerStatsRecorderImpl.updateNumMsgsReceived(ConsumerStatsRecorderImpl.java:169)
at org.apache.pulsar.client.impl.ConsumerImpl.messageProcessed(ConsumerImpl.java:1423)
at org.apache.pulsar.client.impl.ConsumerImpl.internalReceive(ConsumerImpl.java:431)
at org.apache.pulsar.client.impl.ConsumerBase.receive(ConsumerBase.java:175)
...
- It should not be necessary to explicitly pass a null value to the producer (key-only messages should work just fine). With implicit null value messages the exception thrown is an EOFException (the same as before the fix was provided).
To Reproduce
I have created a test project to reproduce these issues (null values implicitly and explicitly set, with both schema and schemaless consumers): pulsar-tombstone-test
Read README.md to reproduce it.
Expected behavior
Tombstones (null values in messages with a key, with or without a schema) should be supported whether or not you specify a schema, and whether you set the null value explicitly or leave it implicit. All tests in the example project should pass.
Producer<byte[]> schemalessProducer = client.newProducer(Schema.BYTES).topic(TOPIC).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<byte[]> schemalessConsumer = client.newConsumer(Schema.BYTES).topic(TOPIC).subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// Implicit tombstone without schema (only the key is set)
schemalessProducer.newMessage().key("1").send();
// Explicit tombstone (the one that is supposed to work)
schemalessProducer.newMessage().key("2").value(null).send();
// Keep the Message itself so that both key and value can be printed
Message<byte[]> implicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);
Message<byte[]> explicitTombstone = schemalessConsumer.receive(2, TimeUnit.SECONDS);
System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}", explicitTombstone.getKey(), explicitTombstone.getValue()));
Producer<DummyObject> schemaProducer = client.newProducer(Schema.AVRO(DummyObject.class)).topic(TOPIC).messageRoutingMode(MessageRoutingMode.SinglePartition).create();
Consumer<DummyObject> schemaConsumer = client.newConsumer(Schema.AVRO(DummyObject.class)).topic(TOPIC).subscriptionName("test")
.subscriptionInitialPosition(SubscriptionInitialPosition.Earliest).subscribe();
// Implicit tombstone with schema (only the key is set)
schemaProducer.newMessage().key("1").send();
// Explicit tombstone (the one that is supposed to work)
schemaProducer.newMessage().key("2").value(null).send();
// Keep the Message itself so that both key and value can be printed
Message<DummyObject> implicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);
Message<DummyObject> explicitTombstone = schemaConsumer.receive(2, TimeUnit.SECONDS);
System.out.println(String.format("Implicit tombstone: {key=%s, value=%s}", implicitTombstone.getKey(), implicitTombstone.getValue()));
System.out.println(String.format("Explicit tombstone: {key=%s, value=%s}", explicitTombstone.getKey(), explicitTombstone.getValue()));
Desktop (please complete the following information):
- Windows 10 with Docker Desktop to run Pulsar containers
Additional context
Thanks @fmiguelez for opening the issue and the related PR. @gaoran10, please help review the PR.
I do not think this ticket should be closed, as the pull request only solves one of the cases described by the tests.
It was closed because of the Fixes comment in the description. I have reopened it.
@fmiguelez Sorry for the late response.
producer.newMessage().key("1").send()
Currently, if we send messages as above, the message payload is ByteBuffer.allocate(0). If the topic uses an Avro schema, the decoder can't decode an empty byte array, which causes a java.io.EOFException as shown below.
org.apache.pulsar.client.api.SchemaSerializationException: java.io.EOFException
at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:77)
at org.apache.pulsar.client.api.schema.SchemaReader.read(SchemaReader.java:36)
at org.apache.pulsar.client.impl.schema.StructSchema.decode(StructSchema.java:107)
at org.apache.pulsar.client.impl.MessageImpl.getValue(MessageImpl.java:301)
at org.apache.pulsar.broker.transaction.TransactionProduceTest.test(TransactionProduceTest.java:639)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:124)
at org.testng.internal.Invoker.invokeMethod(Invoker.java:583)
at org.testng.internal.Invoker.invokeTestMethod(Invoker.java:719)
at org.testng.internal.Invoker.invokeTestMethods(Invoker.java:989)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:125)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:109)
at org.testng.TestRunner.privateRun(TestRunner.java:648)
at org.testng.TestRunner.run(TestRunner.java:505)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:455)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:450)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:415)
at org.testng.SuiteRunner.run(SuiteRunner.java:364)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:52)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:84)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1208)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1137)
at org.testng.TestNG.runSuites(TestNG.java:1049)
at org.testng.TestNG.run(TestNG.java:1017)
at com.intellij.rt.testng.IDEARemoteTestNG.run(IDEARemoteTestNG.java:66)
at com.intellij.rt.testng.RemoteTestNGStarter.main(RemoteTestNGStarter.java:110)
Caused by: java.io.EOFException
at org.apache.avro.io.BinaryDecoder.ensureBounds(BinaryDecoder.java:509)
at org.apache.avro.io.BinaryDecoder.readInt(BinaryDecoder.java:149)
at org.apache.avro.io.BinaryDecoder.readIndex(BinaryDecoder.java:460)
at org.apache.avro.io.ResolvingDecoder.readIndex(ResolvingDecoder.java:282)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:178)
at org.apache.avro.reflect.ReflectDatumReader.readField(ReflectDatumReader.java:290)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:237)
at org.apache.avro.specific.SpecificDatumReader.readRecord(SpecificDatumReader.java:123)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:170)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:151)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:144)
at org.apache.pulsar.client.impl.schema.reader.AvroReader.read(AvroReader.java:75)
... 28 more
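To illustrate why an empty payload ends in an EOFException, here is a minimal stdlib-only sketch. It is an analogy, not the Avro code path: DataInputStream stands in for the Avro binary decoder, and the class and method names (EmptyPayloadEofDemo, readIntHitsEof) are made up for this example. The point is only that a decoder which expects at least one field runs off the end of a zero-length buffer on its first read.

```java
import java.io.ByteArrayInputStream;
import java.io.DataInputStream;
import java.io.EOFException;
import java.io.IOException;

// Stdlib-only analogy: DataInputStream stands in for the Avro binary decoder.
class EmptyPayloadEofDemo {

    // Returns true if reading a 4-byte int from the payload hits end-of-stream.
    static boolean readIntHitsEof(byte[] payload) {
        try (DataInputStream in = new DataInputStream(new ByteArrayInputStream(payload))) {
            in.readInt();
            return false;
        } catch (EOFException e) {
            // Fewer than four bytes were available.
            return true;
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        byte[] emptyPayload = new byte[0];   // what a key-only message carries
        byte[] fourBytes = {0, 0, 0, 42};    // enough data for one int
        System.out.println("empty payload hits EOF: " + readIntHitsEof(emptyPayload));
        System.out.println("four bytes hit EOF: " + readIntHitsEof(fourBytes));
    }
}
```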
producer.newMessage().key("1").value(null).send()
If we explicitly set the value to null, the message metadata records the flag via msgMetadataBuilder.setNullValue(true). The consumer client checks the nullValue flag before the schema decoder decodes the payload and, if the flag is true, returns a null value directly.
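The flag check described above can be sketched roughly as follows. This is a simplified model under stated assumptions, not the actual client code: Envelope is a hypothetical stand-in for a received message with its metadata, and getValue here is not Pulsar's MessageImpl#getValue. It only shows the ordering, where the flag is consulted first and the decoder runs only when a value is present.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical model of a received message; not Pulsar's MessageImpl.
class NullFlagDecodeSketch {

    static final class Envelope {
        final boolean nullValue;  // stands in for the nullValue metadata flag
        final byte[] payload;

        Envelope(boolean nullValue, byte[] payload) {
            this.nullValue = nullValue;
            this.payload = payload;
        }
    }

    // Check the flag first; only run the schema decoder when a value is present.
    static <T> T getValue(Envelope envelope, Function<byte[], T> decoder) {
        if (envelope.nullValue) {
            return null;
        }
        return decoder.apply(envelope.payload);
    }

    public static void main(String[] args) {
        Function<byte[], String> utf8 = bytes -> new String(bytes, StandardCharsets.UTF_8);
        // Explicit tombstone: the decoder is never invoked.
        System.out.println(getValue(new Envelope(true, new byte[0]), utf8));
        // Normal message: the payload is decoded.
        System.out.println(getValue(new Envelope(false, "hello".getBytes(StandardCharsets.UTF_8)), utf8));
    }
}
```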
- Currently, the message value can take three forms: a normal value, an empty byte array, and a null value. If a topic uses an Avro schema, the consumer can't handle the empty byte array payload. Maybe the consumer should return a null value for an empty byte array.
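A rough sketch of that suggestion, for discussion only: classify the three forms and map both tombstone forms to null before the decoder is called. The names ValueFormSketch, ValueForm, classify and decodeOrNull are hypothetical and do not exist in the Pulsar client.

```java
import java.util.function.Function;

// Hypothetical helper illustrating the proposed policy; not part of the Pulsar client.
class ValueFormSketch {

    enum ValueForm { NORMAL, EMPTY_PAYLOAD, NULL_VALUE }

    // The explicit flag wins; otherwise a zero-length payload counts as an implicit tombstone.
    static ValueForm classify(boolean nullValueFlag, byte[] payload) {
        if (nullValueFlag) {
            return ValueForm.NULL_VALUE;
        }
        if (payload == null || payload.length == 0) {
            return ValueForm.EMPTY_PAYLOAD;
        }
        return ValueForm.NORMAL;
    }

    // Proposed policy: only NORMAL payloads reach the decoder, everything else becomes null.
    static <T> T decodeOrNull(boolean nullValueFlag, byte[] payload, Function<byte[], T> decoder) {
        if (classify(nullValueFlag, payload) == ValueForm.NORMAL) {
            return decoder.apply(payload);
        }
        return null;
    }

    public static void main(String[] args) {
        Function<byte[], Integer> length = bytes -> bytes.length;
        System.out.println(classify(true, new byte[0]));
        System.out.println(classify(false, new byte[0]));
        System.out.println(decodeOrNull(false, new byte[] {1, 2, 3}, length));
    }
}
```

One caveat with this policy: for schemas where a zero-length payload can be a legitimate value (an empty string or empty byte array, for example), treating it as null would be ambiguous, so the rule probably only makes sense for struct schemas such as Avro.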
@sijie, @gaoran10 Any update on this?
@fmiguelez Is there any block about this issue? Could you provide more details?
I think pulsar sql can't deal with empty payload messages because of the same issues described here. See my issue #13127.
@gaoran10 I have taken another look at the current status of this issue. I have upgraded pulsar-tombstone-test to use Pulsar 2.8.4. Current status is:
- ✔ Explicit tombstones (setting null as the message value on publication) are consumed without error
- ❌ Implicit tombstones (only key is set on publication) throw an EOFException on consumption
- ❌ Functions/sinks fail to receive a null as input on consumption, throwing an exception instead (with both explicit and implicit tombstones)
The third use case is not covered by the provided sample test.
@fmiguelez Sorry for the late response, thanks for your tests.
❌ Implicit tombstones (only key is set on publication) throw an EOFException on consumption
Currently, if we don't specify a value when producing messages, the payload is an empty byte array, which causes an EOFException when consumers try to deserialize it. I'll handle this problem.
❌ Functions/sinks fail to receive a null as input on consumption throwing an exception instead (with both explicit and implicit tombstones)
Could you provide the error logs? Could this be caused by the method PulsarRecord#getValue?
This issue looks fixed by https://github.com/apache/pulsar/pull/9046, closing.
@dao-jun Please re-open this issue. Others clearly stated in their comments (after the PR you linked was merged) that some of the problems are still not fixed.