Assertion error in multi-threaded programs
Hi, I'm trying to use promise-cpp in multi-threaded programs, but the following assertion inside promise-cpp sometimes fails.
promise_inl.hpp(161)
std::list<std::shared_ptr<Task>> &pendingTasks = promiseHolder->pendingTasks_;
//promiseHolder->dump();
assert(pendingTasks.front() == task); // Assertion failed: (pendingTasks.front() == task), function call, file promise_inl.hpp, line 161.
pendingTasks.pop_front();
//promiseHolder->dump();
The README says promise-cpp is thread safe, but it seems not to be thread safe in some situations.
Is this a bug? Or should I fix how I use promise-cpp? Thanks.
Steps to reproduce:
Build and run this code snippet.
#include <iostream>
#include <memory>
#include <vector>
#include <thread>
#include <functional>
#include <mutex>
#include <atomic>
#include <deque>
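#include <stdexcept>
#include <algorithm>          // std::for_each
#include <chrono>             // std::chrono::milliseconds
#include <condition_variable> // std::condition_variable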
#define PROMISE_HEADONLY
#include <promise-cpp/promise.hpp>
struct ThreadPool
{
ThreadPool() {}
~ThreadPool() {}
void startThreads(int num) {
stopThreads();
_shouldStop = false;
for(int i = 0; i < num; ++i) {
auto th = std::thread([this] { threadProcess(); });
_threads.push_back(std::move(th));
}
}
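void stopThreads() {
_shouldStop = true;
// Join only joinable threads, then drop them so a second call is safe.
std::for_each(_threads.begin(), _threads.end(),
[](auto &th) { if(th.joinable()) { th.join(); } });
_threads.clear();
}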
void addTask(std::function<void()> f) {
auto lock = std::unique_lock<std::mutex>(_mtx);
_tasks.push_back(f);
_cvEmpty.notify_all();
}
void threadProcess()
{
for( ; ; ) {
if(_shouldStop) { break; }
std::function<void()> f;
{
auto lock = std::unique_lock<std::mutex>(_mtx);
auto result = _cvEmpty.wait_for(lock, std::chrono::milliseconds(100), [&] {
return _tasks.size() > 0;
});
if(result == false) { continue; }
f = _tasks.front();
_tasks.pop_front();
}
f();
}
}
private:
std::mutex _mtx;
std::deque<std::function<void()>> _tasks;
std::vector<std::thread> _threads;
std::atomic<bool> _shouldStop = { false };
std::condition_variable _cvEmpty;
public:
static ThreadPool * getInstance() { return _instance.load(); }
static ThreadPool * setInstance(ThreadPool *inst) { return _instance.exchange(inst); }
private:
static inline std::atomic<ThreadPool *> _instance;
};
promise::Promise yield_async()
{
return promise::newPromise([](promise::Defer defer) {
ThreadPool::getInstance()->addTask([defer] {
defer.resolve();
});
});
}
int main()
{
std::cout << "Start." << std::endl;
ThreadPool pool;
ThreadPool::setInstance(&pool);
pool.startThreads(10);
auto finished = std::make_shared<bool>(false);
static auto func = [=] {
return yield_async().then([] {
return true;
});
};
{
auto count = std::make_shared<std::atomic<int>>(0);
promise::doWhile([=](promise::DeferLoop loop) {
if(*count >= 300) {
loop.doBreak(count->load());
return;
}
promise::newPromise([=](promise::Defer defer) {
func().then([=](bool b) {
bool x = b;
}).then([=] {
return yield_async();
}).then([=] {
defer.resolve();
});
}).then([=] {
*count += 1;
std::cout << "count: " << *count << std::endl;
yield_async().then([=] {
loop.doContinue();
});
});
}).then([=](int n) {
std::cout << "hello : " << n << std::endl;
*finished = true;
});
};
pool.stopThreads();
ThreadPool::setInstance(nullptr);
std::cout << "Finished." << std::endl;
return 0;
}
Thank you. It seems like a bug; I'll check it soon!
Hi, I've sent a patch here that may fix this issue.
By the way, the test code above may deadlock, so we need to set a thread pool size larger than 10. The situation may be this: thread A is waiting for the thread pool to become idle, while thread B is waiting for thread A to call resolve.
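One possible reading of the situation described above is pool starvation: a task that occupies a worker while waiting for something that only another queued task can provide. The sketch below is not promise-cpp code and not the reporter's ThreadPool; `dependentTaskCompletes` and all names in it are hypothetical. A timeout stands in for what would otherwise be an indefinite wait, so the function returns instead of hanging.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <deque>
#include <functional>
#include <mutex>
#include <thread>
#include <vector>

// Hypothetical helper: returns true if task A saw task B's signal before
// its timeout expired. With too few workers, B never gets a thread while
// A is still waiting.
bool dependentTaskCompletes(int workers) {
    assert(workers > 0);

    std::mutex queueMtx;
    std::deque<std::function<void()>> queue;

    std::mutex sigMtx;
    std::condition_variable sigCv;
    bool signalled = false;  // set by task B
    bool completed = false;  // set by task A if the signal arrived in time

    // Task A: holds a worker while it waits for task B.
    queue.push_back([&] {
        std::unique_lock<std::mutex> lock(sigMtx);
        completed = sigCv.wait_for(lock, std::chrono::milliseconds(300),
                                   [&] { return signalled; });
    });
    // Task B: the only thing that can release task A.
    queue.push_back([&] {
        {
            std::lock_guard<std::mutex> lock(sigMtx);
            signalled = true;
        }
        sigCv.notify_all();
    });

    // Each worker drains the pre-filled queue and exits when it is empty.
    auto workerLoop = [&] {
        for (;;) {
            std::function<void()> task;
            {
                std::lock_guard<std::mutex> lock(queueMtx);
                if (queue.empty()) { return; }
                task = std::move(queue.front());
                queue.pop_front();
            }
            task();
        }
    };

    std::vector<std::thread> threads;
    for (int i = 0; i < workers; ++i) { threads.emplace_back(workerLoop); }
    for (auto &t : threads) { t.join(); }
    return completed;
}
```

With one worker, task A times out because task B is stuck behind it in the queue; with two workers, task B runs on the second thread and releases A. Without the timeout, the one-worker case would block forever, which is why a larger pool can hide this kind of stall.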
Hi, thank you for checking out this issue.
The patch adds a condition variable cond_ and waits for cond_ to be notified, but no one sends any notification to cond_.
It seems that's why the deadlock occurs.
So I suppose code that notifies cond_ should be added to promise_inl.hpp.
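To illustrate the point about the missing notification, here is a minimal sketch of the usual wait/notify pairing with std::condition_variable. It is not the patch itself: the `Gate` class and `runGateDemo` are hypothetical names, and only `cond_` echoes the member mentioned above. If `open()` were never called, `wait()` would block forever, which matches the deadlock described.

```cpp
#include <cassert>
#include <condition_variable>
#include <mutex>
#include <thread>

// Hypothetical one-shot gate, not part of promise-cpp.
class Gate {
public:
    // Blocks until open() has been called; the predicate guards against
    // spurious wakeups and against open() having run before wait().
    void wait() {
        std::unique_lock<std::mutex> lock(mtx_);
        cond_.wait(lock, [this] { return open_; });
    }

    // Sets the flag under the lock, then wakes every waiter.
    void open() {
        {
            std::lock_guard<std::mutex> lock(mtx_);
            open_ = true;
        }
        cond_.notify_all();
    }

private:
    std::mutex mtx_;
    std::condition_variable cond_;
    bool open_ = false;
};

// Returns the value the worker wrote before opening the gate.
int runGateDemo() {
    Gate gate;
    int value = 0;
    std::thread worker([&] {
        value = 42;   // written before the gate opens
        gate.open();  // the notification a waiter depends on
    });
    gate.wait();      // returns only after open() has set the flag
    assert(value == 42);
    worker.join();
    return value;
}
```

The flag checked in the predicate matters as much as the notification: it lets a waiter that arrives after `open()` return immediately instead of missing the wakeup.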