pulsar icon indicating copy to clipboard operation
pulsar copied to clipboard

[fix][broker]excessive replication speed leads to error: Producer send queue is full

Open poorbarcode opened this issue 7 months ago • 8 comments

Motivation

Background

  • Replication has a limitation that limits the maximum in-flight publishing message.
    • Users can configure the quota by replicationProducerQueueSize
    • Replication has a variable named pendingMessages that records how many messages are pending to be published
    • When there is a task that fetches schemas, the replicator will pause and mark fetchSchemaInProgress as true
    • When the replicator needs to rewind the cursor, the replicator will pause and mark waitForCursorRewinding as true
  • Replication allows at most one inflight cursor reading, which was limited by havePendingRead.
  • Reolication allows multi inflight publishing

Issue: The multiple mechanisms described above can not work well

time/thread read more entries A read more entries B
1 calculate permits: got 1000 calculate permits: got 1000
2 set havePendingRead -> true
3 start reading
4 read out 1000 msgs
5 publishing is messages, and decrease permits one by one
6 set havePendingRead -> true
7 the 1000 msgs are still in-progress
8 start reading
9 read out 1000 msgs
10 publishing is messages, and decrease permits one by one
11 There are 2000 msgs in publishing, which is more than expected, get error Producer send queue is full
12 rewind cursor and trigger more read more entries, leads the situation bader

pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://public/default/output-partition-0 | prod-->prod-repl] Error producing on re
mote broker
pulsar-broker org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator.replicateEntries(GeoPersistentReplicator.java:191) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:313) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:180) ~[io.streamnative-managed-ledger-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
pulsar-broker     at java.base/java.lang.Thread.run(Unknown Source) [?:?]
pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO  org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/output-partition-0-pulsar.repl.prod-repl] Rewind from 2111423:1 to 2111423
:0
pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://public/default/output-partition-0 | prod-->prod-repl] Error producing on re
mote broker
pulsar-broker org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator.replicateEntries(GeoPersistentReplicator.java:191) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:313) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:180) ~[io.streamnative-managed-ledger-3.3.5.jar:3.3.5]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker     at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
pulsar-broker     at java.base/java.lang.Thread.run(Unknown Source) [?:?]

Modifications

  • merge multiple mechanisms into one and fix the issue

Documentation

  • [ ] doc
  • [ ] doc-required
  • [x] doc-not-needed
  • [ ] doc-complete

Matching PR in forked repository

PR in forked repository: x

poorbarcode avatar Apr 21 '25 02:04 poorbarcode

@poorbarcode is there any relationship to PIP-269: Add an epoch of cursor to discard outdated reading or any other previous reported issues ?

lhotari avatar Apr 22 '25 15:04 lhotari

@lhotari

@poorbarcode is there any relationship to https://github.com/apache/pulsar/pull/20469 or any other previous reported issues ?

It does not relate to

poorbarcode avatar Apr 23 '25 02:04 poorbarcode

/pulsarbot rerun-failure-checks

poorbarcode avatar Apr 24 '25 02:04 poorbarcode

Codecov Report

:x: Patch coverage is 81.36364% with 41 lines in your changes missing coverage. Please review. :white_check_mark: Project coverage is 74.26%. Comparing base (bbc6224) to head (880ba91). :warning: Report is 1403 commits behind head on master.

Files with missing lines Patch % Lines
...roker/service/persistent/PersistentReplicator.java 85.71% 11 Missing and 13 partials :warning:
...ar/broker/service/persistent/ShadowReplicator.java 22.22% 6 Missing and 1 partial :warning:
...ache/pulsar/broker/service/AbstractReplicator.java 66.66% 2 Missing and 1 partial :warning:
.../java/org/apache/pulsar/client/impl/ClientCnx.java 25.00% 2 Missing and 1 partial :warning:
...er/service/persistent/GeoPersistentReplicator.java 84.61% 2 Missing :warning:
...sar/broker/service/persistent/PersistentTopic.java 66.66% 2 Missing :warning:
Additional details and impacted files

Impacted file tree graph

@@             Coverage Diff              @@
##             master   #24189      +/-   ##
============================================
+ Coverage     73.57%   74.26%   +0.68%     
- Complexity    32624    32752     +128     
============================================
  Files          1877     1868       -9     
  Lines        139502   145542    +6040     
  Branches      15299    16660    +1361     
