[Bug] Cannot determine whether the message is a duplicate at this time

Open graysonzeng opened this issue 2 years ago • 3 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Version

Pulsar version: 3.1.1, master

Minimal reproduce step

broker count: 2
bookie count: 5

broker config:
managedLedgerDefaultAckQuorum: "2"
managedLedgerDefaultEnsembleSize: "4"
managedLedgerDefaultWriteQuorum: "3"

// Open Deduplication config
brokerDeduplicationEnabled: "true"

// enable Interceptor
brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

Batching is enabled on the producer by default.

Use pulsar-perf to publish at a rate of 200000 messages/sec, with 100000000 messages in total, and consume the topic at the same time.

bin/pulsar-perf produce persistent://pulsar/default/input_test -r 200000 -m 10000000

At the same time, use a function to consume and produce messages, and set the sequenceId on the producer inside the function (using EFFECTIVELY_ONCE mode).

What did you expect to see?

Complete the production and consumption of all messages

What did you see instead?

The producer runs into the following error and stays stuck on it until the broker is restarted.


2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-3] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-9-0] Re-Sending 1142 messages to server
2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 1 messages to server
2024-01-17T14:30:58,332+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-9] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 4340 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-1] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 395 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-6] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 1482 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-5] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Re-Sending 10 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ProducerImpl - [persistent://pulsar/default2/alltables6-partition-0] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Re-Sending 454 messages to server
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 - R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,333+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [21.24.16.52/21.24.16.52:6650] Got exception io.netty.channel.StacklessClosedChannelException
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] WARN  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Received send error from server: PersistenceError : Cannot determine whether the message is a duplicate at this time
2024-01-17T14:30:58,335+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ClientCnx - [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] Disconnected
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-0] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-9-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-5] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-8] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-1] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-5-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-2] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-6-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s
2024-01-17T14:30:58,336+0800 [pulsar-client-io-17-1] INFO  org.apache.pulsar.client.impl.ConnectionHandler - [persistent://pulsar/default2/alltables6-partition-6] [alltables622-persistent://pulsar/default2/alltables6-persistent://pulsar/default2/input_test5-partition-7-0] Closed connection [id: 0xd845ad90, L:/9.165.149.251:33602 ! R:21.24.16.52/21.24.16.52:6650] -- Will try again in 0.1 s

Anything else?

After the producer got stuck, a heap dump of the broker was taken and something unusual was discovered.

image

The pendingAddOps queue of the LedgerHandle also retains a lot of requests. The first request in the queue is not completed, its pendingWriteRequests is 0, and its addEntrySuccessBookies is empty.

image

But the second request is in the completed state.

https://github.com/apache/pulsar/blob/c834feb459369f497c68cd42dbc1625b11551f72/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/persistent/MessageDeduplication.java#L381

In isDuplicate of MessageDeduplication, the sequenceId falls between lastSequenceIdPersisted and highestSequencedPushed. This is why we receive the "Cannot determine whether the message is a duplicate at this time" error.

                if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
                    return MessageDupStatus.Dup;
                } else {
                    return MessageDupStatus.Unknown;
                }

The client receives this error, disconnects, and resends the message. The resent message still has sequenceId > lastSequenceIdPersisted, so the producer falls into a loop.
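The loop can be reproduced with a small stand-alone model. This is only a sketch of the check quoted above, not the real MessageDeduplication class: the class `DedupLoopSketch`, the map `highestSequencedPersisted`, and the helper methods are names assumed for illustration. It shows that as long as the write for a sequenceId is never confirmed, every resend of that sequenceId gets `Unknown`.

```java
import java.util.HashMap;
import java.util.Map;

/** Simplified model of the broker-side duplicate check; not the real Pulsar class. */
final class DedupLoopSketch {
    enum MessageDupStatus { NotDup, Dup, Unknown }

    // Highest sequenceId handed to the managed ledger, per producer name.
    private final Map<String, Long> highestSequencedPushed = new HashMap<>();
    // Highest sequenceId whose write was confirmed (name assumed for this sketch).
    private final Map<String, Long> highestSequencedPersisted = new HashMap<>();

    MessageDupStatus isDuplicate(String producerName, long sequenceId) {
        Long lastSequenceIdPushed = highestSequencedPushed.get(producerName);
        if (lastSequenceIdPushed != null && sequenceId <= lastSequenceIdPushed) {
            Long lastSequenceIdPersisted = highestSequencedPersisted.get(producerName);
            if (lastSequenceIdPersisted != null && sequenceId <= lastSequenceIdPersisted) {
                return MessageDupStatus.Dup;
            }
            // Pushed but not yet persisted: the broker cannot decide.
            return MessageDupStatus.Unknown;
        }
        highestSequencedPushed.put(producerName, sequenceId);
        return MessageDupStatus.NotDup;
    }

    /** Hypothetical hook: called when the write for sequenceId is confirmed. */
    void recordPersisted(String producerName, long sequenceId) {
        highestSequencedPersisted.merge(producerName, sequenceId, Long::max);
    }

    /** Resends sequenceId 2 while its write never completes; counts Unknown answers. */
    static int countUnknownOnResend(int resendAttempts) {
        DedupLoopSketch broker = new DedupLoopSketch();
        broker.isDuplicate("producer-a", 1L);
        broker.recordPersisted("producer-a", 1L); // sequenceId 1 is persisted
        broker.isDuplicate("producer-a", 2L);     // sequenceId 2 is pushed, write is stuck
        int unknown = 0;
        for (int i = 0; i < resendAttempts; i++) {
            if (broker.isDuplicate("producer-a", 2L) == MessageDupStatus.Unknown) {
                unknown++;
            }
        }
        return unknown;
    }

    public static void main(String[] args) {
        System.out.println(countUnknownOnResend(5)); // prints 5
    }
}
```

In this model the only way out of the loop is a call to `recordPersisted`, which corresponds to the stuck write completing.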

Update

An important log message was found

14:29:19.961 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.bookkeeper.common.util.SingleThreadExecutor - Error while running task: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))
java.lang.IndexOutOfBoundsException: readerIndex: 31215, writerIndex: 21324 (expected: 0 <= readerIndex <= writerIndex <= capacity(65536))
	at io.netty.buffer.AbstractByteBuf.checkIndexBounds(AbstractByteBuf.java:112) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.AbstractByteBuf.setIndex(AbstractByteBuf.java:144) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.DuplicatedByteBuf.<init>(DuplicatedByteBuf.java:56) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.DuplicatedByteBuf.<init>(DuplicatedByteBuf.java:42) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.UnpooledDuplicatedByteBuf.<init>(UnpooledDuplicatedByteBuf.java:24) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.AbstractPooledDerivedByteBuf$PooledNonRetainedDuplicateByteBuf.<init>(AbstractPooledDerivedByteBuf.java:164) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.AbstractPooledDerivedByteBuf.duplicate0(AbstractPooledDerivedByteBuf.java:157) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.PooledSlicedByteBuf.duplicate(PooledSlicedByteBuf.java:118) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.CompositeByteBuf$Component.duplicate(CompositeByteBuf.java:1947) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at io.netty.buffer.CompositeByteBuf.component(CompositeByteBuf.java:1556) ~[io.netty-netty-buffer-4.1.104.Final.jar:4.1.104.Final]
	at org.apache.bookkeeper.proto.checksum.DigestManager.computeDigestAndPackageForSendingV2(DigestManager.java:149) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.proto.checksum.DigestManager.computeDigestAndPackageForSending(DigestManager.java:106) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.client.PendingAddOp.initiate(PendingAddOp.java:246) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.client.LedgerHandle.doAsyncAddEntry(LedgerHandle.java:1363) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.client.LedgerHandle.asyncAddEntry(LedgerHandle.java:1061) ~[bookkeeper-server-4.17.0-SNAPSHOT.jar:4.17.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.OpAddEntry.initiate(OpAddEntry.java:144) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.internalAsyncAddEntry(ManagedLedgerImpl.java:862) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.mledger.impl.ManagedLedgerImpl.lambda$asyncAddEntry$3(ManagedLedgerImpl.java:794) ~[org.apache.pulsar-managed-ledger-3.3.0-SNAPSHOT.jar:3.3.0-SNAPSHOT]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:137) ~[org.apache.bookkeeper-bookkeeper-common-4.16.3.jar:4.16.3]
	at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:107) ~[org.apache.bookkeeper-bookkeeper-common-4.16.3.jar:4.16.3]
	at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.104.Final.jar:4.1.104.Final]
	at java.lang.Thread.run(Thread.java:833) ~[?:?]

It points to bookkeeper DigestManager.computeDigestAndPackageForSendingV2()

https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/proto/checksum/DigestManager.java#L149

private ReferenceCounted computeDigestAndPackageForSendingV2(long entryId, long lastAddConfirmed, long length,
                                                                 ByteBuf data, byte[] masterKey, int flags) {

        if (unwrapped instanceof CompositeByteBuf) {
            CompositeByteBuf cbb = (CompositeByteBuf) unwrapped;
            for (int i = 0; i < cbb.numComponents(); i++) {
                // throws an IndexOutOfBoundsException
                ByteBuf b = cbb.component(i);
                digest = update(digest, b, b.readerIndex(), b.readableBytes());
            }
        } else {
            digest = update(digest, unwrapped, unwrapped.readerIndex(), unwrapped.readableBytes());
        }

    }
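The exception message in the stack trace states the invariant that was violated: 0 <= readerIndex <= writerIndex <= capacity. The following is a stand-alone model of that check using the values from the log. It is a sketch, not Netty's code, and the class name `IndexBoundsSketch` is made up for illustration.

```java
/** Stand-alone model of the index invariant named in the log message. */
final class IndexBoundsSketch {
    static void checkIndexBounds(int readerIndex, int writerIndex, int capacity) {
        if (readerIndex < 0 || readerIndex > writerIndex || writerIndex > capacity) {
            throw new IndexOutOfBoundsException(
                    "readerIndex: " + readerIndex + ", writerIndex: " + writerIndex
                    + " (expected: 0 <= readerIndex <= writerIndex <= capacity(" + capacity + "))");
        }
    }

    public static void main(String[] args) {
        // A consistent pair of indices passes the check.
        checkIndexBounds(0, 21324, 65536);
        try {
            // Values from the broker log: readerIndex is larger than writerIndex.
            checkIndexBounds(31215, 21324, 65536);
        } catch (IndexOutOfBoundsException e) {
            System.out.println(e.getMessage());
        }
    }
}
```

With the logged values the check fails because readerIndex (31215) is greater than writerIndex (21324), even though both are within the capacity of 65536.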

Under normal circumstances, once the digest has been computed, the result is assigned to toSend and payload is set to null.

https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L231

public synchronized void initiate() {
       ......
        this.toSend = lh.macManager.computeDigestAndPackageForSending(
                entryId, lh.lastAddConfirmed, currentLedgerLength,
                payload, lh.ledgerKey, flags);
        // ownership of RefCounted ByteBuf was passed to computeDigestAndPackageForSending
        payload = null;
       ......
    }

Therefore, we can see that the head (peek) of pendingAddOps still retains the payload, and its toSend is null.

image

In BookKeeper's PendingAddOp.unsetSuccessAndSendWriteRequest(), if toSend is null the method returns immediately, so this request has been retained in pendingAddOps ever since computeDigest failed: https://github.com/apache/bookkeeper/blob/113d40ac5057709b3e44b9281231456b4ef81065/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/PendingAddOp.java#L183
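The effect on the queue can be sketched with a toy model. Everything here is hypothetical (`StuckHeadSketch`, `AddOp`, `resend`, `drainCompletedHead` are not BookKeeper classes or methods), and it assumes that success callbacks are only delivered in queue order, starting from the head. Under those assumptions, one op whose digest step throws stays at the head with its payload set and toSend null, and blocks every completed op behind it.

```java
import java.util.ArrayDeque;
import java.util.Deque;

/** Toy model of an in-order pending-add queue; not BookKeeper's actual classes. */
final class StuckHeadSketch {
    static final class AddOp {
        final long entryId;
        Object payload;    // set to null once toSend has been built
        Object toSend;     // stays null if the digest step throws
        boolean completed;

        AddOp(long entryId, Object payload) {
            this.entryId = entryId;
            this.payload = payload;
        }

        void initiate(boolean digestFails) {
            if (digestFails) {
                throw new IndexOutOfBoundsException("simulated digest failure");
            }
            toSend = payload;
            payload = null;
        }

        /** Models the early return described above: nothing is resent without toSend. */
        boolean resend() {
            if (toSend == null) {
                return false;
            }
            completed = true;
            return true;
        }
    }

    final Deque<AddOp> pendingAddOps = new ArrayDeque<>();
    int confirmed;

    void add(AddOp op, boolean digestFails) {
        pendingAddOps.add(op);
        try {
            op.initiate(digestFails);
            op.completed = true; // pretend the bookies acknowledged immediately
        } catch (IndexOutOfBoundsException e) {
            // The executor only logs the error; the op stays queued.
        }
        drainCompletedHead();
    }

    /** Assumption of this sketch: callbacks fire strictly in queue order. */
    void drainCompletedHead() {
        while (!pendingAddOps.isEmpty() && pendingAddOps.peek().completed) {
            pendingAddOps.poll();
            confirmed++;
        }
    }

    public static void main(String[] args) {
        StuckHeadSketch ledger = new StuckHeadSketch();
        ledger.add(new AddOp(0, "entry-0"), true);  // digest fails for the first op
        ledger.add(new AddOp(1, "entry-1"), false);
        ledger.add(new AddOp(2, "entry-2"), false);
        AddOp head = ledger.pendingAddOps.peek();
        System.out.println(ledger.confirmed + " confirmed, " + ledger.pendingAddOps.size() + " pending");
        System.out.println("head payload kept: " + (head.payload != null) + ", head toSend: " + head.toSend);
        System.out.println("head resend sent anything: " + head.resend());
    }
}
```

This matches the heap dump described above: the head of the queue is not completed and still holds its payload, while the ops behind it are completed but never confirmed.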

Are you willing to submit a PR?

  • [x] I'm willing to submit a PR!

graysonzeng, Jan 12 '24 09:01

broker config: managedLedgerDefaultAckQuorum: "2" managedLedgerDefaultEnsembleSize: "4" managedLedgerDefaultWriteQuorum: "3"

@graysonzeng Not related to the reported issue, but it's good to be aware that this type of config won't be optimal for read performance, since sticky reads aren't used with bookies when E != Qw. More details in #18003 and https://github.com/apache/bookkeeper/pull/4131. Related Pulsar Slack thread: https://apache-pulsar.slack.com/archives/C5Z4T36F7/p1699487100764749?thread_ts=1698225686.705339&cid=C5Z4T36F7 ("since ensemble=Qw, reads rates have increased x30").

lhotari, Jan 12 '24 11:01

Thanks so much for the heads up! I definitely missed it. I will take the time to read it.

graysonzeng, Jan 12 '24 15:01

// enable Interceptor brokerEntryMetadataInterceptors: org.apache.pulsar.common.intercept.AppendIndexMetadataInterceptor

@graysonzeng Does it reproduce without brokerEntryMetadataInterceptors?

lhotari, Feb 12 '24 12:02

@lhotari I haven't tested it again, but I think it's not reproducible without brokerEntryMetadataInterceptors. Currently it seems that CompositeByteBuf will be generated in addBrokerEntryMetadata only when brokerEntryMetadataInterceptors are enabled.

https://github.com/apache/pulsar/blob/0b6bd70b8d1e7b7cd4d82aa2e0cbfd5e0323d440/pulsar-common/src/main/java/org/apache/pulsar/common/protocol/Commands.java#L1723

graysonzeng, Feb 21 '24 02:02

This issue is most likely related to #22601 / #22810.

lhotari, May 31 '24 13:05