[Refactor][Enhancement] Refactor DataStreamRecvr::SenderQueue
What type of PR is this:
- [ ] bug
- [ ] feature
- [x] enhancement
- [x] refactor
- [ ] others
Which issues does this PR fix:
Fixes #
Problem Summary (Required):
The way SenderQueue is used in the two execution engines is very different. In the non-pipeline engine, getting a chunk should block and wait when no new chunk has arrived, so std::mutex and std::condition_variable are used to implement the wait. In the pipeline engine, by contrast, execution threads are shared across drivers, and blocking caused by lock contention must be avoided as much as possible.
The current implementation is not specialized for either scenario: all the logic is mixed together, which leaves little room for optimization and adds unnecessary overhead.
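For concreteness, below is a minimal sketch of the blocking pattern the non-pipeline path relies on. The class name, the int chunk placeholder, and the member names are invented for illustration; this is not StarRocks' actual interface.

```cpp
#include <condition_variable>
#include <deque>
#include <mutex>

// Minimal sketch of the mutex/condition_variable pattern described above.
// Names and types are placeholders, not StarRocks' real interface.
class BlockingSenderQueueSketch {
public:
    // Blocks until a chunk arrives or the queue is cancelled.
    bool get_chunk(int* chunk) {
        std::unique_lock<std::mutex> lock(_lock);
        _data_arrival_cv.wait(lock, [this] { return !_chunks.empty() || _is_cancelled; });
        if (_chunks.empty()) return false;  // cancelled with nothing left
        *chunk = _chunks.front();
        _chunks.pop_front();
        return true;
    }

    // Called from the RPC thread when a new chunk arrives.
    void add_chunk(int chunk) {
        {
            std::lock_guard<std::mutex> lock(_lock);
            _chunks.push_back(chunk);
        }
        _data_arrival_cv.notify_one();  // wake a blocked reader
    }

private:
    std::mutex _lock;
    std::condition_variable _data_arrival_cv;
    std::deque<int> _chunks;
    bool _is_cancelled = false;
};
```

This is fine when each fragment instance runs on its own thread, but a shared pipeline execution thread that blocks here would stall every driver scheduled on it.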
In this PR, I refactor the implementation of SenderQueue. The main changes are as follows:
- Add `SenderQueue` as a base class, from which `PipelineSenderQueue` and `NonPipelineSenderQueue` are derived (a sketch of the hierarchy follows this list);
- The original `SenderQueue` implementation is moved to `NonPipelineSenderQueue`, and the member variables and related interfaces that the non-pipeline engine does not use are moved out of it;
- `PipelineSenderQueue` is implemented specifically for the pipeline engine. To avoid blocking the pipeline execution threads, it uses lock-free structures as much as possible. For this reason, a lock-free queue is introduced from https://github.com/cameron314/concurrentqueue; public benchmarks (https://moodycamel.com/blog/2014/a-fast-general-purpose-lock-free-queue-for-c++.htm#benchmarks and https://github.com/Qihoo360/evpp/blob/master/docs/benchmark_lockfree_vs_mutex_cn.md) show it performs well.
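A rough sketch of the resulting hierarchy. The interface is simplified and the placeholder types are invented; the real declarations in the StarRocks source differ.

```cpp
#include <memory>

struct Chunk {};                                      // placeholder for the vectorized chunk type
struct Status { static Status OK() { return {}; } };  // placeholder for starrocks::Status

// Base class: shared bookkeeping lives here, while chunk delivery is
// specialized per engine.
class SenderQueue {
public:
    virtual ~SenderQueue() = default;
    virtual Status get_chunk(std::unique_ptr<Chunk>* chunk) = 0;
    virtual bool has_chunk() const = 0;
};

// Keeps the original blocking implementation: get_chunk waits on a
// condition variable until data arrives or the query is cancelled.
class NonPipelineSenderQueue final : public SenderQueue { /* ... */ };

// Never blocks: backed by lock-free queues, get_chunk returns
// immediately when nothing is ready so the driver can yield.
class PipelineSenderQueue final : public SenderQueue { /* ... */ };
```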
Optimizations in PipelineSenderQueue
- If `_is_pipeline_level_shuffle=true`, a separate lock-free queue is created for each driver sequence to avoid contention (see the first sketch after this list);
- Introduce an unplug mechanism to reduce the scheduling overhead of the exchange source pipeline driver (see the second sketch after this list);
- Introduce a lazy deserialization mechanism: deserialization is moved from the add_chunk stage to the get_chunk stage, which reduces the latency of the transmit_chunk rpc and also avoids unnecessary computation when a short circuit happens (see the first sketch after this list).
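The following sketch combines the first and third points: one moodycamel::ConcurrentQueue per driver sequence, holding chunks in serialized form so that deserialization runs on the consuming execution thread. Apart from the concurrentqueue API, all names here are invented for illustration.

```cpp
#include <string>
#include <utility>
#include <vector>
#include "concurrentqueue.h"  // moodycamel::ConcurrentQueue

struct Chunk {};  // placeholder for the vectorized chunk type

// Hypothetical queue item: the chunk stays in serialized form on enqueue.
struct ChunkItem {
    std::string serialized_chunk;  // e.g. the bytes of a ChunkPB
};

class PipelineSenderQueueSketch {
public:
    // One queue per driver sequence when pipeline-level shuffle is on.
    explicit PipelineSenderQueueSketch(int degree_of_parallelism)
            : _chunk_queues(degree_of_parallelism) {}

    // Called from the brpc thread. No deserialization happens here, so the
    // transmit_chunk rpc returns quickly.
    void add_chunk(int driver_sequence, ChunkItem item) {
        _chunk_queues[driver_sequence].enqueue(std::move(item));
    }

    // Called from the pipeline execution thread; never blocks. The chunk is
    // deserialized only now (lazily), so chunks that are dropped by a short
    // circuit are never deserialized at all.
    bool try_get_chunk(int driver_sequence, Chunk* chunk) {
        ChunkItem item;
        if (!_chunk_queues[driver_sequence].try_dequeue(item)) {
            return false;  // nothing ready; the driver yields instead of waiting
        }
        *chunk = deserialize(item.serialized_chunk);
        return true;
    }

private:
    static Chunk deserialize(const std::string& /*bytes*/) { return Chunk{}; }

    // A separate lock-free queue per driver sequence keeps execution
    // threads from contending on a single queue.
    std::vector<moodycamel::ConcurrentQueue<ChunkItem>> _chunk_queues;
};
```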
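The PR text does not spell out the unplug mechanism, so the following is only one plausible reading: while the queue is "plugged", it does not report itself ready until a batch of chunks has accumulated, so one scheduling round of the source driver drains several chunks instead of one. The threshold and all names are invented.

```cpp
#include <atomic>
#include <cstddef>

// Rough illustration of an unplug-style gate; not StarRocks' actual code.
class UnplugGateSketch {
public:
    // Consulted by a has_output()-style readiness check. While plugged, the
    // driver is only woken once enough chunks have queued up, amortizing the
    // scheduling overhead over the whole batch.
    bool ready(size_t queued_chunks) {
        if (!_plugged.load(std::memory_order_relaxed)) {
            return queued_chunks > 0;  // once unplugged, any chunk is ready
        }
        if (queued_chunks >= kUnplugThreshold) {
            _plugged.store(false, std::memory_order_relaxed);
            return true;
        }
        return false;
    }

    // Re-plug after the accumulated batch has been drained.
    void replug() { _plugged.store(true, std::memory_order_relaxed); }

private:
    static constexpr size_t kUnplugThreshold = 16;  // invented value
    std::atomic<bool> _plugged{true};
};
```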
As far as I know, concurrentqueue is not linearizable and not sequentially consistent. Does this affect its usage in StarRocks?
I don't think this will have any effect; in our scenario, as long as eventual consistency is guaranteed, it should be fine.
For the convenience of review, I split this PR into four:
- https://github.com/StarRocks/starrocks/pull/9711
- https://github.com/StarRocks/starrocks/pull/9719
- https://github.com/StarRocks/starrocks/pull/9740
- https://github.com/StarRocks/starrocks/pull/9741