pulsar
pulsar copied to clipboard
Flaky-test: MessageDispatchThrottlingTest.testBacklogConsumerCacheReads
Error: Tests run: 64, Failures: 16, Errors: 0, Skipped: 48, Time elapsed: 604.061 s <<< FAILURE! - in org.apache.pulsar.client.api.MessageDispatchThrottlingTest
Error: testBacklogConsumerCacheReads(org.apache.pulsar.client.api.MessageDispatchThrottlingTest) Time elapsed: 300.011 s <<< FAILURE!
org.testng.internal.thread.ThreadTimeoutException: Method org.apache.pulsar.client.api.MessageDispatchThrottlingTest.testBacklogConsumerCacheReads() didn't finish within the time-out 300000
at org.testng.internal.MethodInvocationHelper.invokeWithTimeoutWithNewExecutor(MethodInvocationHelper.java:371)
at org.testng.internal.MethodInvocationHelper.invokeWithTimeout(MethodInvocationHelper.java:282)
at org.testng.internal.TestInvoker.invokeMethod(TestInvoker.java:605)
at org.testng.internal.TestInvoker.retryFailed(TestInvoker.java:214)
at org.testng.internal.MethodRunner.runInSequence(MethodRunner.java:58)
at org.testng.internal.TestInvoker$MethodInvocationAgent.invoke(TestInvoker.java:822)
at org.testng.internal.TestInvoker.invokeTestMethods(TestInvoker.java:147)
at org.testng.internal.TestMethodWorker.invokeTestMethods(TestMethodWorker.java:146)
at org.testng.internal.TestMethodWorker.run(TestMethodWorker.java:128)
at java.base/java.util.ArrayList.forEach(ArrayList.java:1511)
at org.testng.TestRunner.privateRun(TestRunner.java:764)
at org.testng.TestRunner.run(TestRunner.java:585)
at org.testng.SuiteRunner.runTest(SuiteRunner.java:384)
at org.testng.SuiteRunner.runSequentially(SuiteRunner.java:378)
at org.testng.SuiteRunner.privateRun(SuiteRunner.java:337)
at org.testng.SuiteRunner.run(SuiteRunner.java:286)
at org.testng.SuiteRunnerWorker.runSuite(SuiteRunnerWorker.java:53)
at org.testng.SuiteRunnerWorker.run(SuiteRunnerWorker.java:96)
at org.testng.TestNG.runSuitesSequentially(TestNG.java:1218)
at org.testng.TestNG.runSuitesLocally(TestNG.java:1140)
at org.testng.TestNG.runSuites(TestNG.java:1069)
at org.testng.TestNG.run(TestNG.java:1037)
at org.apache.maven.surefire.testng.TestNGExecutor.run(TestNGExecutor.java:135)
at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeSingleClass(TestNGDirectoryTestSuite.java:112)
at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.executeLazy(TestNGDirectoryTestSuite.java:123)
at org.apache.maven.surefire.testng.TestNGDirectoryTestSuite.execute(TestNGDirectoryTestSuite.java:90)
at org.apache.maven.surefire.testng.TestNGProvider.invoke(TestNGProvider.java:146)
at org.apache.maven.surefire.booter.ForkedBooter.invokeProviderInSameClassLoader(ForkedBooter.java:384)
at org.apache.maven.surefire.booter.ForkedBooter.runSuitesInProcess(ForkedBooter.java:345)
at org.apache.maven.surefire.booter.ForkedBooter.execute(ForkedBooter.java:126)
at org.apache.maven.surefire.booter.ForkedBooter.main(ForkedBooter.java:418)
sometimes I can reproduce this in local machine

