pulsar
[fix][broker] Excessive replication speed leads to error: Producer send queue is full
Motivation
Background
- Replication limits the maximum number of in-flight publishing messages.
  - Users can configure the quota with `replicationProducerQueueSize`.
  - The replicator has a variable named `pendingMessages` that records how many messages are pending to be published.
  - When there is a task that fetches schemas, the replicator pauses and sets `fetchSchemaInProgress` to `true`.
  - When the replicator needs to rewind the cursor, it pauses and sets `waitForCursorRewinding` to `true`.
- Replication allows at most one in-flight cursor read, which is limited by `havePendingRead`.
- Replication allows multiple in-flight publishes.
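To make the interplay of these inputs concrete, here is a minimal, hypothetical model of how a permit count could be derived from them. Only the field names (`replicationProducerQueueSize`, `pendingMessages`, `fetchSchemaInProgress`, `waitForCursorRewinding`) come from the description above; the class name, the `calculatePermits` method, and the exact formula are assumptions for illustration, not Pulsar's actual code. Note that `havePendingRead` is deliberately left out: it is a separate check, which is the root of the race described in the table below.

```java
/**
 * Hypothetical model of the separate flow-control inputs listed above.
 * Field names mirror the PR description; the class, method, and formula
 * are illustrative assumptions only.
 */
public class ReplicatorPermitModel {
    final int replicationProducerQueueSize; // user-configured in-flight quota
    int pendingMessages;                    // handed to the producer, not yet acked
    boolean fetchSchemaInProgress;          // replicator pauses while fetching a schema
    boolean waitForCursorRewinding;         // replicator pauses while rewinding the cursor

    ReplicatorPermitModel(int replicationProducerQueueSize) {
        this.replicationProducerQueueSize = replicationProducerQueueSize;
    }

    /** How many entries the next cursor read may request; 0 means "do not read now". */
    int calculatePermits() {
        if (fetchSchemaInProgress || waitForCursorRewinding) {
            return 0;
        }
        return Math.max(0, replicationProducerQueueSize - pendingMessages);
    }

    public static void main(String[] args) {
        ReplicatorPermitModel m = new ReplicatorPermitModel(1000);
        System.out.println(m.calculatePermits()); // 1000
        m.pendingMessages = 400;
        System.out.println(m.calculatePermits()); // 600
        m.fetchSchemaInProgress = true;
        System.out.println(m.calculatePermits()); // 0
    }
}
```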
Issue: The mechanisms described above do not work well together:
| time/thread | read more entries A | read more entries B |
|---|---|---|
| 1 | calculate permits: got 1000 | calculate permits: got 1000 |
| 2 | set `havePendingRead` -> true | |
| 3 | start reading | |
| 4 | read out 1000 msgs | |
| 5 | publish the messages, and decrease permits one by one | |
| 6 | | set `havePendingRead` -> true |
| 7 | | the 1000 msgs are still in progress |
| 8 | | start reading |
| 9 | | read out 1000 msgs |
| 10 | | publish the messages, and decrease permits one by one |
| 11 | | There are 2000 msgs being published, which is more than expected; get error `Producer send queue is full` |
| 12 | | rewind the cursor and trigger more reads, which makes the situation worse |
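The timeline above can be replayed deterministically to show why the quota is exceeded. This is a simplified, single-threaded sketch under stated assumptions: permits are `quota - pendingMessages`, the `havePendingRead` check is separate from the permit calculation, the ledger always has enough backlog, and no acks arrive during the replay. The class `ReplicationRaceReplay` and its methods are hypothetical names, not Pulsar code.

```java
/**
 * Hypothetical, deterministic replay of the race in the table above.
 * Assumes permits are computed without looking at havePendingRead,
 * enough backlog exists, and no acks arrive during the replay.
 */
public class ReplicationRaceReplay {
    final int quota;                 // stands in for replicationProducerQueueSize
    int pendingMessages = 0;         // handed to the producer, not yet acked
    boolean havePendingRead = false; // at most one in-flight cursor read

    ReplicationRaceReplay(int quota) {
        this.quota = quota;
    }

    /** Permit calculation that does NOT consider havePendingRead. */
    int calculatePermits() {
        return Math.max(0, quota - pendingMessages);
    }

    /** Starts a read with permits computed earlier; returns false if another read is in flight. */
    boolean readAndPublish(int permits) {
        if (havePendingRead) {
            return false;
        }
        havePendingRead = true;         // steps 2 and 6
        int entriesRead = permits;      // steps 4 and 9
        havePendingRead = false;        // the read has completed
        pendingMessages += entriesRead; // steps 5 and 10: every entry goes to the producer
        return true;
    }

    public static void main(String[] args) {
        ReplicationRaceReplay r = new ReplicationRaceReplay(1000);
        int permitsA = r.calculatePermits(); // step 1, A: 1000
        int permitsB = r.calculatePermits(); // step 1, B: 1000 from the same snapshot
        r.readAndPublish(permitsA);          // A reads and publishes 1000 messages
        r.readAndPublish(permitsB);          // B reuses its stale 1000 permits
        System.out.println("in flight: " + r.pendingMessages + ", quota: " + r.quota);
        // prints "in flight: 2000, quota: 1000"
    }
}
```

Because B's permits were computed before A's messages were counted as pending, the `havePendingRead` flag alone cannot stop the second read; it only serializes reads, it does not revalidate permits.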
pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://public/default/output-partition-0 | prod-->prod-repl] Error producing on remote broker
pulsar-broker org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
pulsar-broker at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator.replicateEntries(GeoPersistentReplicator.java:191) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:313) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:180) ~[io.streamnative-managed-ledger-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
pulsar-broker at java.base/java.lang.Thread.run(Unknown Source) [?:?]
pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] INFO org.apache.bookkeeper.mledger.impl.ManagedCursorImpl - [public/default/persistent/output-partition-0-pulsar.repl.prod-repl] Rewind from 2111423:1 to 2111423:0
pulsar-broker 2025-03-27T22:02:44,247+0000 [BookKeeperClientWorker-OrderedExecutor-2-0] ERROR org.apache.pulsar.broker.service.persistent.PersistentReplicator - [persistent://public/default/output-partition-0 | prod-->prod-repl] Error producing on remote broker
pulsar-broker org.apache.pulsar.client.api.PulsarClientException$ProducerQueueIsFullError: Producer send queue is full
pulsar-broker at org.apache.pulsar.client.impl.ProducerImpl.canEnqueueRequest(ProducerImpl.java:1055) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.pulsar.client.impl.ProducerImpl.sendAsync(ProducerImpl.java:534) ~[io.streamnative-pulsar-client-original-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.pulsar.broker.service.persistent.GeoPersistentReplicator.replicateEntries(GeoPersistentReplicator.java:191) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.pulsar.broker.service.persistent.PersistentReplicator.readEntriesComplete(PersistentReplicator.java:313) ~[io.streamnative-pulsar-broker-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.bookkeeper.mledger.impl.OpReadEntry.lambda$checkReadCompletion$2(OpReadEntry.java:180) ~[io.streamnative-managed-ledger-3.3.5.jar:3.3.5]
pulsar-broker at org.apache.bookkeeper.common.util.SingleThreadExecutor.safeRunTask(SingleThreadExecutor.java:128) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker at org.apache.bookkeeper.common.util.SingleThreadExecutor.run(SingleThreadExecutor.java:99) ~[io.streamnative-bookkeeper-common-4.17.1.1.jar:4.17.1.1]
pulsar-broker at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.119.Final.jar:4.1.119.Final]
pulsar-broker at java.base/java.lang.Thread.run(Unknown Source) [?:?]
Modifications
- Merge the multiple mechanisms into one to fix the issue.
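One possible way to merge the mechanisms is a single gate that, under one lock, checks the pause state and the in-flight read and reserves permits in the same step, so no caller can act on a stale permit count. This is a hypothetical sketch, not the PR's actual implementation; `ReplicatorReadGate`, `tryAcquireRead`, `onReadComplete`, and `onPublishFinished` are illustrative names, and `paused` stands in for both `fetchSchemaInProgress` and `waitForCursorRewinding`.

```java
/**
 * Hypothetical single gate for reads and in-flight publishes.
 * Not the PR's actual code; all names are illustrative.
 */
public class ReplicatorReadGate {
    private final int quota;            // stands in for replicationProducerQueueSize
    private int inFlightOrReserved = 0; // permits reserved by a read plus messages being published
    private boolean readInProgress = false;
    private boolean paused = false;     // e.g. fetching a schema or rewinding the cursor

    public ReplicatorReadGate(int quota) {
        this.quota = quota;
    }

    /** Atomically checks every condition and reserves permits; returns 0 if no read may start. */
    public synchronized int tryAcquireRead() {
        if (paused || readInProgress) {
            return 0;
        }
        int permits = quota - inFlightOrReserved;
        if (permits <= 0) {
            return 0;
        }
        readInProgress = true;
        inFlightOrReserved += permits; // reserve before reading so nobody else can reuse them
        return permits;
    }

    /** Called when a read completes; releases the part of the reservation that was not used. */
    public synchronized void onReadComplete(int requested, int entriesRead) {
        readInProgress = false;
        inFlightOrReserved -= (requested - entriesRead);
    }

    /** Called once per message when the remote cluster acks it or the publish fails. */
    public synchronized void onPublishFinished() {
        inFlightOrReserved--;
    }

    public synchronized void setPaused(boolean paused) {
        this.paused = paused;
    }

    public synchronized int inFlight() {
        return inFlightOrReserved;
    }

    public static void main(String[] args) {
        ReplicatorReadGate gate = new ReplicatorReadGate(1000);
        int a = gate.tryAcquireRead(); // 1000
        int b = gate.tryAcquireRead(); // 0: a read is already in progress
        gate.onReadComplete(a, 1000);
        int c = gate.tryAcquireRead(); // 0: the quota is used by in-flight messages
        System.out.println(a + " " + b + " " + c + " " + gate.inFlight()); // 1000 0 0 1000
    }
}
```

The design choice is that permit calculation and read admission can no longer diverge: a second caller either sees the read in progress or sees the reservation already subtracted from the quota.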
Documentation
- [ ] `doc`
- [ ] `doc-required`
- [x] `doc-not-needed`
- [ ] `doc-complete`
Matching PR in forked repository
PR in forked repository: x
@poorbarcode Is there any relationship to PIP-269: Add an epoch of cursor to discard outdated reading, or to any other previously reported issues?
@lhotari
@poorbarcode Is there any relationship to https://github.com/apache/pulsar/pull/20469, or to any other previously reported issues?
It is not related.
/pulsarbot rerun-failure-checks
Codecov Report
:x: Patch coverage is 81.36364% with 41 lines in your changes missing coverage. Please review.
:white_check_mark: Project coverage is 74.26%. Comparing base (bbc6224) to head (880ba91).
:warning: Report is 1403 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #24189 +/- ##
============================================
+ Coverage 73.57% 74.26% +0.68%
- Complexity 32624 32752 +128
============================================
Files 1877 1868 -9
Lines 139502 145542 +6040
Branches 15299 16660 +1361
============================================
+ Hits 102638 108081 +5443
- Misses 28908 28924 +16
- Partials 7956 8537 +581
| Flag | Coverage Δ | |
|---|---|---|
| inttests | 26.69% <31.36%> (+2.10%) | :arrow_up: |
| systests | 23.31% <1.36%> (-1.02%) | :arrow_down: |
| unittests | 73.77% <81.36%> (+0.92%) | :arrow_up: |
Flags with carried forward coverage won't be shown.
| Files with missing lines | Coverage Δ | |
|---|---|---|
| ...a/org/apache/pulsar/broker/service/Replicator.java | 0.00% <ø> (ø) | |
| ...service/nonpersistent/NonPersistentReplicator.java | 67.00% <100.00%> (+3.63%) | :arrow_up: |
| ...roker/service/persistent/MessageDeduplication.java | 76.83% <100.00%> (-4.08%) | :arrow_down: |
| ...pulsar/client/impl/GeoReplicationProducerImpl.java | 49.13% <100.00%> (ø) | |
| ...va/org/apache/pulsar/client/impl/ProducerImpl.java | 83.87% <100.00%> (+0.27%) | :arrow_up: |
| ...er/service/persistent/GeoPersistentReplicator.java | 71.65% <84.61%> (-6.37%) | :arrow_down: |
| ...sar/broker/service/persistent/PersistentTopic.java | 80.21% <66.66%> (+1.76%) | :arrow_up: |
| ...ache/pulsar/broker/service/AbstractReplicator.java | 67.64% <66.66%> (-17.36%) | :arrow_down: |
| .../java/org/apache/pulsar/client/impl/ClientCnx.java | 69.65% <25.00%> (-2.12%) | :arrow_down: |
| ...ar/broker/service/persistent/ShadowReplicator.java | 45.90% <22.22%> (-12.64%) | :arrow_down: |
| ... and 1 more | | |
Test failure:
Error: Tests run: 8, Failures: 1, Errors: 0, Skipped: 7, Time elapsed: 65.679 s <<< FAILURE! - in org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest
Error: org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest.testCreatePartitionedTopicWithNsReplication Time elapsed: 20.673 s <<< FAILURE!
java.lang.NullPointerException: Cannot invoke "org.apache.pulsar.client.api.Message.getValue()" because the return value of "org.apache.pulsar.client.api.Consumer.receive(int, java.util.concurrent.TimeUnit)" is null
at org.apache.pulsar.broker.service.DisabledCreateTopicToRemoteClusterForReplicationTest.testCreatePartitionedTopicWithNsReplication(DisabledCreateTopicToRemoteClusterForReplicationTest.java:111)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
This could be a flaky test; a previous attempt to fix it was https://github.com/apache/pulsar/pull/24141
There seems to be more flakiness in the tests. I can see that the first attempt also failed in a replication test:
Error: Tests run: 140, Failures: 1, Errors: 0, Skipped: 116, Time elapsed: 343.104 s <<< FAILURE! - in org.apache.pulsar.broker.service.ReplicatorTest
Error: org.apache.pulsar.broker.service.ReplicatorTest.testReplicationWillNotStuckByIncompleteSchemaFuture Time elapsed: 16.339 s <<< FAILURE!
org.awaitility.core.ConditionTimeoutException: Assertion condition replication task finished expected [true] but found [false] within 10 seconds.
at org.awaitility.core.ConditionAwaiter.await(ConditionAwaiter.java:167)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:119)
at org.awaitility.core.AssertionCondition.await(AssertionCondition.java:31)
at org.awaitility.core.ConditionFactory.until(ConditionFactory.java:985)
at org.awaitility.core.ConditionFactory.untilAsserted(ConditionFactory.java:769)
at org.apache.pulsar.broker.service.ReplicatorTest.waitReplicateFinish(ReplicatorTest.java:523)
at org.apache.pulsar.broker.service.ReplicatorTest.testReplicationWillNotStuckByIncompleteSchemaFuture(ReplicatorTest.java:516)
at java.base/jdk.internal.reflect.DirectMethodHandleAccessor.invoke(DirectMethodHandleAccessor.java:103)
at java.base/java.lang.reflect.Method.invoke(Method.java:580)
at org.testng.internal.invokers.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:139)
at org.testng.internal.invokers.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:47)
at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:76)
at org.testng.internal.invokers.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:317)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1144)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:642)
at java.base/java.lang.Thread.run(Thread.java:1583)
Caused by: java.lang.AssertionError: replication task finished expected [true] but found [false]
at org.testng.Assert.fail(Assert.java:110)
at org.testng.Assert.failNotEquals(Assert.java:1577)
at org.testng.Assert.assertTrue(Assert.java:56)
at org.apache.pulsar.broker.service.ReplicatorTest.lambda$waitReplicateFinish$5(ReplicatorTest.java:526)
at org.awaitility.core.AssertionCondition.lambda$new$0(AssertionCondition.java:53)
at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:248)
at org.awaitility.core.ConditionAwaiter$ConditionPoller.call(ConditionAwaiter.java:235)
... 4 more
/pulsarbot rerun-failure-checks
@lhotari
Please debug the flaky test failures in replication tests before we merge this change.
Fixed
Dismissed @lhotari's requested changes because he did not reply for a long time