Inconsistent counter values in multi-threaded vector access using thread_pool
Hello,
I am seeing inconsistent counter values when using your thread pool implementation from multiple threads. Each run of sim() enqueues 17 × 12 = 204 tasks, and each task increments the counter once, so I expect the counter to read 204 after every run. Instead, the value varies.
#include <iostream>
#include <mutex>
#include <vector>
#include "thread_pool.h"
using namespace std;

dp::thread_pool pool(24);

class Vect
{
public:
    std::vector<std::vector<double>> data;
    int id = -1;
    Vect(int id) : data(30, std::vector<double>()) { this->id = id; }
    std::mutex g_i_mutexx;
    int counter = 0;

    // Appends a value and bumps the counter under the mutex.
    void addDataVec(int pos, double value)
    {
        std::lock_guard<std::mutex> lock(g_i_mutexx);
        data[pos].push_back(value);
        counter++;
    }
    std::vector<std::vector<double>> returnVectorSim()
    {
        return (data);
    }
    void clearVector(int num_vec)
    {
        data[num_vec].clear();
    }
};

void sim()
{
    Vect vect(1);
    // 17 * 12 = 204 tasks, one increment each.
    for (size_t var1 = 0; var1 < 17; var1++)
    {
        for (int var2 = 0; var2 < 12; var2++)
        {
            pool.enqueue([&vect, var1, var2]() {
                vect.addDataVec(var1, var2);
            });
        }
    }
    pool.wait_for_tasks();
    std::cout << vect.counter << std::endl;
}

int main()
{
    for (size_t var = 0; var < 17; var++)
    {
        sim();
    }
    return 0;
}
Observed Counter Values:
- The values varied between runs (e.g., 204, 203, 202, 201).
Expected Behavior:
- The counter should consistently reach 204 after each execution of sim().
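For comparison, the same 17 × 12 increments can be run without the pool, using plain std::thread and join() as the synchronization point. This is only a sketch to check that the locking pattern in Vect is sound on its own; `LockedCounter` and `run_baseline` are illustrative names that do not appear in the code above, and the vector data is left out to keep it short.

```cpp
#include <cassert>
#include <cstddef>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical stand-in for Vect: same lock_guard pattern, counter only.
struct LockedCounter {
    std::mutex m;
    int counter = 0;
    void add() {
        std::lock_guard<std::mutex> lock(m);
        ++counter;
    }
};

// Runs outer * inner increments, one per thread, and joins every thread.
// join() guarantees each increment happens-before the final read.
int run_baseline(int outer, int inner) {
    assert(outer >= 0 && inner >= 0);
    LockedCounter c;
    std::vector<std::thread> workers;
    workers.reserve(static_cast<std::size_t>(outer) * static_cast<std::size_t>(inner));
    for (int i = 0; i < outer; ++i) {
        for (int j = 0; j < inner; ++j) {
            workers.emplace_back([&c] { c.add(); });
        }
    }
    for (auto& t : workers) {
        t.join();
    }
    return c.counter;
}
```

Calling `run_baseline(17, 12)` should give 204 every time, because no thread is still running when the counter is read. If this baseline is stable while the pool version is not, the difference lies in how the wait is performed rather than in the mutex-protected increment.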
Could someone help me understand what might be causing these inconsistencies?
Thank you!
Hello!
Thanks for reporting the issue and for the nice discussion on Discord. After some testing, I'm able to reproduce the issue with this code:
TEST_CASE("Ensure wait_for_tasks() properly waits for tasks to fully complete") {
    class counter_wrapper {
      public:
        counter_wrapper() = default;
        std::atomic_int counter = 0;
        void increment_counter() { counter.fetch_add(1, std::memory_order_release); }
    };

    dp::thread_pool local_pool{};
    constexpr auto task_count = 10;
    std::vector<int> counts(task_count);
    for (size_t i = 0; i < task_count; i++) {
        counter_wrapper cnt_wrp{};
        for (size_t var1 = 0; var1 < 17; var1++) {
            for (int var2 = 0; var2 < 12; var2++) {
                local_pool.enqueue_detach([&cnt_wrp]() { cnt_wrp.increment_counter(); });
            }
        }
        local_pool.wait_for_tasks();
        // std::cout << cnt_wrp.counter << std::endl;
        counts[i] = cnt_wrp.counter.load(std::memory_order_acquire);
    }

    auto all_correct_count =
        std::ranges::all_of(counts, [](int count) { return count == 17 * 12; });
    const auto sum = std::accumulate(counts.begin(), counts.end(), 0);
    CHECK_EQ(sum, 17 * 12 * task_count);
    CHECK(all_correct_count);
}
You can see the current state of the code here: https://github.com/DeveloperPaul123/thread-pool/tree/fix/wait-for-tasks
I've tried some fixes and moved to using std::barrier for wait_for_tasks(), but it seems some work is still being dropped. The cause is unclear to me, but I suspect that something is wrong with the atomics used to check the state of the pool.
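One common way to make a wait-for-all call reliable is to count tasks from the moment they are enqueued until their body has returned, and to let the waiter sleep on a condition variable until that count reaches zero. The sketch below illustrates that idea with a single mutex instead of atomics. It is not the implementation in dp::thread_pool and not necessarily the approach taken in #68; `tiny_pool`, `submit`, `wait_idle`, and `run_batch` are hypothetical names used only for illustration.

```cpp
#include <atomic>
#include <cassert>
#include <condition_variable>
#include <cstddef>
#include <functional>
#include <mutex>
#include <queue>
#include <thread>
#include <utility>
#include <vector>

// Hypothetical minimal pool; the names are illustrative, not dp::thread_pool's API.
class tiny_pool {
  public:
    explicit tiny_pool(std::size_t n) {
        for (std::size_t i = 0; i < n; ++i) {
            workers_.emplace_back([this] { run(); });
        }
    }
    ~tiny_pool() {
        {
            std::lock_guard<std::mutex> lock(m_);
            stop_ = true;
        }
        work_cv_.notify_all();
        for (auto& t : workers_) t.join();
    }
    void submit(std::function<void()> f) {
        {
            std::lock_guard<std::mutex> lock(m_);
            tasks_.push(std::move(f));
            ++pending_;  // counted from enqueue until the body has returned
        }
        work_cv_.notify_one();
    }
    // Blocks until every submitted task has finished running.
    void wait_idle() {
        std::unique_lock<std::mutex> lock(m_);
        idle_cv_.wait(lock, [this] { return pending_ == 0; });
    }

  private:
    void run() {
        for (;;) {
            std::function<void()> task;
            {
                std::unique_lock<std::mutex> lock(m_);
                work_cv_.wait(lock, [this] { return stop_ || !tasks_.empty(); });
                if (tasks_.empty()) return;  // stop requested and queue drained
                task = std::move(tasks_.front());
                tasks_.pop();
            }
            task();  // run the task outside the lock
            {
                // Decrement only after the task body has completed.
                std::lock_guard<std::mutex> lock(m_);
                if (--pending_ == 0) idle_cv_.notify_all();
            }
        }
    }

    std::mutex m_;
    std::condition_variable work_cv_, idle_cv_;
    std::queue<std::function<void()>> tasks_;
    std::size_t pending_ = 0;
    bool stop_ = false;
    std::vector<std::thread> workers_;  // declared last so other members exist first
};

// Submits task_count increments, waits, and returns the observed count.
int run_batch(int task_count) {
    assert(task_count >= 0);
    std::atomic<int> counter{0};
    tiny_pool pool(4);
    for (int i = 0; i < task_count; ++i) {
        pool.submit([&counter] { counter.fetch_add(1, std::memory_order_relaxed); });
    }
    pool.wait_idle();
    return counter.load();
}
```

With this scheme a task that has been popped from the queue but is still executing keeps `pending_` above zero, so `wait_idle()` cannot return while work is in flight. A check that only looks at whether the queue is empty would miss exactly those tasks, which is one plausible way for a counter to come up a few short of 204.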
This should be fixed now in #68