
[ISSUE#4599] Optimize enableBackpressureForAsyncMode not relying on NettyRemoting

Open shengminw opened this issue 2 years ago • 8 comments


What is the purpose of the change

In the current backpressure mechanism introduced in pull request #4553, the send task is executed directly on the caller thread and relies on the backpressure built into NettyRemoting.

// With backpressure enabled, the send task runs directly on the caller thread and relies on
// NettyRemoting's own flow control; otherwise it is submitted to the async executor.
if (this.defaultMQProducer.isEnableBackpressureForAsyncMode()) {
    runnable.run();
} else {
    try {
        executor.submit(runnable);
    } catch (RejectedExecutionException e) {
        throw new MQClientException("executor rejected ", e);
    }
}

To build a more accurate flow-control mechanism, I rebuilt the backpressure logic based on the number of in-flight async send requests and the amount of message memory they occupy.

Brief changelog

  1. Use Semaphore to monitor the pressure of sending messages: semaphoreAsyncNum limits the maximum number of in-flight async send requests, and semaphoreAsyncSize limits the maximum total size of in-flight async messages (a sketch of this idea follows the list).
  2. Retain the thread pool for executing send tasks within the backpressure mechanism.
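
A minimal sketch of the two-semaphore idea, assuming hypothetical class name and limit values (the actual PR integrates this into DefaultMQProducerImpl and releases the permits in the send callback):

import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

import org.apache.rocketmq.client.producer.DefaultMQProducer;
import org.apache.rocketmq.client.producer.SendCallback;
import org.apache.rocketmq.client.producer.SendResult;
import org.apache.rocketmq.common.message.Message;

public class SemaphoreBackpressureSketch {
    // Hypothetical limits: at most 10,000 in-flight async sends and 100 MB of in-flight message bytes.
    private final Semaphore semaphoreAsyncNum = new Semaphore(10_000, true);
    private final Semaphore semaphoreAsyncSize = new Semaphore(100 * 1024 * 1024, true);

    public void sendAsync(DefaultMQProducer producer, Message msg, long timeoutMillis) throws Exception {
        final int size = msg.getBody().length;
        // Block (up to the send timeout) until a send slot is available.
        if (!semaphoreAsyncNum.tryAcquire(timeoutMillis, TimeUnit.MILLISECONDS)) {
            throw new IllegalStateException("async send rejected: too many in-flight requests");
        }
        // Then block until enough "message memory" permits are available.
        if (!semaphoreAsyncSize.tryAcquire(size, timeoutMillis, TimeUnit.MILLISECONDS)) {
            semaphoreAsyncNum.release();
            throw new IllegalStateException("async send rejected: in-flight message size limit reached");
        }
        try {
            producer.send(msg, new SendCallback() {
                @Override public void onSuccess(SendResult sendResult) {
                    release(size);
                }

                @Override public void onException(Throwable e) {
                    release(size);
                }
            });
        } catch (Exception e) {
            release(size); // the callback will never fire, so release the permits here
            throw e;
        }
    }

    private void release(int size) {
        semaphoreAsyncSize.release(size);
        semaphoreAsyncNum.release();
    }
}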

shengminw avatar Jul 13 '22 07:07 shengminw

Coverage Status

Coverage increased (+0.2%) to 47.767% when pulling 04dc407046027bb54b7fd9971c32258dcb9f0c7c on shengminw:backpressure-2 into 3ee603a49b04c0b602ff8eb45b3ef4330293fba8 on apache:develop.

coveralls avatar Jul 13 '22 13:07 coveralls

The back pressure mechanism is a very useful feature, but at present, when RocketMQ's traffic gets close to the bottleneck, throughput drops sharply because of "busy" responses. I hope this phenomenon can be improved. Can you show the improvement brought by this PR, for example test results?

guyinyou avatar Jul 14 '22 02:07 guyinyou

> The back pressure mechanism is a very useful feature, but at present, when RocketMQ's traffic gets close to the bottleneck, throughput drops sharply because of "busy" responses. I hope this phenomenon can be improved. Can you show the improvement brought by this PR, for example test results?

@guyinyou IMO, the current design is indeed close to the bottleneck, but message processing is continuous, so it tends toward a steady state: the number of successfully written messages per second does not change much, only the sending rate is reduced. Without backpressure there would still be a large number of rejections. Due to limited test equipment, I only ran a simple test on my laptop: a producer with 4 threads sending asynchronous messages continuously, plus a monitoring thread that records, for each 1s window, the number of async send attempts, successfully sent messages, timed-out messages, and rejected messages.

  • With enableBackpressureForAsyncMode off: [image]

  • With enableBackpressureForAsyncMode on: [image]

Although a local test cannot fully simulate a heavy-traffic server environment, it still shows that after turning on backpressure the async sending rate drops while the rate of successful message writes remains unchanged, and a large number of rejections is avoided.
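
For reference, a minimal sketch of the kind of per-second monitor described above (counter names and reporting setup are hypothetical, not the code used in the test):

import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.LongAdder;

public class SendRateMonitor {
    final LongAdder sent = new LongAdder();      // async send attempts
    final LongAdder succeeded = new LongAdder(); // onSuccess callbacks
    final LongAdder timedOut = new LongAdder();  // timeout exceptions
    final LongAdder rejected = new LongAdder();  // rejections (executor or flow control)

    public void start() {
        ScheduledExecutorService reporter = Executors.newSingleThreadScheduledExecutor();
        reporter.scheduleAtFixedRate(() -> {
            // Print and reset the counters once per second.
            System.out.printf("sent=%d succeeded=%d timedOut=%d rejected=%d%n",
                sent.sumThenReset(), succeeded.sumThenReset(),
                timedOut.sumThenReset(), rejected.sumThenReset());
        }, 1, 1, TimeUnit.SECONDS);
    }
}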

shengminw avatar Jul 14 '22 07:07 shengminw

The effect is quite noticeable, well done.

guyinyou avatar Jul 14 '22 09:07 guyinyou

When I benchmarked RocketMQ before, there was a need for backpressure. My approach was to maintain a semaphore-like counter to control the number of concurrent asynchronous sends, which worked very well. You can try it.

// Hypothetical window size: at most 1024 in-flight async sends
// (this plays the role of the "rateLimiter" discussed below).
final long windowsSize = 1024;
final AtomicLong windowsCnt = new AtomicLong(0);
while (true) {
    try {
        // Busy-wait once the number of in-flight sends reaches the window size.
        if (windowsCnt.incrementAndGet() >= windowsSize) {
            while (windowsCnt.get() >= windowsSize) {
                Thread.yield();
            }
        }
        producer.send(new Message(xxxxxxx), new SendCallback() {
            @Override public void onSuccess(SendResult sendResult) {
                windowsCnt.decrementAndGet();
            }

            @Override public void onException(Throwable e) {
                windowsCnt.decrementAndGet();
                throw new RuntimeException(e);
            }
        });
    } catch (Exception e) {
        e.printStackTrace();
    }
}

Of course, this implementation is very rough. In real scenarios the rateLimiter (here, the window size) should be a dynamic value, increased or decreased according to the number of recently failed sends, the send latency, and so on. If you are interested, we can discuss it together.
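
A rough sketch of what such a dynamic limiter might look like (AIMD-style adjustment; all names and thresholds here are hypothetical, not part of the PR):

import java.util.concurrent.atomic.AtomicInteger;

public class DynamicSendWindow {
    private static final int MIN_WINDOW = 64;
    private static final int MAX_WINDOW = 8192;

    // Current maximum number of in-flight async sends.
    private final AtomicInteger windowSize = new AtomicInteger(1024);

    public int currentWindow() {
        return windowSize.get();
    }

    // Call periodically (e.g. once per second) with the stats collected over the last interval.
    public void adjust(long recentFailures, double avgSendLatencyMs, double latencyTargetMs) {
        int current = windowSize.get();
        if (recentFailures > 0 || avgSendLatencyMs > latencyTargetMs) {
            // Back off quickly when the broker shows pressure (failures or rising latency).
            windowSize.set(Math.max(MIN_WINDOW, current / 2));
        } else {
            // Probe for more capacity slowly while everything looks healthy.
            windowSize.set(Math.min(MAX_WINDOW, current + 64));
        }
    }
}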

guyinyou avatar Jul 14 '22 09:07 guyinyou

@guyinyou It seems that your approach is similar to my implementation; both use a semaphore (or counter) to control the number of concurrent asynchronous sends. Your idea about a dynamic rateLimiter is a great suggestion. Do you mean a dynamic threshold that controls whether to block sending?

shengminw avatar Jul 14 '22 09:07 shengminw

> @guyinyou It seems that your approach is similar to my implementation; both use a semaphore (or counter) to control the number of concurrent asynchronous sends. Your idea about a dynamic rateLimiter is a great suggestion. Do you mean a dynamic threshold that controls whether to block sending?

Yes, and recent send failures and send latency are the only inputs needed.

guyinyou avatar Jul 14 '22 11:07 guyinyou

Codecov Report

Merging #4601 (ba61a63) into develop (4f0636a) will increase coverage by 0.02%. The diff coverage is 70.37%.

@@              Coverage Diff              @@
##             develop    #4601      +/-   ##
=============================================
+ Coverage      43.22%   43.25%   +0.02%     
- Complexity      7724     7738      +14     
=============================================
  Files            995      995              
  Lines          69067    69126      +59     
  Branches        9143     9159      +16     
=============================================
+ Hits           29852    29898      +46     
- Misses         35469    35475       +6     
- Partials        3746     3753       +7     
Impacted Files                                            Coverage Δ
...mq/client/impl/producer/DefaultMQProducerImpl.java     45.29% <65.67%> (+0.94%) ↑
...he/rocketmq/client/producer/DefaultMQProducer.java     60.44% <92.85%> (+2.15%) ↑
...apache/rocketmq/broker/longpolling/PopRequest.java     31.03% <0.00%> (-13.80%) ↓
...ketmq/common/protocol/body/ConsumerConnection.java     95.83% <0.00%> (-4.17%) ↓
...ketmq/client/impl/consumer/PullMessageService.java     49.23% <0.00%> (-3.08%) ↓
...a/org/apache/rocketmq/filter/util/BloomFilter.java     60.43% <0.00%> (-2.20%) ↓
...mq/store/ha/autoswitch/AutoSwitchHAConnection.java     70.43% <0.00%> (-1.35%) ↓
.../apache/rocketmq/store/ha/DefaultHAConnection.java     65.86% <0.00%> (-1.21%) ↓
...rocketmq/broker/processor/PopMessageProcessor.java     37.63% <0.00%> (-0.54%) ↓
...nt/impl/consumer/ConsumeMessageOrderlyService.java     49.82% <0.00%> (-0.36%) ↓
... and 10 more


codecov-commenter avatar Jul 18 '22 08:07 codecov-commenter