cyber::base::ThreadPool: task_queue_ usage issue
I have a problem using cyber::base::ThreadPool:

    template <typename F, typename... Args>
    auto ThreadPool::Enqueue(F&& f, Args&&... args)
        -> std::future<typename std::result_of<F(Args...)>::type> {
      using return_type = typename std::result_of<F(Args...)>::type;

      auto task = std::make_shared<std::packaged_task<return_type()>>(
          std::bind(std::forward<F>(f), std::forward<Args>(args)...));

      std::future<return_type> res = task->get_future();

      // don't allow enqueueing after stopping the pool
      if (stop_) {
        return std::future<return_type>();
      }
      task_queue_.Enqueue([task]() { (*task)(); });
      return res;
    };

In the code above, if task_queue_.Enqueue fails, the task is dropped without ever running, so the returned future (res = task->get_future()) can never become ready. If I then call future.get(), I get this error:
libc++abi: terminating with uncaught exception of type std::__1::future_error: The associated promise has been destructed prior to the associated state becoming ready.
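The failure can be reproduced without the pool. The following is a minimal sketch, assuming a queue that rejects every element; RejectingEnqueue, EnqueueOnFullQueue and GetReportsBrokenPromise are hypothetical names for illustration, not Apollo code. It follows the same steps as ThreadPool::Enqueue: wrap a callable in a std::packaged_task, hand a wrapper to the queue, return the future.

```cpp
#include <cassert>
#include <functional>
#include <future>
#include <memory>

// Hypothetical stand-in for a full queue: it rejects every element.
bool RejectingEnqueue(std::function<void()>&& /*element*/) { return false; }

// Same shape as ThreadPool::Enqueue, minus the pool itself.
std::future<int> EnqueueOnFullQueue() {
  auto task = std::make_shared<std::packaged_task<int()>>([] { return 42; });
  std::future<int> res = task->get_future();
  // The wrapper is rejected and destroyed; the task is never executed.
  RejectingEnqueue([task]() { (*task)(); });
  // The last owner of the packaged_task goes away when this function returns,
  // which stores a broken_promise error in the shared state.
  return res;
}

// Returns true if get() reports std::future_errc::broken_promise.
bool GetReportsBrokenPromise() {
  std::future<int> res = EnqueueOnFullQueue();
  // The future still refers to a shared state, so valid() is true here.
  assert(res.valid());
  try {
    res.get();
  } catch (const std::future_error& e) {
    return e.code() == std::future_errc::broken_promise;
  }
  return false;
}
```

Note that the returned future still reports valid() == true in this sketch, so checking valid() before calling get() would not catch the dropped task; only catching std::future_error does.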
Looking at the code, task_queue_.Enqueue fails in the following function:

    template <typename T>
    bool BoundedQueue<T>::Enqueue(T&& element) {
      uint64_t new_tail = 0;
      uint64_t old_commit = 0;
      uint64_t old_tail = tail_.load(std::memory_order_acquire);
      do {
        new_tail = old_tail + 1;
        if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
          return false;
        }
      } while (!tail_.compare_exchange_weak(old_tail, new_tail,
                                            std::memory_order_acq_rel,
                                            std::memory_order_relaxed));
      pool_[GetIndex(old_tail)] = std::move(element);
      do {
        old_commit = old_tail;
      } while (cyber_unlikely(!commit_.compare_exchange_weak(
          old_commit, new_tail, std::memory_order_acq_rel,
          std::memory_order_relaxed)));
      wait_strategy_->NotifyOne();
      return true;
    }

The failure appears to be caused by the task queue being full, so the element cannot be enqueued. Since ThreadPool relies on task_queue_, would the following change be more appropriate?

    template <typename T>
    bool BoundedQueue<T>::Enqueue(T&& element) {
      uint64_t new_tail = 0;
      uint64_t old_commit = 0;
      uint64_t old_tail = tail_.load(std::memory_order_acquire);
      do {
        new_tail = old_tail + 1;
        if (GetIndex(new_tail) == GetIndex(head_.load(std::memory_order_acquire))) {
          // return false;
          continue;
        }
      } while (!tail_.compare_exchange_weak(old_tail, new_tail,
                                            std::memory_order_acq_rel,
                                            std::memory_order_relaxed));
      pool_[GetIndex(old_tail)] = std::move(element);
      do {
        old_commit = old_tail;
      } while (cyber_unlikely(!commit_.compare_exchange_weak(
          old_commit, new_tail, std::memory_order_acq_rel,
          std::memory_order_relaxed)));
      wait_strategy_->NotifyOne();
      return true;
    }

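One thing I am unsure about in this variant: as far as I understand, `continue` inside a do-while jumps to the loop condition rather than back to the top of the body, so the compare_exchange_weak on tail_ would still run even when the queue is full. Here is a minimal standalone sketch of that control flow; FakeCas, CasCallsWhenFull and Example are hypothetical names for illustration, not Apollo code.

```cpp
#include <cassert>

// Hypothetical stand-in for tail_.compare_exchange_weak(...): it counts how
// often it is called and always reports success.
struct FakeCas {
  int calls = 0;
  bool operator()() {
    ++calls;
    return true;
  }
};

// Same control flow as the modified loop: the "full" branch executes
// `continue`, and the loop condition is the CAS.
int CasCallsWhenFull() {
  FakeCas cas;
  const bool full = true;
  do {
    if (full) {
      continue;  // jumps to `while (!cas())`, not back to `do {`
    }
  } while (!cas());
  return cas.calls;
}

// The CAS stand-in runs once and the loop exits, although the queue was "full".
inline void Example() { assert(CasCallsWhenFull() == 1); }
```

If that reading is right, waiting for free space would need an explicit retry of the head_ check rather than a bare `continue`, otherwise a full queue could have a not-yet-consumed slot overwritten.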
Or should ThreadPool use an unbounded queue instead?
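For comparison, the unbounded option could look roughly like the sketch below. It is a hypothetical, lock-based illustration (SimpleUnboundedQueue is a made-up name, not a cyber::base class, and it makes no attempt to match the lock-free design of BoundedQueue). The point is only that Enqueue never rejects an element, so every future returned by the pool would have a task behind it.

```cpp
#include <cassert>
#include <cstddef>
#include <deque>
#include <mutex>
#include <utility>

// Hypothetical illustration only; not the cyber::base implementation.
template <typename T>
class SimpleUnboundedQueue {
 public:
  // Never rejects an element; growth is limited only by available memory.
  void Enqueue(T element) {
    std::lock_guard<std::mutex> lock(mutex_);
    items_.push_back(std::move(element));
  }

  // Moves the oldest element into *element; returns false if the queue is empty.
  bool Dequeue(T* element) {
    std::lock_guard<std::mutex> lock(mutex_);
    if (items_.empty()) {
      return false;
    }
    *element = std::move(items_.front());
    items_.pop_front();
    return true;
  }

  std::size_t Size() const {
    std::lock_guard<std::mutex> lock(mutex_);
    return items_.size();
  }

 private:
  mutable std::mutex mutex_;
  std::deque<T> items_;
};

// Usage: elements come back in FIFO order and Enqueue has no failure path.
inline void Example() {
  SimpleUnboundedQueue<int> queue;
  queue.Enqueue(1);
  queue.Enqueue(2);
  int value = 0;
  assert(queue.Dequeue(&value) && value == 1);
}
```

The trade-off is that nothing pushes back on producers: if tasks are submitted faster than the workers drain them, the queue keeps growing.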