pika
feat: change thread scheduling method in ThreadPool class (OSPP 2024)
The logic is based on the function `WriteThread::AwaitState` in RocksDB (link).
Before:
- All workers and the main thread that pushes tasks into the queue wait on the same lock, which causes very intense contention.
- When a worker finishes a task, it tries to acquire the lock again for a new task via the `await` function. Because of the intense contention, the worker is likely to go to sleep, and sleeping and waking up cost a lot of time.
After:
- This is a standard producer-consumer model, so we can use a lock-free list to relieve the contention.
- When a worker wakes up, it tries to fetch tasks. If it finds none, it loops for a while waiting for new ones. Under high throughput the wait for a new task is very short, so this loop does NOT cause serious blocking. To further reduce blocking time, the loop has 3 levels:
  1. Level 1: wait in a spin loop.
  2. Level 2: wait in a longer yield loop; the worker may yield the CPU when certain conditions are met, and a credit value stores the probability of entering this level.
  3. Level 3: wait for new tasks via the `await` function.
Params:
- Count of the level-1 loop: default 200. Too large a value may cause high CPU load; too small a value makes the spinning pointless.
- queue_slow_size_: default `std::min(worker_num, 100)`. When the number of queued tasks exceeds it, the main thread calling `Schedule` calls `std::this_thread::yield()`.
- max_queue_size_: default `max_queue_size`. When the number of queued tasks exceeds it, the main thread calling `Schedule` calls `std::this_thread::yield()` until the queue length drops below the threshold.
- max_yield_usec_: default 100. The maximum time spent in the level-2 loop.
- slow_yield_usec_: default 3. If a single `std::this_thread::yield()` call takes longer than this threshold, the credit data may be updated.
- kMaxSlowYieldsWhileSpinning: default 3. Once the condition above is hit this many times, the credit data is updated.
- sampling_base: default 256. It guarantees that the probability of entering the level-2 loop is not lower than `1/sampling_base`.
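The three levels can be sketched as follows. This is a minimal illustration, not the PR's actual code: the constant names mirror the parameters listed above, and the credit/sampling bookkeeping that gates entry to level 2 is omitted.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

// Illustrative constants matching the defaults described above.
constexpr int kSpinCount = 200;     // level 1: spin iterations
constexpr int kMaxYieldUsec = 100;  // level 2: max time spent yielding
constexpr int kSlowYieldUsec = 3;   // a single yield slower than this is "slow"
constexpr int kMaxSlowYields = 3;   // leave level 2 after this many slow yields

std::mutex mu;
std::condition_variable cv;

// Wait until `ready` becomes true, escalating through the three levels.
void AwaitReady(std::atomic<bool>& ready) {
  // Level 1: busy spin. Cheap if a task arrives almost immediately.
  for (int i = 0; i < kSpinCount; ++i) {
    if (ready.load(std::memory_order_acquire)) return;
  }
  // Level 2: yield loop, bounded by time and by the number of slow yields.
  auto spin_begin = std::chrono::steady_clock::now();
  int slow_yields = 0;
  while (true) {
    if (ready.load(std::memory_order_acquire)) return;
    auto before = std::chrono::steady_clock::now();
    if (before - spin_begin > std::chrono::microseconds(kMaxYieldUsec)) break;
    std::this_thread::yield();
    auto elapsed = std::chrono::steady_clock::now() - before;
    if (elapsed > std::chrono::microseconds(kSlowYieldUsec)) {
      if (++slow_yields >= kMaxSlowYields) break;  // scheduler is busy: block instead
    }
  }
  // Level 3: block on the condition variable (the role of `await`).
  std::unique_lock<std::mutex> lk(mu);
  cv.wait(lk, [&] { return ready.load(std::memory_order_acquire); });
}
```

The key property is that each level is strictly cheaper to enter than the next: a task arriving within a few hundred nanoseconds is caught by the spin, within ~100 µs by the yield loop, and only after that does the worker pay the full sleep/wake cost.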
Summary by CodeRabbit
- Refactor
  - Major restructuring of the thread pool's internal architecture for improved task handling, worker management, and synchronization.
  - Significant changes to the `ThreadPool` and `Worker` classes for better performance and scalability.
- New Features
  - Introduced methods for task execution, delayed scheduling, and synchronization within the thread pool.
- Improvements
  - Enhanced logic for managing multiple links and initializing workers.
  - Updated task scheduling mechanisms to support better performance and reliability.
  - Increased thread and pool sizes for improved resource utilization and performance.
@CodiumAI-Agent /review
PR Review 🔍
⏱️ Estimated effort to review [1-5]: 4, because the PR involves significant changes to the threading and scheduling logic in a thread pool implementation, which is a complex area requiring careful review to ensure thread safety, performance, and correctness. The changes include the introduction of lock-free structures and a detailed scheduling mechanism which needs thorough testing and validation.
🧪 Relevant tests: No
⚡ Possible issues:
- Possible Bug: The use of …
- Thread Safety Concern: The lock-free implementation using …
🔒 Security concerns: No
Code feedback:
- relevant file: src/net/src/thread_pool.cc
  suggestion: Consider using a condition variable instead of busy-waiting with …
  relevant line: `std::this_thread::yield();`
- relevant file: src/net/src/thread_pool.cc
  suggestion: Implement proper memory management for …
  relevant line: `auto node = new Node(func, arg);`
- relevant file: src/net/src/thread_pool.cc
  suggestion: To prevent potential race conditions with the lock-free list, ensure that all operations on …
  relevant line: `if (newest_node->compare_exchange_weak(nodes, node)) {`
- relevant file: src/net/src/thread_pool.cc
  suggestion: Add error handling for the scenario where …
  relevant line: `auto node = new Node(func, arg);`
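The flagged `compare_exchange_weak` line suggests the push path is a classic lock-free (Treiber-style) prepend, as in RocksDB's `WriteThread::LinkOne`. A hedged sketch follows, assuming `Node` holds a function pointer, its argument, and a `link` next-pointer; the real struct in `thread_pool.cc` may differ.

```cpp
#include <atomic>

// Assumed shape of the PR's task node (illustrative, not the actual definition).
struct Node {
  void (*func)(void*);
  void* arg;
  Node* link;  // next pointer in the lock-free list
  Node(void (*f)(void*), void* a) : func(f), arg(a), link(nullptr) {}
};

// Push `node` onto the list headed by `newest_node` without taking a lock.
void LinkOne(std::atomic<Node*>& newest_node, Node* node) {
  Node* nodes = newest_node.load(std::memory_order_relaxed);
  while (true) {
    node->link = nodes;
    // On failure, compare_exchange_weak reloads the current head into
    // `nodes`, so the loop simply re-links and retries.
    if (newest_node.compare_exchange_weak(nodes, node)) return;
  }
}
```

Because producers only ever swap the head pointer, contention is confined to a single CAS; consumers typically detach the whole list at once and process it in batch.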
@CodiumAI-Agent /improve
PR Code Suggestions ✨
Category | Suggestion | Score
- Possible bug — Add exception handling for memory allocation to prevent leaks: ensure proper exception handling for dynamic memory allocation when creating new … (src/net/src/thread_pool.cc [112-113])
  Suggestion importance [1-10]: 9. Why: Adding exception handling for dynamic memory allocation is crucial to prevent memory leaks and ensure program stability. This is a significant improvement addressing a potential bug.
- Best practice — Use smart pointers for automatic memory management: replace the manual memory management with smart pointers to avoid manual deletion and … (src/net/src/thread_pool.cc [112-113])
  Suggestion importance [1-10]: 8. Why: Using smart pointers enhances code safety and readability by automating memory management, reducing the risk of memory leaks and manual deletion errors. This is a best practice for modern C++.
- Enhancement — Use a thread-safe queue to simplify task scheduling: consider using a thread-safe queue or priority queue that encapsulates the synchronization … (src/net/src/thread_pool.cc [112-113])
  Suggestion importance [1-10]: 8. Why: Encapsulating synchronization logic within a thread-safe queue can simplify the code and reduce the potential for synchronization errors. This enhancement improves code maintainability and readability.
- Performance — Replace busy waiting with a sleep mechanism to improve CPU efficiency: consider using a more efficient locking mechanism or lock-free data structures for the … (src/net/src/thread_pool.cc [103-109])
  Suggestion importance [1-10]: 7. Why: The suggestion to replace busy waiting with a sleep mechanism can improve CPU efficiency and reduce thread starvation. However, it may introduce latency in task scheduling, so the impact on performance should be carefully evaluated.
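The smart-pointer suggestion could look like the sketch below. Note this is the reviewers' proposed alternative, not the PR's lock-free design; `Task` and `TaskQueue` are illustrative names.

```cpp
#include <functional>
#include <memory>
#include <mutex>
#include <queue>

// A task owned by unique_ptr: no manual delete, and nothing leaks if an
// allocation throws before the task is enqueued.
struct Task {
  std::function<void()> fn;
};

class TaskQueue {
 public:
  void Push(std::function<void()> fn) {
    // make_unique throws std::bad_alloc on OOM; ownership is never ambiguous.
    auto task = std::make_unique<Task>(Task{std::move(fn)});
    std::lock_guard<std::mutex> g(mu_);
    q_.push(std::move(task));
  }

  // Returns nullptr when empty; ownership transfers to the caller,
  // so the task is destroyed automatically after execution.
  std::unique_ptr<Task> Pop() {
    std::lock_guard<std::mutex> g(mu_);
    if (q_.empty()) return nullptr;
    auto t = std::move(q_.front());
    q_.pop();
    return t;
  }

 private:
  std::mutex mu_;
  std::queue<std::unique_ptr<Task>> q_;
};
```

The trade-off versus the PR's approach is explicit: this version regains automatic memory safety at the cost of reintroducing the very lock contention the lock-free list was designed to avoid.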
Walkthrough
The recent updates to the `ThreadPool` class involve a substantial overhaul aimed at enhancing task management, worker dynamics, and synchronization. Key modifications include the removal of the `Task` struct, significant changes to the `Worker` class, and the introduction of new methods and member variables in the `ThreadPool` class. These enhancements are designed to optimize task handling, improve resource allocation, and ensure more effective synchronization among threads.
Changes
Files | Change Summary
---|---
`conf/pika.conf` | Enhanced configuration with new parameters for caching, thread management, and compaction strategies; increased thread and pool sizes.
`src/net/include/thread_pool.h` | Major restructuring of `ThreadPool` and `Worker` classes; removal of `Task` struct; new structs and method signatures added.
`src/net/src/thread_pool.cc` | Refactoring of `WorkerMain`, `runInThread`, and task scheduling methods to improve efficiency and control flow.
Sequence Diagram(s)
```mermaid
sequenceDiagram
    participant Client
    participant ThreadPool
    participant Worker
    Client->>ThreadPool: Schedule(task)
    ThreadPool->>Worker: Notify worker
    Worker-->>ThreadPool: Fetch task
    Worker->>Worker: Execute task
    Worker-->>ThreadPool: Task complete
```
Poem
In the code where threads entwine,
A pool revamped, now so divine.
Workers buzz with tasks anew,
Synchronized in perfect queue.
Tasks handled with care and might,
ThreadPool’s future, oh so bright! 🐇✨
These are all tests with one lock-free list per worker (described in the discussion above). Code: https://github.com/QlQlqiqi/pika/tree/change-thread-shedule-with-mutil-list
binlog and cache disabled
redis-benchmark -h localhost -p 9221 -t get,set -n 5000000 -r 10000000000000 -d 512 -c 300 --threads 150
Tuned parameters (in order): kMaxSlowYieldsWhileSpinning, slow_yield_usec_, max_yield_usec_, command type
6 6 500 get
Summary:
throughput summary: 121294.45 requests per second
latency summary (msec):
avg min p50 p95 p99 max
2.452 0.016 1.911 5.975 11.775 137.087
6 6 500 set
Summary:
throughput summary: 22597.34 requests per second
latency summary (msec):
avg min p50 p95 p99 max
13.178 0.080 6.711 30.463 54.175 2711.551
3 3 100 get
Summary:
throughput summary: 122249.38 requests per second
latency summary (msec):
avg min p50 p95 p99 max
2.424 0.016 1.743 6.375 14.431 151.551
3 3 100 set
Summary:
throughput summary: 20026.92 requests per second
latency summary (msec):
avg min p50 p95 p99 max
14.806 0.064 5.903 29.727 52.831 3000.319
By tuning the parameters, get's p99 and set's QPS can be lowered somewhat, but get's QPS and set's p99 rise. The right values depend on machine performance; my local test machine is mediocre, and raising the level-2 wait time improves the results moderately.
Alternatively, set performance can be sacrificed for get performance.
If yield_credit is kept permanently positive (i.e., the level-2 wait is always entered), get's QPS rises and its p99 falls. Comparison:
Summary:
throughput summary: 122249.38 requests per second
latency summary (msec):
avg min p50 p95 p99 max
2.424 0.016 1.743 6.375 14.431 151.551
Summary:
throughput summary: 131887.84 requests per second
latency summary (msec):
avg min p50 p95 p99 max
2.256 0.024 1.879 5.055 8.855 159.615
Because the level-2 wait is always entered, the get benchmark almost never triggers cv.wait, which saves time. However, set, which is already slow, then keeps waiting for tasks during time that could have gone to processing data, and because set executes slowly, cv.wait may still be triggered after the level-2 wait, making it slow.
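In RocksDB's `AwaitState`, which this logic follows, entry into the level-2 yield loop is gated by a signed credit counter plus random sampling, so forcing the credit positive (as in the experiment above) makes the check always pass. A sketch with illustrative names (`ShouldEnterYieldLoop` and `UpdateCredit` are not the PR's actual functions):

```cpp
#include <atomic>
#include <cstdint>
#include <random>

constexpr int kSamplingBase = 256;  // floor of 1/256 on the enter probability

// Positive (or zero) credit means the level-2 yield loop paid off recently,
// so enter it; with negative credit, still sample it ~1/kSamplingBase of the time.
bool ShouldEnterYieldLoop(std::atomic<int32_t>& yield_credit, std::mt19937& rng) {
  if (yield_credit.load(std::memory_order_relaxed) >= 0) return true;
  return (rng() % kSamplingBase) == 0;
}

// After the loop: reward the credit when the task arrived while yielding,
// penalize it when the worker fell through to the blocking wait.
void UpdateCredit(std::atomic<int32_t>& yield_credit, bool yield_succeeded) {
  yield_credit.fetch_add(yield_succeeded ? 1 : -1, std::memory_order_relaxed);
}
```

This explains the observed trade-off: get tasks arrive fast enough that yielding usually succeeds (credit stays positive), while slow set workloads repeatedly fail the yield and fall through to cv.wait anyway.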
Call the current branch v1, and the one-lock-free-list-per-worker version v2.
v1's QPS is lower than v2's, but its p99 is also lower. The reason: gperftools shows that a large part of v2's time is spent in AsmVolatilePause, which means v2's multiple lock-free lists reduce wait efficiency and thus raise p99; but a single list has too much contention, so its QPS is also low. A middle ground is needed, e.g. 5 lock-free lists for 30 workers, which should lower p99 without lowering QPS.
v1 maps all workers to one list; v2 maps each worker to its own list. Here is a get test of v1 and v2 with worker num = 2:
v1:
Summary:
throughput summary: 82895.37 requests per second
latency summary (msec):
avg min p50 p95 p99 max
3.580 0.024 2.815 9.487 17.535 309.503
v2:
Summary:
throughput summary: 91702.74 requests per second
latency summary (msec):
avg min p50 p95 p99 max
3.245 0.016 2.887 7.719 9.167 203.263
As the previous discussion noted, multiple lock-free lists weaken the level-3 wait success rate, so we can try mapping a small number of workers to each lock-free list:
get: 30 workers in total, one list per 2 workers
Summary:
throughput summary: 133697.00 requests per second
latency summary (msec):
avg min p50 p95 p99 max
2.213 0.016 1.615 6.279 10.711 173.951
get: 30 workers in total, one list per 3 workers
Summary:
throughput summary: 86218.79 requests per second
latency summary (msec):
avg min p50 p95 p99 max
3.450 0.024 2.071 11.311 16.927 152.191
As you can see, on my machine the contention sweet spot is reached with one lock-free list per 2 workers. https://github.com/QlQlqiqi/pika/tree/change-thread-shedule-with-mutil-list-per-worker
Test summary:
binlog and cache disabled, 30 workers. Test command:
redis-benchmark -h localhost -p 9221 -t get,set -n 5000000 -r 10000000000000 -d 512 -c 300 --threads 150
- All workers share one linked list: contention is too high, and the CPU spends more time executing `std::this_thread::yield();` in `Schedule`, because too many tasks are pending for the workers and the scheduling thread has to yield, so the extra context switching increases latency. Moreover, while the CPU is handling thread switches, tasks already popped from the list go unexecuted, which reduces wait efficiency and wastes the level-1 wait (`AsmVolatilePause()`) time.
- One linked list per `nworkers_per_link_` workers: the parameter value depends on machine performance and the workload. Too large, and excessive contention reduces wait efficiency; too small, and contention is so low that the probability of popping a task from a given list drops, which also reduces wait efficiency. On my machine (AMD 7900X, Ubuntu 22.04.3, Debug build of pika, DDR5-6400 memory), 2 is the best value.
As for the program's other parameters, such as `kMaxSlowYieldsWhileSpinning`, `slow_yield_usec_`, and `max_yield_usec_`, changing them also changes how long the wait logic runs, so a suitable wait efficiency can be chosen for the machine and workload.
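The `nworkers_per_link_` grouping implies a simple worker-to-list mapping; a sketch of the presumed arithmetic follows (the actual assignment in the branch may differ):

```cpp
#include <cstddef>

// Hypothetical mapping: worker i uses list i / nworkers_per_link.
// With worker_num = 30 and nworkers_per_link = 2, this yields 15 lists,
// each shared by exactly 2 workers.
inline std::size_t LinkIndex(std::size_t worker_id, std::size_t nworkers_per_link) {
  return worker_id / nworkers_per_link;
}

// Number of lists needed to cover all workers (ceiling division).
inline std::size_t LinkCount(std::size_t worker_num, std::size_t nworkers_per_link) {
  return (worker_num + nworkers_per_link - 1) / nworkers_per_link;
}
```

`nworkers_per_link_` thus interpolates between the two extremes measured above: 1 reproduces v2 (one list per worker), and `worker_num` reproduces v1 (one shared list).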
A timer task may block a worker thread, because the worker always waits for a fixed interval; if a new (non-timer) task arrives during that interval, the worker keeps waiting. So, while waiting, check whether a new task has arrived; if so, push the timer tasks already popped from the list back onto it and run the regular task logic, ensuring the worker thread is not blocked. (Currently there is no limit on how many timer tasks may be re-pushed.)
```cpp
lock.lock();
// if a task arrives now, run it immediately
auto res = rsignal.wait_for(lock, std::chrono::microseconds(exec_time - unow), [this, &newest_node]() {
  return newest_node.load(std::memory_order_relaxed) != nullptr || UNLIKELY(should_stop());
});
lock.unlock();
if (res) {
  // re-push the timer tasks
  ReDelaySchedule(time_first);
  goto retry;
}
```
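The `wait_for`-with-predicate pattern used above can be shown in a self-contained form; here `pending` and `WaitForTask` are illustrative stand-ins for the PR's `newest_node` check, not its actual code.

```cpp
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <mutex>
#include <thread>

std::mutex mu;
std::condition_variable cv;
std::atomic<int> pending{0};  // stand-in for "newest_node != nullptr"

// Wait up to `timeout` for a task. Returns true if a task arrived before the
// deadline (the signal the PR uses to re-push popped timer tasks and retry),
// false if the timeout expired with nothing pending.
bool WaitForTask(std::chrono::milliseconds timeout) {
  std::unique_lock<std::mutex> lk(mu);
  return cv.wait_for(lk, timeout, [] { return pending.load() > 0; });
}
```

The predicate form is what makes the early-exit safe: `wait_for` re-checks the predicate on every wakeup (including spurious ones) and returns the predicate's value, so a task arriving mid-wait cuts the timer sleep short instead of being stranded until the deadline.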
My test results are below. Command: redis-benchmark -h localhost -p 9221 -t get,set -n 10000000 -r 10000000000000 -d 512 -c 32 --threads 16
binlog and cache disabled, worker num = 30, queue_slow_size_ = (std::max(10UL, std::min(worker_num * max_queue_size / 100, max_queue_size))),
unstable branch, nworkers_per_link_ = 1:
====== SET ======
Summary:
throughput summary: 31475.25 requests per second
latency summary (msec):
avg min p50 p95 p99 max
1.008 0.080 0.895 1.815 2.991 42.175
====== GET ======
Summary:
throughput summary: 70437.91 requests per second
latency summary (msec):
avg min p50 p95 p99 max
0.443 0.040 0.359 1.039 1.703 21.743
Original pika branch:
====== SET ======
Summary:
throughput summary: 29344.70 requests per second
latency summary (msec):
avg min p50 p95 p99 max
1.082 0.072 0.999 1.887 2.583 38.271
====== GET ======
Summary:
throughput summary: 34520.49 requests per second
latency summary (msec):
avg min p50 p95 p99 max
0.918 0.040 0.711 2.575 3.831 72.255
Conclusions
- set: both QPS and p99 improved.
- get: QPS slightly improved, but p99 regressed.
OS: Ubuntu 22.04.3, CPU: 7900X, memory: 16G * 2. The following conf was used:
# 360 dba pika conf pika3.5.2
port : 9221
thread-num : 8
log-path : ./log/
loglevel : info
db-path : ./db/
write-buffer-size : 256M
timeout : 30
#requirepass : 06154eee364854d5
#masterauth : 06154eee364854d5
#userpass : 06154eee364854d5360
#userblacklist : bgsave,dumpoff,client
dump-prefix : pika-
dump-expire : 1
pidfile : .pika.pid
daemonize : yes
dump-path : ./dump/
maxclients : 20000
target-file-size-base : 20971520
expire-logs-days : 7
expire-logs-nums : 300
root-connection-num : 10
slowlog-log-slower-than : 100000
binlog-file-size : 104857600
compression : snappy
db-sync-path : ./dbsync
db-sync-speed : 60
slowlog-write-errorlog : yes
small-compaction-threshold : 5000
max-write-buffer-size : 20737418240
max-cache-files : 8000
replication-num : 0
consensus-level : 0
max-cache-statistic-keys : 0
thread-pool-size : 50
slowlog-write-errorlog : yes
default-slot-num : 1024
instance-mode : classic
databases : 1
sync-thread-num : 1
arena-block-size : 33554432
max-background-jobs : 12
max-background-flushes : 3
max-background-compactions : 9
rate-limiter-bandwidth : 1099511627776
db-instance-num : 1
block-size : 4096
#block-cache : 5368709120
block-cache : 4294967296
max-subcompactions : 8
#cache-maxmemory : 5368709120
cache-lfu-decay-time : 1
cache-maxmemory-samples : 5
cache-maxmemory-policy : 1
cache-num : 8
cache-model : 0
zset-cache-field-num-per-key : 512
zset-cache-start-direction : 0
cache-type :
share-block-cache : yes
throttle-bytes-per-second : 102400000
max-rsync-parallel-num : 4
write-binlog : no
slotmigrate : no
# Pika automatic compact compact strategy, a complement to rocksdb compact.
# Trigger the compact background task periodically according to `compact-interval`
# Can choose `full-compact` or `obd-compact`.
# obd-compact https://github.com/OpenAtomFoundation/pika/issues/2255
compaction-strategy : obd-compact
# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
compact-every-num-of-files : 50
# For OBD_Compact
# In another search, if the file creation time is
# greater than `force-compact-file-age-seconds`,
# a compaction of the upper and lower boundaries
# of the file will be performed at the same time
# `compact-every-num-of-files` -1
force-compact-file-age-seconds : 300
# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
force-compact-min-delete-ratio : 10
# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
dont-compact-sst-created-in-seconds : 20
# For OBD_Compact
# According to the number of sst files in rocksdb,
# compact every `compact-every-num-of-files` file.
best-delete-min-ratio : 40
The test command and results are as follows:
redis-benchmark -h localhost -p 9221 -t set,get -n 10000000 -r 10000000000000 -d 512 -c 64 --threads 32
Current branch
====== SET ======
Summary:
throughput summary: 41356.49 requests per second
latency summary (msec):
avg min p50 p95 p99 max
1.536 0.088 1.423 2.639 3.655 47.583
====== GET ======
Summary:
throughput summary: 77099.22 requests per second
latency summary (msec):
avg min p50 p95 p99 max
0.819 0.040 0.751 1.519 2.359 39.071
pika unstable branch
====== SET ======
Summary:
throughput summary: 35311.73 requests per second
latency summary (msec):
avg min p50 p95 p99 max
1.799 0.064 1.671 3.191 4.223 41.215
====== GET ======
Summary:
throughput summary: 46570.11 requests per second
latency summary (msec):
avg min p50 p95 p99 max
1.359 0.032 1.199 2.975 4.007 38.079
@cheniujh review
This PR's p99 issue is resolved in conjunction with #2816.