strand post probabilistic non-response
Hi, I wrote a simple test case for strand and ran into an unexpected phenomenon. To put it simply, I constructed a thread pool driving one io_context, plus multiple strands, and then posted many tasks to every strand; but, with some probability, some strands are never serviced. Let me show you the code.
#include <atomic>
#include <chrono>
#include <functional>
#include <iostream>
#include <memory>
#include <thread>
#include <vector>
#include "asio.hpp"

// test case
int fibonacci(int n) {
    if (n <= 0) {
        return 0;
    }
    else if (n == 1) {
        return 1;
    }
    else {
        return fibonacci(n - 1) + fibonacci(n - 2);
    }
}

// statistics
std::atomic<size_t> light_task_number_per_sec = 0; // number of light tasks executed per second
std::atomic<size_t> heavy_task_number_per_sec = 0; // number of heavy tasks executed per second
void statistics() {
    std::cout << "statistics, now: " << std::chrono::duration_cast<std::chrono::milliseconds>(std::chrono::steady_clock::now().time_since_epoch()).count() <<
        " light_task_number_per_sec: " << light_task_number_per_sec << " heavy_task_number_per_sec: " << heavy_task_number_per_sec << std::endl;
    light_task_number_per_sec = 0;
    heavy_task_number_per_sec = 0;
}

// config
size_t thread_number = 4;         // number of threads
size_t total_strand_number = 100; // total number of strands
size_t heavy_strand_number = 2;   // number of strands that perform heavy tasks. NOTICE: heavy_strand_number should never be greater than thread_number, to prevent all threads from getting stuck
size_t tasks_per_strand = 10000;  // number of tasks posted per second per strand

int main() {
    std::cout << "ready to test, thread_number: " << thread_number << " total_strand_number: " << total_strand_number << " heavy_strand_number: " << heavy_strand_number << " tasks_per_strand: " << tasks_per_strand << std::endl;
    // make io and work
    auto uniq_io = std::make_unique<asio::io_context>();
    auto work = std::make_unique<asio::io_service::work>(*uniq_io);
    // make thread pool
    std::vector<std::unique_ptr<std::thread>> thread_pool;
    for (size_t index = 0; index < thread_number; ++index) {
        thread_pool.emplace_back(std::make_unique<std::thread>(std::bind([io = uniq_io.get()]() { io->run(); })));
    }
    // make strand pool
    std::vector<std::unique_ptr<asio::io_context::strand>> strand_pool;
    for (size_t index = 0; index < total_strand_number; ++index) {
        strand_pool.emplace_back(std::make_unique<asio::io_context::strand>(*uniq_io));
    }
    // infinite loop for testing (post tasks to each strand)
    while (true) {
        for (size_t strand_index = 0; strand_index < total_strand_number; ++strand_index) {
            for (size_t task_index = 0; task_index < tasks_per_strand; ++task_index) {
                strand_pool[strand_index]->post([strand_index]() {
                    // some strands perform light tasks, and some strands perform heavy tasks
                    if (strand_index >= heavy_strand_number) {
                        fibonacci(10); // light task
                        light_task_number_per_sec++;
                    }
                    else {
                        fibonacci(30); // heavy task
                        heavy_task_number_per_sec++;
                    }
                });
            }
        }
        // sleep and statistics
        std::this_thread::sleep_for(std::chrono::milliseconds(1000));
        statistics();
    }
    return 0;
}
As shown in the code above, I created 4 threads and 100 strands, of which 98 strands perform light tasks and 2 strands perform heavy tasks. When heavy tasks accumulate, some of the 98 light-task strands will, with some probability, stop being serviced.
Let me show my test results.
In theory, since only 2 strands perform heavy tasks, at most 2 threads can be busy with them, and the remaining 2 threads should handle the light tasks posted by the other 98 strands. Therefore light_task_number_per_sec should be 980000 (98 * 10000). However, the first test shows only 960000, which is not expected (the tasks posted by 2 of the light strands were never executed); the second test produced the expected result.
After an in-depth investigation, I found the cause: by default, strand implementations are allocated by hashing, which can make several strand objects silently share (in effect, be merged into) the same underlying implementation, without the user being aware of it. In my test case, for example, I created 100 strands, but only a little over 70 of them are truly independent; the rest reuse implementations that are already in use. If a strand that posts light tasks is unlucky enough to be merged with a strand that posts heavy tasks, these unexpected results occur.
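Here is a minimal sketch (separate from the test above) of how the merging can be observed. It relies on my understanding that io_context::strand::running_in_this_thread() checks the calling thread's stack of strand implementations, so while a handler posted through strand i runs on a single-threaded io_context, strands[j]->running_in_this_thread() returns true exactly when strand j shares strand i's underlying implementation:

#include <iostream>
#include <memory>
#include <vector>
#include "asio.hpp"

int main() {
    constexpr std::size_t n = 100;
    asio::io_context io;

    // create n strand objects on the same io_context
    std::vector<std::unique_ptr<asio::io_context::strand>> strands;
    for (std::size_t i = 0; i < n; ++i)
        strands.emplace_back(std::make_unique<asio::io_context::strand>(io));

    std::size_t merged = 0;
    for (std::size_t i = 0; i < n; ++i) {
        strands[i]->post([&strands, &merged, i] {
            // while this handler runs, running_in_this_thread() should be true for
            // every strand object that shares this strand's underlying implementation
            for (std::size_t j = 0; j < i; ++j) {
                if (strands[j]->running_in_this_thread()) {
                    ++merged; // strand i reuses an implementation already used by strand j
                    break;
                }
            }
        });
    }

    io.run(); // single thread, so handlers run one at a time and the check is race-free
    std::cout << (n - merged) << " of " << n << " strands appear to be truly independent" << std::endl;
    return 0;
}

If the hashed allocation really merges strands, the printed count should be noticeably lower than 100.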
In practice, users tend to deliberately hand different, unrelated pieces of logic to different strands precisely so that those tasks can be processed in parallel, hoping to make the most of the thread pool. Under the default strand allocation, however, strands are silently merged even when their number does not exceed the upper limit, which is hard to accept: it effectively turns multiple independent, parallel task streams into serial ones, which has a large negative impact on thread-pool utilization and can cause unforeseen problems.
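As far as I can tell, the newer executor-based asio::strand (created with asio::make_strand) allocates a separate implementation per strand object instead of hashing into a shared pool, so it may serve as a workaround. A minimal sketch, assuming that behaviour:

#include <vector>
#include "asio.hpp"

int main() {
    asio::io_context io;

    // executor-based strands: each make_strand call appears to get its own implementation
    std::vector<asio::strand<asio::io_context::executor_type>> strands;
    for (int i = 0; i < 100; ++i)
        strands.push_back(asio::make_strand(io));

    // handlers are submitted with asio::post() instead of strand::post()
    asio::post(strands[0], [] { /* serialized only with other handlers on strands[0] */ });

    io.run();
    return 0;
}

Still, this only sidesteps the problem; the default behaviour of io_context::strand is what surprised me.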
I noticed that version 1.16.1 added support for a new ASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION flag, which switches the allocation of strand implementations to a round-robin approach rather than hashing. I think this is a good approach: it ensures that asio's use of the thread pool matches user expectations as long as the number of strands stays below the upper limit, and I hope it becomes the default configuration. On the other hand, it would help if users could be informed in some way when the number of strands exceeds the upper limit, so that they clearly understand the situation instead of relying on unwritten rules.
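For reference, since it is an ordinary preprocessor define, I believe the flag can be enabled just by adding it to the compile command from the test environment below:

g++ -I ./include/ -DASIO_ENABLE_SEQUENTIAL_STRAND_ALLOCATION asio_test.cpp -pthread -std=c++17 -O2 -o asio_test.out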
By the way, the test environment is:
- asio version: 1.18.1
- OS: Debian GNU/Linux 10 (buster), kernel 4.14.81.bm.15
- CPU Model name: Intel(R) Xeon(R) Platinum 8260 CPU @ 2.40GHz
- CPU(s): 8
- compile: g++ -I ./include/ asio_test.cpp -pthread -std=c++17 -O2 -o asio_test.out