oneTBB
tbb::task_group does not work well with serial task queues
In our application we protect non-thread-safe shared resources via serial task queues. When part of the application needs to use a shared resource, it pushes a task into the appropriate serial task queue. The serial task queue makes sure that exactly one of the tasks it holds runs at a time. These tasks are usually set to run asynchronously, although a few places require them to happen synchronously. This all worked fine with the old tbb::task interface, as the synchronous calls could just wait on the task that was added to the queue. Problems arise when using tbb::task_group.
The application needs to be able to wait for tasks to finish at several different spots. There is a wait on the main thread for all work of the application to finish. There are also waits that happen synchronously from within a running task. When all we have are tbb::task_groups, each wait has to use a different tbb::task_group, and this is a problem for the synchronous waits. If a tbb::task_group has tasks presently running on other threads and there are no more tasks for the waiting thread to run, then tasks from other tbb::task_groups can be run on the waiting thread. However, if there are no running tasks from the tbb::task_group, then that tbb::task_group will immediately stop waiting. If a queue holds tasks from a different tbb::task_group ahead of the tasks belonging to the synchronously waiting tbb::task_group, then there is no guarantee that the earlier tasks will ever be run (the waiting tbb::task_group will not run them, because its own tasks have yet to be started).
This is illustrated in the code added as an additional comment to this issue. The code issues two tasks on the 'outer' (i.e. main) tbb::task_group and then attempts a synchronous wait for an 'inner' task to complete. If the program runs with more than one thread, it works fine. If it is run with just one thread, none of the tasks will be run.
If we use the resumable task API preview feature (https://www.threadingbuildingblocks.org/docs/help/index.htm#reference/appendices/preview_features/resumable_tasks.html), then it is possible to safely do a synchronous wait.
Given that the ability to use serial task queues with synchronous waits is a 'deal breaker' for us (we've been using TBB for 5 years now), we would like to know:
- is there a way to safely use tbb::task_groups in this way, or
- will the resumable task API (or something similar) be added to the production release of TBB in the near (say 3 month) future?
Here is the example code
```cpp
#if defined(USE_RESUMABLE_TASKS)
#define TBB_PREVIEW_RESUMABLE_TASKS 1
#include <tbb/task.h>
#endif
#include <tbb/task_group.h>
#include <tbb/global_control.h>
#include <tbb/concurrent_queue.h>
#include <atomic>
#include <functional>
#include <iostream>
#include <string>
#include <utility>

// Serializes task calls: at most one queued task runs at a time
class TaskQueue {
private:
    class QueuedTask;

public:
    template <typename F>
    void push(tbb::task_group& iGroup, F&& iF) {
        queue_.push(new QueuedTask(iGroup, std::function<void()>(std::forward<F>(iF)), this));
        bool expected = false;
        // Only the caller that flips started_ from false to true starts the chain
        if (started_.compare_exchange_strong(expected, true)) {
            QueuedTask* next = nullptr;
            queue_.try_pop(next);
            next->runAsync();
        }
    }

private:
    friend class QueuedTask;
    QueuedTask* pop() {
        QueuedTask* next = nullptr;
        queue_.try_pop(next);
        if (nullptr == next) { started_ = false; }
        return next;
    }

    // When a task finishes it automatically starts
    // the next task in the queue
    class QueuedTask {
    public:
        QueuedTask(tbb::task_group& iGroup, std::function<void()> iFunc, TaskQueue* iQueue)
            : group_(&iGroup), f_(std::move(iFunc)), queue_(iQueue) {}
        void runAsync() {
            group_->run([this]() {
                f_();
                auto next = queue_->pop();
                if (next) { next->runAsync(); }
                delete this;
            });
        }
    private:
        tbb::task_group* group_;
        std::function<void()> f_;
        TaskQueue* queue_;
    };

    tbb::concurrent_queue<QueuedTask*> queue_;
    std::atomic<bool> started_{false};
};

int main(int argc, char const* const* argv) {
    if (argc < 2) {
        std::cerr << "usage: " << argv[0] << " <number of threads>" << std::endl;
        return 1;
    }
    tbb::global_control gc(tbb::global_control::max_allowed_parallelism, std::stoi(argv[1]));
    tbb::task_group outer;
    TaskQueue taskQueue;
    // note: without this outer.run the tbb::task::suspend seg faults
    outer.run([&outer, &taskQueue]() {
        taskQueue.push(outer, []() { std::cout << "first" << std::endl; });
        taskQueue.push(outer, []() { std::cout << "second" << std::endl; });
        std::cout << " inner wait started" << std::endl;
#if defined(USE_RESUMABLE_TASKS)
        tbb::task::suspend([&outer, &taskQueue](tbb::task::suspend_point tag) {
            // capture tag by value: this lambda outlives the suspend callback
            taskQueue.push(outer, [tag]() {
                std::cout << "inner" << std::endl;
                tbb::task::resume(tag);
            });
        });
#else
        {
            tbb::task_group inner;
            std::atomic<bool> wasRun{false};
            taskQueue.push(inner, [&wasRun]() { std::cout << "inner" << std::endl; wasRun = true; });
            // With a single thread this loop never ends: nothing has been submitted
            // to `inner` yet, so inner.wait() returns immediately every time.
            do {
                inner.wait();
            } while (!wasRun);
        }
#endif
        std::cout << " inner wait finished" << std::endl;
        taskQueue.push(outer, []() { std::cout << "last" << std::endl; });
    });
    outer.wait();
}
```
@Dr15Jones, task_group::defer is the solution to this problem. QueuedTask should create a deferred task (task_handle) via a call to task_group::defer, thus blocking task_group::wait until the task stored in the task_handle is actually run:
```cpp
class QueuedTask {
public:
    QueuedTask(tbb::task_group& iGroup, std::function<void()> iFunc, TaskQueue* iQueue)
        : group_(&iGroup),
          // defer() creates the task right away, so iGroup.wait() will not return
          // until th_ has been submitted with run() and has finished executing
          th_(iGroup.defer([this, iFunc, iQueue] {
              iFunc();
              auto next = iQueue->pop();
              if (next) { next->runAsync(); }
              delete this;
          }))
    {}
    void runAsync() {
        group_->run(std::move(th_));
    }
private:
    tbb::task_group* group_;
    tbb::task_handle th_;
};
```
Here is the full sample (slightly changed to always use a single thread): https://godbolt.org/z/qYbxWerYT
@Dr15Jones, @makortel, do you need more help with this issue?
@anton-potapov we haven't yet had an opportunity to try it in our 'full' application. It does appear to work in the test code from the pull request that referenced this issue.
@Dr15Jones, is this issue still relevant for you? Did you have a chance to try the proposed solution? Could you please respond?
@Dr15Jones, kindly pinging you.
@isaevil sorry, the messages from this GitHub repository are being sent to my spam filter so I'm not seeing them promptly.
Using tbb::task_group::defer did allow us to wait in the way we needed. The only funny bit is that we always pass a do-nothing lambda to defer, since we only need it to keep the task_group waiting:
https://github.com/cms-sw/cmssw/blob/9976f1e5c7451f1d2e1ff9f3213ef87e3bf9ff28/FWCore/Concurrency/interface/FinalWaitingTask.h#L34-L35
Thanks for reaching out to make sure the issue was resolved.
@Dr15Jones, glad that the proposed solution worked out. If this issue is resolved, I would kindly ask you to close it :)