============================================
+ Hits         102638   108081    +5443     
- Misses        28908    28924      +16     
- Partials       7956     8537     +581     
Flag Coverage Δ
inttests 26.69% <31.36%> (+2.10%) :arrow_up:
systests 23.31% <1.36%> (-1.02%) :arrow_down:
unittests 73.77% <81.36%> (+0.92%) :arrow_up:

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...a/org/apache/pulsar/broker/service/Replicator.java 0.00% <ø> (ø)
...service/nonpersistent/NonPersistentReplicator.java 67.00% <100.00%> (+3.63%) :arrow_up:
...roker/service/persistent/MessageDeduplication.java 76.83% <100.00%> (-4.08%) :arrow_down:
...pulsar/client/impl/GeoReplicationProducerImpl.java 49.13% <100.00%> (ø)
...va/org/apache/pulsar/client/impl/ProducerImpl.java 83.87% <100.00%> (+0.27%) :arrow_up:
...er/service/persistent/GeoPersistentReplicator.java 71.65% <84.61%> (-6.37%) :arrow_down:
...sar/broker/service/persistent/PersistentTopic.java 80.21% <66.66%> (+1.76%) :arrow_up:
...ache/pulsar/broker/service/AbstractReplicator.java 67.64% <66.66%> (-17.36%) :arrow_down:
.../java/org/apache/pulsar/client/impl/ClientCnx.java 69.65% <25.00%> (-2.12%) :arrow_down:
...ar/broker/service/persistent/ShadowReplicator.java 45.90% <22.22%> (-12.64%) :arrow_down:
... and 1 more

... and 1076 files with indirect coverage changes

:rocket: New features to boost your workflow:
  • :snowflake: Test Analytics: Detect flaky tests, report on failures, and find test suite problems.
  • :package: JS Bundle Analysis: Save yourself from yourself by tracking and limiting bundle sizes in JS merges.

codecov-commenter avatar May 21 '25 12:05 codecov-commenter

test failure:

 Error:  Tests run: 8, Failures: 1, Errors: 0, Skipped: 7, Time elapsed: 65.679 s <<< FAILURE! - in org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest
  Error:  org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest.testCreatePartitionedTopicWithNsReplication  Time elapsed: 20.673 s  <<< FAILURE!
  java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.client.api.Message.getValue()" because the return value of "org.apache.pulsar.client.api.Consumer.receive(int, java.util.concurrent.TimeUnit)" is null
  	at org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest.testCreatePartitionedTopicWithNsReplication(DisabledCreateTopicToRemoteClusterForReplicationTest.java:111)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)

could be a flaky test, attempt to fix was https://github.com/apache/pulsar/pull/24141

lhotari avatar May 30 '25 13:05 lhotari

there seems to be more flakiness in tests. I can see the first attempt failed in a replication test as well

  Error:  Tests run: 140, Failures: 1, Errors: 0, Skipped: 116, Time elapsed: 343.104 s <<< FAILURE! - in org.apache.pulsar.broker.service.ReplicatorTest
  Error:  org.apache.pulsar.broker.service.ReplicatorTest.testReplicationWillNotStuckByIncompleteSchemaFuture  Time elapsed: 16.339 s  <<< FAILURE!
  org.awaitility.core.ConditionTimeoutException: Assertion condition replication task finished expected [true] but found [false] within 10 seconds.
  	at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
  	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
  	at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
  	at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
  	at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
  	at org.apache.pulsar.broker.service.ReplicatorTest.waitReplicateFinish(ReplicatorTest.java:523)
  	at org.apache.pulsar.broker.service.ReplicatorTest.testReplicationWillNotStuckByIncompleteSchemaFuture(ReplicatorTest.java:516)
  	at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
  	at java.base/java.lang.reflect.Method.invoke(Method.java:580)
  	at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
  	at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
  	at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
  	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
  	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
  	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
  	at java.base/java.lang.Thread.run(Thread.java:1583)
  Caused by: java.lang.AssertionError: replication task finished expected [true] but found [false]
  	at org.testng.Assert.fail(Assert.java:110)
  	at org.testng.Assert.failNotEquals(Assert.java:1577)
  	at org.testng.Assert.assertTrue(Assert.java:56)
  	at org.apache.pulsar.broker.service.ReplicatorTest.lambda$waitReplicateFinish$5(ReplicatorTest.java:526)
  	at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
  	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
  	at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
  	... 4 more

lhotari avatar May 30 '25 13:05 lhotari

/pulsarbot rerun-failure-checks

lhotari avatar May 30 '25 13:05 lhotari

@lhotari

Please debug the flaky test failures in replication tests before we merge this change.

Fixed

poorbarcode avatar Jun 04 '25 01:06 poorbarcode

Dismissed @lhotari's request change because he did not reply for a long time

poorbarcode avatar Jul 02 '25 06:07 poorbarcode