[ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting
What is the purpose of the change
In the current backpressure mechanism (pull request #4553), the runnable task is executed directly on the calling thread and relies on the backpressure built into NettyRemoting:
```java
if (this.defaultMQProducer.isEnableBackpressureForAsyncMode()) {
    runnable.run();
} else {
    try {
        executor.submit(runnable);
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }
}
```
To provide a more accurate flow-control mechanism, I rebuilt the backpressure mechanism around the number of in-flight async send requests and the total size of the messages being sent.
Brief changelog
- Use semaphores to monitor the pressure of async sending: semaphoreAsyncNum limits the maximum number of on-going async send requests, and semaphoreAsyncSize limits the maximum total size of on-going async messages (see the sketch below).
- Keep using the thread pool within the backpressure mechanism.
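For illustration, a minimal standalone sketch of the two-semaphore idea follows; the capacities, class name, and method names are placeholders for the example, not the PR's actual DefaultMQProducerImpl changes:

```java
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

/**
 * Illustrative two-semaphore backpressure for async sends: one semaphore bounds the
 * number of in-flight requests, the other bounds their total size in bytes.
 * Capacities and names are placeholders, not the values used in the PR.
 */
public class AsyncSendBackpressure {
    private final Semaphore semaphoreAsyncNum = new Semaphore(10_000);             // max in-flight requests
    private final Semaphore semaphoreAsyncSize = new Semaphore(100 * 1024 * 1024); // max in-flight bytes

    /** Blocks up to timeoutMillis for capacity, then hands the request to the async sender. */
    public void sendAsync(byte[] body, long timeoutMillis, Runnable asyncSend) throws InterruptedException {
        if (!semaphoreAsyncNum.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("backpressure: too many in-flight async send requests");
        }
        if (!semaphoreAsyncSize.tryAcquire(body.length, timeoutMillis, TimeUnit.MILLISECONDS)) {
            semaphoreAsyncNum.release();
            throw new IllegalStateException("backpressure: too many in-flight async send bytes");
        }
        // The permits are held until the send completes; see release(...) below.
        asyncSend.run();
    }

    /** Call from onSuccess/onException once the request has finished. */
    public void release(byte[] body) {
        semaphoreAsyncSize.release(body.length);
        semaphoreAsyncNum.release();
    }
}
```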
Coverage increased (+0.2%) to 47.767% when pulling 04dc407046027bb54b7fd9971c32258dcb9f0c7c on shengminw:backpressure-2 into 3ee603a49b04c0b602ff8eb45b3ef4330293fba8 on apache:develop.
The back pressure mechanism is a very useful function, but at present, when RocketMQ's traffic approaches the bottleneck, throughput drops sharply due to "busy" errors. I hope to improve this. Can you provide the improvement brought by this PR, such as test results?
@guyinyou IMO, the current design does run close to the bottleneck, but message processing is continuous, so the system tends toward a steady state: the number of messages successfully written per second does not change much, only the sending rate is reduced. Without back pressure there would still be lots of rejections. Due to limited testing equipment, I just ran a simple test on my laptop. I set the producer to 4 threads sending asynchronous messages continuously, and a separate thread records, for each one-second window, the number of async sends issued, the number of messages sent successfully, the number of timed-out messages, and the number of rejected messages.
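For reference, such a per-second monitor can be built from a few atomic counters bumped in the send callbacks plus a scheduled reporter. The sketch below is illustrative only and is not the actual test harness used for the results that follow:

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

// Illustrative per-second send-statistics monitor: bump the counters from the producer
// threads / send callbacks, and print-and-reset them once per second.
public class SendStats {
    final LongAdder sent = new LongAdder();     // async send requests issued
    final LongAdder success = new LongAdder();  // onSuccess callbacks
    final LongAdder timeout = new LongAdder();  // sends that timed out
    final LongAdder rejected = new LongAdder(); // sends rejected by back pressure / executor

    public void startReporter() {
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
        reporter.scheduleAtFixedRate(() -> System.out.printf(
                "sent=%d success=%d timeout=%d rejected=%d%n",
                sent.sumThenReset(), success.sumThenReset(),
                timeout.sumThenReset(), rejected.sumThenReset()),
            1, 1, TimeUnit.SECONDS);
    }
}
```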
- When enableBackpressureForAsyncMode is off:
- When enableBackpressureForAsyncMode is on:
Although a local test cannot fully simulate a heavy-traffic server environment, it still shows that after turning on back pressure the async-sending rate drops while the rate of successfully written messages stays unchanged, and a large number of rejections is avoided.
The effect is still very obvious, well done.
When I benchmarked RocketMQ before, there was also a need for back pressure. My approach was to maintain a counter, used like a semaphore, to limit the number of concurrent asynchronous sends, and it worked very well. You can try it:
```java
// Rough sketch: cap the number of in-flight async sends with a window counter.
UniformRateLimiter rateLimiter = new UniformRateLimiter(1024); // pacing limiter from the benchmark harness; unused in this loop
final int windowsSize = 1024;                    // maximum in-flight async sends
final AtomicLong windowsCnt = new AtomicLong(0); // current in-flight async sends

while (true) {
    try {
        if (windowsCnt.incrementAndGet() >= windowsSize) {
            // window is full: spin until callbacks release some slots
            while (windowsCnt.get() >= windowsSize) {
                Thread.yield();
            }
        }
        producer.send(new Message(/* topic, tags, body ... */), new SendCallback() {
            @Override
            public void onSuccess(SendResult sendResult) {
                windowsCnt.decrementAndGet();
            }

            @Override
            public void onException(Throwable e) {
                windowsCnt.decrementAndGet();
                throw new RuntimeException(e);
            }
        });
    } catch (Exception e) {
        // note: in this rough sketch a slot leaks if send() throws before the callback is registered
        e.printStackTrace();
    }
}
```
Of course, this implementation is very rough. In real scenarios the limit should be a dynamic value, increased or decreased according to the number of recently failed sends, the sending latency, and so on. If you are interested, we can discuss it further.
@guyinyou It seems that your approach is similar to my implementation: both use a semaphore-like counter to control the number of concurrent asynchronous sends. Your idea about a dynamic rate limiter is a great suggestion. Do you mean a dynamic threshold to control whether to block sending?
Yes, and send failures and send latency alone are enough as inputs.
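As a rough illustration of that idea, the window could be adjusted AIMD-style from the per-second failure count and average send latency. The thresholds, step sizes, and bounds below are assumptions, not values proposed in this thread:

```java
import java.util.concurrent.atomic.AtomicInteger;

// Illustrative AIMD-style adjustment of the async send window; the thresholds,
// step sizes, and bounds are assumptions, not values proposed in this thread.
public class DynamicWindow {
    private static final int MIN_WINDOW = 64;
    private static final int MAX_WINDOW = 65_536;
    private final AtomicInteger windowSize = new AtomicInteger(1024);

    /** Feed in the stats gathered over the last second. */
    public void adjust(long recentFailures, double avgSendLatencyMs) {
        if (recentFailures > 0 || avgSendLatencyMs > 200) {
            // back off multiplicatively when sends fail or latency spikes
            windowSize.updateAndGet(w -> Math.max(MIN_WINDOW, w / 2));
        } else {
            // probe additively for more capacity while things look healthy
            windowSize.updateAndGet(w -> Math.min(MAX_WINDOW, w + 16));
        }
    }

    public int currentWindow() {
        return windowSize.get();
    }
}
```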
Codecov Report
Merging #4601 (ba61a63) into develop (4f0636a) will increase coverage by 0.02%. The diff coverage is 70.37%.
@@ Coverage Diff @@
## develop #4601 +/- ##
=============================================
+ Coverage 43.22% 43.25% +0.02%
- Complexity 7724 7738 +14
=============================================
Files 995 995
Lines 69067 69126 +59
Branches 9143 9159 +16
=============================================
+ Hits 29852 29898 +46
- Misses 35469 35475 +6
- Partials 3746 3753 +7
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...mq/client/impl/producer/DefaultMQProducerImpl.java | 45.29% <65.67%> (+0.94%) | :arrow_up: |
| ...he/rocketmq/client/producer/DefaultMQProducer.java | 60.44% <92.85%> (+2.15%) | :arrow_up: |
| ...apache/rocketmq/broker/longpolling/PopRequest.java | 31.03% <0.00%> (-13.80%) | :arrow_down: |
| ...ketmq/common/protocol/body/ConsumerConnection.java | 95.83% <0.00%> (-4.17%) | :arrow_down: |
| ...ketmq/client/impl/consumer/PullMessageService.java | 49.23% <0.00%> (-3.08%) | :arrow_down: |
| ...a/org/apache/rocketmq/filter/util/BloomFilter.java | 60.43% <0.00%> (-2.20%) | :arrow_down: |
| ...mq/store/ha/autoswitch/AutoSwitchHAConnection.java | 70.43% <0.00%> (-1.35%) | :arrow_down: |
| .../apache/rocketmq/store/ha/DefaultHAConnection.java | 65.86% <0.00%> (-1.21%) | :arrow_down: |
| ...rocketmq/broker/processor/PopMessageProcessor.java | 37.63% <0.00%> (-0.54%) | :arrow_down: |
| ...nt/impl/consumer/ConsumeMessageOrderlyService.java | 49.82% <0.00%> (-0.36%) | :arrow_down: |
| ... and 10 more | | |