[Refactor][Enhancement] Refactor DataStreamRecvr::SenderQueue

Open silverbullet233 opened this issue 3 years ago • 3 comments

What type of PR is this:

  • [ ] bug
  • [ ] feature
  • [x] enhancement
  • [x] refactor
  • [ ] others

Which issues does this PR fix:

Fixes #

Problem Summary (Required):

The way SenderQueue is used by the two execution engines is very different. In the non-pipeline engine, getting a chunk should block and wait if no new chunk has arrived, and std::mutex and std::condition_variable are introduced for this. But in the pipeline engine, the execution threads are shared, so blocking caused by lock contention must be avoided as much as possible.

The current implementation is not specialized for either scenario: all the logic is mixed together, which is far from optimal and incurs extra overhead.
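For concreteness, here is a minimal sketch of the mutex/condition-variable pattern described above; BlockingQueue, get_chunk, and add_chunk are illustrative names, not the actual StarRocks interfaces:

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

struct Chunk {};

class BlockingQueue {
public:
    // Consumer side: block until a chunk arrives or the queue is finished.
    bool get_chunk(Chunk* out) {
        std::unique_lock<std::mutex> lk(_mu);
        _cv.wait(lk, [&] { return !_chunks.empty() || _finished; });
        if (_chunks.empty()) return false;  // finished and fully drained
        *out = _chunks.front();
        _chunks.pop_front();
        return true;
    }

    // Producer side: every add_chunk takes the same mutex, so heavy
    // concurrency turns into lock contention.
    void add_chunk(Chunk chunk) {
        {
            std::lock_guard<std::mutex> lk(_mu);
            _chunks.push_back(std::move(chunk));
        }
        _cv.notify_one();
    }

    void set_finished() {
        {
            std::lock_guard<std::mutex> lk(_mu);
            _finished = true;
        }
        _cv.notify_all();
    }

private:
    std::mutex _mu;
    std::condition_variable _cv;
    std::deque<Chunk> _chunks;
    bool _finished = false;
};
```

This pattern is fine for the non-pipeline engine, where each fragment instance has its own thread, but a shared pipeline execution thread that sleeps inside get_chunk stalls every driver scheduled on it.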

In this PR, I try to refactor the implementation of SenderQueue. The main changes are as follows (a sketch of the resulting class split is shown after the list):

  1. Add SenderQueue as a base class, from which PipelineSenderQueue and NonPipelineSenderQueue are derived respectively;
  2. Move the original SenderQueue implementation to NonPipelineSenderQueue, and remove the member variables and related interfaces that the non-pipeline engine does not use;
  3. Implement PipelineSenderQueue specifically for the pipeline engine. To avoid blocking the pipeline execution threads, it uses lock-free structures as much as possible; for this reason, a lock-free queue is introduced from https://github.com/cameron314/concurrentqueue. According to the public benchmarks (https://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++.htm#benchmarks and https://github.com/Qihoo360/evpp/blob/master/docs/benchmark_lockfree_vs_mutex_cn.md), it performs well.
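A rough sketch of the class split, assuming moodycamel::ConcurrentQueue from the repository linked in point 3; method names are simplified for illustration and are not the actual StarRocks interfaces. NonPipelineSenderQueue would keep the mutex/condition-variable logic shown in the earlier sketch.

```cpp
#include "concurrentqueue.h"  // moodycamel::ConcurrentQueue

struct Chunk {};

// Common interface shared by both engines.
class SenderQueue {
public:
    virtual ~SenderQueue() = default;
    virtual void add_chunk(Chunk chunk) = 0;
    virtual bool try_get_chunk(Chunk* out) = 0;
};

// Pipeline engine: never blocks the shared execution threads; a failed
// try_get_chunk simply means "no chunk yet", and the exchange source
// driver is rescheduled instead of sleeping on a condition variable.
class PipelineSenderQueue final : public SenderQueue {
public:
    void add_chunk(Chunk chunk) override { _queue.enqueue(std::move(chunk)); }
    bool try_get_chunk(Chunk* out) override { return _queue.try_dequeue(*out); }

private:
    moodycamel::ConcurrentQueue<Chunk> _queue;
};
```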

Optimizations in PipelineSenderQueue

  1. If _is_pipeline_level_shuffle=true, a separate lock-free queue is created for each driver sequence to avoid contention;

  2. Introduce an unplug mechanism to reduce the scheduling overhead of the exchange source pipeline driver;

  3. Introduce a lazy deserialization mechanism: deserialization is moved from the add_chunk stage to the get_chunk stage, which reduces the latency of the transmit_chunk RPC and also avoids unnecessary computation when a short circuit happens (a sketch of points 1 and 3 follows this list).
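A hedged sketch of optimizations 1 and 3, built on the same lock-free queue as above; ChunkItem, deserialize_chunk, and the per-driver-sequence layout are illustrative names rather than the actual StarRocks code:

```cpp
#include <string>
#include <utility>
#include <vector>

#include "concurrentqueue.h"  // moodycamel::ConcurrentQueue

struct Chunk {};

// Stand-in for the real deserialization routine.
Chunk deserialize_chunk(const std::string& /*bytes*/) { return Chunk{}; }

// A queued item holds the still-serialized RPC payload.
struct ChunkItem {
    std::string serialized_bytes;
};

class PipelineSenderQueue {
public:
    // Optimization 1: one lock-free queue per driver sequence, so drivers
    // reading different sequences never touch the same queue.
    explicit PipelineSenderQueue(size_t num_driver_sequences)
            : _queues(num_driver_sequences) {}

    // Optimization 3: add_chunk only enqueues raw bytes, keeping the
    // transmit_chunk RPC handler fast.
    void add_chunk(size_t driver_sequence, std::string bytes) {
        _queues[driver_sequence].enqueue(ChunkItem{std::move(bytes)});
    }

    // Deserialization happens on the execution thread at get_chunk time; if
    // the query short-circuits, leftover items are dropped undeserialized.
    bool get_chunk(size_t driver_sequence, Chunk* out) {
        ChunkItem item;
        if (!_queues[driver_sequence].try_dequeue(item)) {
            return false;  // nothing ready; the driver can be unplugged
        }
        *out = deserialize_chunk(item.serialized_bytes);
        return true;
    }

private:
    std::vector<moodycamel::ConcurrentQueue<ChunkItem>> _queues;
};
```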

silverbullet233 avatar May 31 '22 09:05 silverbullet233

As far as I know, concurrentqueue is not linearizable and not sequentially consistent. Does this affect its usage in StarRocks?

Pslydhh avatar Jun 02 '22 03:06 Pslydhh

As far as I know, concurrentqueue is not linearizable and not sequentially consistent. Does this affect its usage in StarRocks?

I don't think this will have any effect. In our scenario, as long as eventual consistency is guaranteed, it should be fine.
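As a side note on this ordering question: moodycamel's queue does document FIFO order for items enqueued by the same producer; only the interleaving across producers is unspecified. A small illustrative demo of that property:

```cpp
#include <cstdio>
#include <thread>

#include "concurrentqueue.h"  // moodycamel::ConcurrentQueue

int main() {
    moodycamel::ConcurrentQueue<int> q;
    std::thread p1([&] { for (int i = 0; i < 3; ++i) q.enqueue(i); });
    std::thread p2([&] { for (int i = 10; i < 13; ++i) q.enqueue(i); });
    p1.join();
    p2.join();

    // 0,1,2 come out in order and 10,11,12 come out in order, but the two
    // sequences may interleave arbitrarily: there is no single total order.
    int v;
    while (q.try_dequeue(v)) std::printf("%d ", v);
    std::printf("\n");
    return 0;
}
```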

silverbullet233 avatar Aug 05 '22 09:08 silverbullet233

For the convenience of review, I split this PR into four:

  1. https://github.com/StarRocks/starrocks/pull/9711
  2. https://github.com/StarRocks/starrocks/pull/9719
  3. https://github.com/StarRocks/starrocks/pull/9740
  4. https://github.com/StarRocks/starrocks/pull/9741

silverbullet233 avatar Aug 09 '22 01:08 silverbullet233