oneTBB

tbb::task_group does not work well with serial task queues

Open • Dr15Jones opened this issue 4 years ago • 4 comments

In our application we protect non-thread-safe shared resources with serial task queues. When part of the application needs to use a shared resource, it pushes a task into the appropriate serial task queue. The serial task queue ensures that only one of its tasks runs at a time. These tasks usually run asynchronously, although a few places require them to run synchronously. This all worked fine with the old tbb::task interface, since a synchronous call could simply wait on the task it had added to the queue. Problems arise when using tbb::task_group.

The application needs to wait for tasks to finish at several different points. There is a wait on the main thread for all of the application's work to finish. There are also waits that happen synchronously from within a running task. When all we have are tbb::task_groups, each wait has to use a different tbb::task_group, and this is a problem for the synchronous waits. If a tbb::task_group has tasks presently running on other threads and there are no more of its own tasks for the waiting thread to run, then tasks from other tbb::task_groups can be run on the waiting thread. However, if the tbb::task_group has no running tasks, it stops waiting immediately. Now suppose a serial queue holds tasks from a different tbb::task_group near its front, and the task belonging to the synchronously waiting tbb::task_group further back. Then there is no guarantee that the earlier tasks will ever be run. The waiting tbb::task_group will not run them, because its own task has not been started yet.

This is illustrated by the code posted in a follow-up comment on this issue. The code issues two tasks on the 'outer' (i.e. main) tbb::task_group and then attempts a synchronous wait for an 'inner' task to complete. With more than one thread the program works fine. With just one thread, none of the queued tasks are run.

If we use the resumable tasks preview feature (https://www.threadingbuildingblocks.org/docs/help/index.htm#reference/appendices/preview_features/resumable_tasks.html), then it is possible to do a synchronous wait safely.

Given that the ability to use serial task queues with synchronous waits is a 'deal breaker' for us (we've been using TBB for 5 years now), we would like to know:

  • is there a way to safely use tbb::task_groups in this way, or
  • will the resumable task API (or something similar) be added to the production release of TBB in the near future (say, within 3 months)?

Dr15Jones, Feb 11 '21

Here is the example code:

#if defined(USE_RESUMABLE_TASKS)
#define TBB_PREVIEW_RESUMABLE_TASKS 1
#include <tbb/task.h>
#endif
#include <tbb/task_group.h>
#include <tbb/global_control.h>
#include <iostream>
#include <atomic>
#include <tbb/concurrent_queue.h>
#include <functional>
#include <string>   // std::stoi
#include <utility>  // std::forward

//Serializes task calls
class TaskQueue {
private:
  class QueuedTask;
public:

  template<typename F>
  void push(tbb::task_group& iGroup, F&& iF) {
    queue_.push(new QueuedTask(iGroup, std::function<void()>(std::forward<F>(iF)), this));

    bool expected = false;
    if(started_.compare_exchange_strong(expected, true) ) {
      QueuedTask* next = nullptr;
      queue_.try_pop(next);
      next->runAsync();
    }
  }

private:
  friend class QueuedTask;

  QueuedTask* pop() {
    QueuedTask* next = nullptr;
    queue_.try_pop(next);
    if(nullptr == next) { started_ = false; }
    return next;
  }

  //When a task finishes it automatically starts
  // the next task in the queue
  class QueuedTask {
  public:
    QueuedTask(tbb::task_group& iGroup, std::function<void()> iFunc, TaskQueue* iQueue): 
      group_(&iGroup), f_(std::move(iFunc)), queue_(iQueue) {}
    
    void runAsync() {
      group_->run([this]() {
        f_();
        auto next = queue_->pop();
        if(next) { next->runAsync(); }
        delete this;
      });
    }
  private:
    tbb::task_group* group_;
    std::function<void()> f_;
    TaskQueue* queue_;
  };
  
  tbb::concurrent_queue<QueuedTask*> queue_;
  std::atomic<bool> started_{false};
};


int main(int argc, char const* const* argv ) {

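  // Thread count from the command line; a value of 1 reproduces the problem
  int nThreads = argc > 1 ? std::stoi(argv[1]) : 1;
  tbb::global_control gc(tbb::global_control::max_allowed_parallelism, nThreads);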

  tbb::task_group outer;
  TaskQueue taskQueue;

  //note: without this outer.run the tbb::task::suspend seg faults
  outer.run([&outer,&taskQueue]() {
      taskQueue.push(outer,[]() { std::cout<<"first"<<std::endl;});
      taskQueue.push(outer,[]() { std::cout<<"second"<<std::endl;});
      std::cout <<" inner wait started"<<std::endl;
#if defined(USE_RESUMABLE_TASKS)
      tbb::task::suspend([&outer,&taskQueue](tbb::task::suspend_point tag) {
        // capture the suspend point by value: this lambda may return before the queued task runs
        taskQueue.push(outer,[tag]() {
          std::cout <<"inner"<<std::endl;
          tbb::task::resume(tag);
        });
      });
#else
      {
        tbb::task_group inner;
        std::atomic<bool> wasRun{false};
        taskQueue.push(inner, [&wasRun]() {std::cout <<"inner"<<std::endl; wasRun = true;});
        do {
          inner.wait();
        } while(not wasRun);
      }
#endif
      std::cout <<" inner wait finished"<<std::endl;
      taskQueue.push(outer,[]() { std::cout<<"last"<<std::endl;});
    });
  outer.wait();
}

Dr15Jones, Feb 11 '21

@Dr15Jones, task_group::defer is the solution to this problem. QueuedTask should create a deferred task (a task_handle) by calling task_group::defer. This blocks task_group::wait until the task stored in the task_handle has actually run:

  class QueuedTask {
  public:
    QueuedTask(tbb::task_group& iGroup, std::function<void()> iFunc, TaskQueue* iQueue): 
      group_(&iGroup), 
      th_(iGroup.defer([this, iFunc, iQueue]{
        iFunc();
        auto next = iQueue->pop();
        if(next) { next->runAsync(); }
        delete this;
      }))
      {}
    
    void runAsync() {
        group_->run(std::move(th_));
    }
  private:
    tbb::task_group* group_;
    tbb::task_handle th_;
  };

Here is the full sample (slightly changed to always use a single thread): https://godbolt.org/z/qYbxWerYT

anton-potapov, Jan 27 '22

@Dr15Jones, @makortel, do you need more help with this issue?

anton-potapov, Feb 25 '22

@anton-potapov, we haven't yet had an opportunity to try it in our 'full' application. It does appear to work in the test code from the pull request that referenced this issue.

Dr15Jones, Mar 03 '22

@Dr15Jones, is this issue still relevant for you? Did you have a chance to try the proposed solution? Could you please respond?

isaevil, Oct 05 '22

@Dr15Jones, kindly pinging you.

isaevil, Oct 25 '22

@isaevil, sorry, messages from this GitHub repository are being sent to my spam filter, so I'm not seeing them promptly.

Using tbb::task_group::defer did allow us to wait in the way we needed. The only odd bit is that we always pass a do-nothing lambda to defer, since we only need it to keep the task_group waiting.

https://github.com/cms-sw/cmssw/blob/9976f1e5c7451f1d2e1ff9f3213ef87e3bf9ff28/FWCore/Concurrency/interface/FinalWaitingTask.h#L34-L35

Thanks for following up to make sure the issue was resolved.

Dr15Jones, Oct 25 '22

@Dr15Jones, glad the proposed solution worked out. If the issue is resolved, I would kindly ask you to close it :)

isaevil, Oct 26 '22