It should be 200*10
I print some log in messagelistener. It seems there has a bug in recent change
2022-08-08T16:57:10,898 - ERROR - [broker-topic-workers-OrderedExecutor-7-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-1}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 3
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:587) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:558) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$6(PersistentDispatcherMultipleConsumers.java:548) ~[classes/:?]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-08T16:57:10,919 - ERROR - [broker-topic-workers-OrderedExecutor-7-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-1}] [-1] Failed to parse message metadata
java.lang.IndexOutOfBoundsException: readerIndex(42) + length(4) exceeds writerIndex(44): UnpooledDuplicatedByteBuf(ridx: 42, widx: 44, cap: 44/44, unwrapped: UnpooledHeapByteBuf(ridx: 0, widx: 44, cap: 44/44))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
at io.netty.buffer.AbstractByteBuf.skipBytes(AbstractByteBuf.java:971) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:267) ~[classes/:?]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.lambda$filterEntriesForConsumer$1(AbstractBaseDispatcher.java:117) ~[classes/:?]
at java.util.Optional.orElseGet(Optional.java:364) ~[?:?]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:115) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:657) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:558) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$6(PersistentDispatcherMultipleConsumers.java:548) ~[classes/:?]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
org.apache.bookkeeper.mledger.impl.cache.RangeEntryCacheImpl#insert


