
Add when_any() for waiting for at least one task to complete

Open lewissbaker opened this issue 7 years ago • 4 comments

The difficult part of designing when_any() will be how to handle cancellation of the co_await operations of the other tasks.

Currently, the task<T> and shared_task<T> types don't allow the caller to cancel the co_await operation once it has been awaited. We need to wait for the task to complete before the awaiting coroutine returns.

If the tasks themselves are cancellable, we could hook something up using cancellation_tokens. E.g. if we pass the same cancellation_token into each task and concurrently await all of the tasks, then when any task completes we call request_cancellation() on the cancellation_source to ask the other tasks to cancel promptly. We could then just use when_all() to wait for all of the tasks.
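To make the shape of this concrete without depending on cppcoro, here is a minimal sketch using plain threads. Everything in it is a stand-in: cancel_flag plays the role of a cancellation_source and its token, job is a hypothetical cancellable unit of work, and run_until_first_completes is an invented name. The final join of every thread corresponds to the when_all() step. Jobs are assumed not to throw.

```cpp
#include <atomic>
#include <chrono>
#include <cstddef>
#include <functional>
#include <thread>
#include <vector>

// Hypothetical stand-in for cancellation_source / cancellation_token:
// a single shared flag that every job polls.
struct cancel_flag {
    std::atomic<bool> requested{false};
    void request_cancellation() noexcept { requested.store(true); }
    bool is_cancellation_requested() const noexcept { return requested.load(); }
};

// A job returns true if it ran to completion, or false if it stopped
// early because cancellation was requested. Jobs must not throw.
using job = std::function<bool(const cancel_flag&)>;

constexpr std::size_t no_winner = static_cast<std::size_t>(-1);

// Runs all jobs concurrently with the same flag. The first job to
// complete requests cancellation of the rest. Returns the index of
// that job (or no_winner) only after every job has finished.
std::size_t run_until_first_completes(const std::vector<job>& jobs) {
    cancel_flag flag;
    std::atomic<std::size_t> winner{no_winner};

    std::vector<std::thread> threads;
    threads.reserve(jobs.size());
    for (std::size_t i = 0; i < jobs.size(); ++i) {
        threads.emplace_back([&, i] {
            if (jobs[i](flag)) {
                // Only the first job to complete records itself as winner.
                std::size_t expected = no_winner;
                winner.compare_exchange_strong(expected, i);
                flag.request_cancellation();
            }
        });
    }

    // Equivalent of when_all(): wait for the winner and all the losers.
    for (auto& t : threads) t.join();
    return winner.load();
}
```

Note that the caller still pays for the slowest loser to notice the flag, which is exactly the latency cost of building this on top of when_all().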

To do this more generally, we'd need to be able to cancel the await operation on a task without necessarily cancelling the task itself. This would require a different data-structure in the promise object for keeping track of awaiters to allow unsubscribing an awaiter from that list in a lock-free way. Maybe consider a similar data-structure to that used by cancellation_registration?
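As a rough illustration of what "unsubscribing an awaiter in a lock-free way" could look like, here is a minimal sketch based on a fixed array of atomic slots. It is not how cppcoro's promise types or cancellation_registration are implemented; awaiter, awaiter_slots, subscribe, unsubscribe and resume_all are all hypothetical names. The key property is that unsubscribe and the completing side race on a single atomic slot, so exactly one of them claims each awaiter.

```cpp
#include <array>
#include <atomic>
#include <cstddef>

// Hypothetical awaiter record. A real one would hold a coroutine
// handle to resume; here we only record that resume() was called.
struct awaiter {
    bool resumed = false;
    void resume() noexcept { resumed = true; }
};

// Fixed-capacity, lock-free set of awaiters (assumption: N is a
// small compile-time bound; a real design would need to grow).
template <std::size_t N>
class awaiter_slots {
public:
    awaiter_slots() noexcept {
        for (auto& s : slots_) s.store(nullptr, std::memory_order_relaxed);
    }

    // Claims the first free slot. Returns its index, or N if full.
    std::size_t subscribe(awaiter* a) noexcept {
        for (std::size_t i = 0; i < N; ++i) {
            awaiter* expected = nullptr;
            if (slots_[i].compare_exchange_strong(expected, a)) return i;
        }
        return N;
    }

    // Returns true if the awaiter was removed before being resumed,
    // false if the completing side had already claimed it.
    bool unsubscribe(std::size_t index, awaiter* a) noexcept {
        awaiter* expected = a;
        return slots_[index].compare_exchange_strong(expected, nullptr);
    }

    // Called by the completing side: claims and resumes every awaiter
    // that is still subscribed. Returns how many were resumed.
    std::size_t resume_all() noexcept {
        std::size_t count = 0;
        for (auto& s : slots_) {
            if (awaiter* a = s.exchange(nullptr)) {
                a->resume();
                ++count;
            }
        }
        return count;
    }

private:
    std::array<std::atomic<awaiter*>, N> slots_;
};
```

This sketch leaves out a "completed" state, so an awaiter that subscribes after resume_all() has run would never be resumed; a real promise would have to handle that case.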

lewissbaker • May 09 '17 06:05

A somewhat cumbersome way of implementing the when_any() pattern such that the first operation to complete is a "winner" and the others are "losers" and should be cancelled:

task<> alternative1(cancellation_token ct);
task<> alternative2(cancellation_token ct);

task<> cancel_on_success(task<> t, cancellation_source cs)
{
  co_await t;
  cs.request_cancellation();
}

task<> do_1_or_2(cancellation_token ct)
{
  cancellation_source source;
  cancellation_registration cancellationForwarder{ ct, [&source] { source.request_cancellation(); } };
  co_await when_all(
    cancel_on_success(alternative1(source.token()), source),
    cancel_on_success(alternative2(source.token()), source));
}

This will cancel both sub-tasks if the cancellation_token passed in has cancellation requested. When either sub-task completes successfully (ie. without throwing an exception) then it will request cancellation of the other task.

lewissbaker • Jul 07 '17 06:07

The other main use-case of when_any() is to act as an event loop, allowing code to handle the completion of a number of concurrently executing tasks serially, in the order in which they complete.

E.g. something like this (not thoroughly thought out):

task<std::string> get_record(int id);

task<> example()
{
  std::vector<task<std::string>> tasks;
  for (int i = 0; i < 100; ++i) tasks.push_back(get_record(i));

  while (!tasks.empty())
  {
    std::size_t readyIndex = co_await when_any(tasks);
    std::cout << co_await tasks[readyIndex] << std::endl;
    tasks.erase(tasks.begin() + readyIndex);
  }
}

However, I feel that something like this could be handled just as well (and possibly more efficiently) by using when_all() to execute the tasks concurrently and an async_mutex to serialise processing of each individual event. E.g.

task<std::string> get_record(int id);

task<> example()
{
  async_mutex mutex;

  auto handleRecord = [&](int id) -> task<>
  {
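    // Take the result by value: co_await on a temporary task yields an
    // rvalue, which auto& cannot bind to.
    auto result = co_await get_record(id);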
    {
      auto lock = co_await mutex.scoped_lock_async();
      std::cout << result << std::endl;
    }
  };

  std::vector<task<>> tasks;
  for (int i = 0; i < 100; ++i) tasks.push_back(handleRecord(i));
  co_await when_all(std::move(tasks));
}

lewissbaker • Mar 15 '18 00:03

Without when_any() for the time being, what would be a good way to check whether a task has completed either within a certain timeout or by a certain time?

MonaTheMonad • Jan 17 '19 00:01

If you just want to check whether a task completed within a certain time, you can record the time before starting the task and again after it completes, then check the total time taken.

If you want to cancel the operation after a certain timeout has elapsed then you can use when_all() to execute the task concurrently with a schedule_after(timeout) operation and then use a cancellation_token to cancel the task after the timeout has elapsed.

cppcoro::task<int> foo(cppcoro::cancellation_token ct);

cppcoro::task<int> foo_with_timeout(
  cppcoro::io_service& ioSvc,
  std::chrono::milliseconds timeout)
{
  cppcoro::cancellation_source src;
  auto [result, unused] = co_await cppcoro::when_all(
    [&]() -> cppcoro::task<int> {
      auto cancelOnExit = cppcoro::on_scope_exit([&] { src.request_cancellation(); });
      co_return co_await foo(src.token());
    }(),
    [&]() -> cppcoro::task<void> {
      auto cancelOnExit = cppcoro::on_scope_exit([&] { src.request_cancellation(); });
      co_await ioSvc.schedule_after(timeout);
    }());
  co_return result;
}

However, this means the operation has to wait for the timer to be cancelled before it can return a result. You may be able to use the async_scope class to spawn the timer task so that it is waited for at a higher level, which avoids the extra latency.

lewissbaker • Jan 17 '19 01:01