maybe this should return true ,because if entry is in cache , we should release the entry.
after changed this , I can not reproduce this issue.
@leizhiyuan Do you want to create a PR to fix the issue?
@leizhiyuan Do you want to create a PR to fix the issue?
ok
Looks like this problem was fixed by PR #16996
I just ran into the following error while running testBacklogConsumerCacheReads
on my MacOS on a branch that includes #16996. Looks like there might be a separate issue, too. I'm going to start looking into what could be wrong.
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 3
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1874) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:593) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:567) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$6(PersistentDispatcherMultipleConsumers.java:553) ~[classes/:?]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-12T16:40:03,155 - INFO - [pulsar-io-51-2:ServerCnx@1006] - [/127.0.0.1:49767] Subscribing on topic persistent://my-property/my-ns/cache-read / sub-4
2022-08-12T16:40:03,155 - ERROR - [broker-topic-workers-OrderedExecutor-11-0:Commands@1860] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-3}] [-1] Failed to parse message metadata
java.lang.IndexOutOfBoundsException: readerIndex(39) + length(8) exceeds writerIndex(44): UnpooledDuplicatedByteBuf(ridx: 39, widx: 44, cap: 44/44, unwrapped: UnpooledHeapByteBuf(ridx: 0, widx: 44, cap: 44/44))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
at io.netty.buffer.AbstractByteBuf.checkReadableBytes(AbstractByteBuf.java:1428) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
at io.netty.buffer.AbstractByteBuf.skipBytes(AbstractByteBuf.java:971) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:260) ~[classes/:?]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1874) ~[classes/:?]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.lambda$filterEntriesForConsumer$1(AbstractBaseDispatcher.java:117) ~[classes/:?]
at java.util.Optional.orElseGet(Optional.java:364) ~[?:?]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:115) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:663) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:567) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$6(PersistentDispatcherMultipleConsumers.java:553) ~[classes/:?]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Reopening this ticket because I am consistently getting this test to fail on my local machine.
I also just go this variant:
2022-08-12T16:51:21,672 - ERROR - [broker-topic-workers-OrderedExecutor-7-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-2}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 3
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:599) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:555) ~[classes/:?]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-12T16:51:21,682 - ERROR - [broker-topic-workers-OrderedExecutor-7-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-2}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 6
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.lambda$filterEntriesForConsumer$1(AbstractBaseDispatcher.java:117) ~[classes/:?]
at java.util.Optional.orElseGet(Optional.java:364) ~[?:?]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:115) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:669) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:555) ~[classes/:?]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
Seems like we have a data race. Still digging.
Seems like this might be related to https://github.com/apache/pulsar/pull/10924.
https://github.com/apache/pulsar/pull/10924 is not related. Just before calling the parseFrom
in the stack trace, we call skipBrokerEntryMetadataIfExist
.
https://github.com/apache/pulsar/blob/926834ef8b2b57d3964aa7e9773e6245bcee861c/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L436-L446
It's clear to me now that the issue is a data race with the entries
array. After https://github.com/apache/pulsar/pull/16603, entries are published to another thread. I believe they are "safely published" because the calling thread is within a synchronized
block and the receiving thread passes the entries
array to a synchronized
method:
https://github.com/apache/pulsar/blob/abff91f881d2677ecf76943c2bfb4f9983fe50b6/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/PersistentDispatcherMultipleConsumers.java#L554-L555
The fundamental race comes from the caching. When an entry is expired from the cache, we call release()
, which deallocates the object.
Here is another stack trace I found from running the test. It looks like we already found similar stack traces here https://github.com/apache/pulsar/issues/14436. My analysis aligns with @lhotari's. I'll have a PR up soon.
2022-08-12T23:52:24,841 - ERROR - [broker-topic-workers-OrderedExecutor-3-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-1}] [-1] Failed to parse message metadata
java.lang.IllegalArgumentException: Invalid unknonwn tag type: 7
at org.apache.pulsar.common.api.proto.LightProtoCodec.skipUnknownField(LightProtoCodec.java:270) ~[classes/:?]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1370) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:599) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:556) ~[classes/:?]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-12T23:52:24,874 - ERROR - [broker-topic-workers-OrderedExecutor-3-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-1}] [-1] Failed to parse message metadata
java.lang.IndexOutOfBoundsException: readerIndex(44) + length(1) exceeds writerIndex(44): UnpooledDuplicatedByteBuf(ridx: 44, widx: 44, cap: 44/44, unwrapped: UnpooledHeapByteBuf(ridx: 0, widx: 44, cap: 44/44))
at io.netty.buffer.AbstractByteBuf.checkReadableBytes0(AbstractByteBuf.java:1442) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
at io.netty.buffer.AbstractByteBuf.readByte(AbstractByteBuf.java:730) ~[netty-buffer-4.1.77.Final.jar:4.1.77.Final]
at org.apache.pulsar.common.api.proto.LightProtoCodec.readVarInt(LightProtoCodec.java:140) ~[classes/:?]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1234) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.lambda$filterEntriesForConsumer$1(AbstractBaseDispatcher.java:117) ~[classes/:?]
at java.util.Optional.orElseGet(Optional.java:364) ~[?:?]
at org.apache.pulsar.broker.service.AbstractBaseDispatcher.filterEntriesForConsumer(AbstractBaseDispatcher.java:115) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:669) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:556) ~[classes/:?]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
2022-08-12T23:52:24,865 - ERROR - [broker-topic-workers-OrderedExecutor-2-0:Commands@1859] - [PersistentSubscription{topic=persistent://my-property/my-ns/cache-read, name=sub-0}] [-1] Failed to parse message metadata
java.lang.IllegalStateException: Some required fields are missing
at org.apache.pulsar.common.api.proto.MessageMetadata.checkRequiredFields(MessageMetadata.java:1378) ~[classes/:?]
at org.apache.pulsar.common.api.proto.MessageMetadata.parseFrom(MessageMetadata.java:1373) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:445) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.parseMessageMetadata(Commands.java:432) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekMessageMetadata(Commands.java:1854) ~[classes/:?]
at org.apache.pulsar.common.protocol.Commands.peekAndCopyMessageMetadata(Commands.java:1873) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.trySendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:599) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.sendMessagesToConsumers(PersistentDispatcherMultipleConsumers.java:573) ~[classes/:?]
at org.apache.pulsar.broker.service.persistent.PersistentDispatcherMultipleConsumers.lambda$readEntriesComplete$5(PersistentDispatcherMultipleConsumers.java:556) ~[classes/:?]
at org.apache.bookkeeper.mledger.util.SafeRun$1.safeRun(SafeRun.java:32) ~[classes/:?]
at org.apache.bookkeeper.common.util.SafeRunnable.run(SafeRunnable.java:36) ~[bookkeeper-common-4.15.0.jar:4.15.0]
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136) ~[?:?]
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635) ~[?:?]
at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[netty-common-4.1.77.Final.jar:4.1.77.Final]
at java.lang.Thread.run(Thread.java:833) ~[?:?]
I wrote https://github.com/michaeljmarshall/pulsar/commit/8f48cf2ebc0bc5b8004be807611271aea29d8c5f thinking that it would solve the issue. However, I am still getting the same exceptions, so there must be something I am missing. I'll continue on this tomorrow or Monday.