
Support running async cleanup operations when an async_generator is destroyed early

Open · lewissbaker opened this issue Aug 28 '17 · 2 comments

@ericniebler's recent blog post, Ranges, Coroutines, and React: Early Musings on the Future of Async in C++, contained the following code snippet showing how async_generator<T> can be used to read a file in chunks:

auto async_file_chunk( const std::string& str ) -> async_generator<static_buf_t<1024>&>
{
  static_buf_t<1024> buffer;
 
  fs_t openreq;
  uv_file file = co_await fs_open(uv_default_loop(), &openreq, str.c_str(), O_RDONLY, 0);
  if (file > 0)
  {
    while (1)
    {
      fs_t readreq;
      int result = co_await fs_read(uv_default_loop(), &readreq, file, &buffer, 1, -1);
      if (result <= 0)
        break;

      buffer.len = result;
      co_yield buffer;
    }
    fs_t closereq;
    (void) co_await fs_close(uv_default_loop(), &closereq, file);
  }
}

Unfortunately, this snippet contains a flaw: if the consumer stops consuming elements of the async_generator before it reaches the end() of the sequence, the file handle is never closed.
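For illustration, a hypothetical consumer that triggers the leak (the use() helper is made up): breaking out of the loop destroys the generator while it is suspended at the co_yield statement, so the fs_close call after the loop never runs.

task<> read_first_chunk()
{
  auto chunks = async_file_chunk("/path/to/file");

  for co_await (auto& chunk : chunks)
  {
    use(chunk);  // hypothetical helper; process only the first chunk
    break;       // 'chunks' is destroyed on scope exit while the producer
                 // is still suspended at co_yield, so fs_close never runs
  }
}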

Currently, when the async_generator object is destroyed it calls coroutine_handle<>::destroy(), which destroys the coroutine frame and any objects that were in scope at the co_yield statement where the producer coroutine last produced a value. The coroutine does not get a chance to execute any code to clean up resources other than by running destructors. This allows you to perform synchronous cleanup via RAII types, but it means you can't perform any async cleanup operations (like the async fs_close in the example above, or gracefully shutting down a socket connection).
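For contrast, here is a minimal sketch of the synchronous cleanup that does work today: a hypothetical uv_file_handle RAII wrapper whose destructor closes the file with a blocking libuv call (passing a null callback to uv_fs_close makes the request execute synchronously). The asynchronous fs_close above cannot be expressed this way, because a destructor cannot co_await.

#include <uv.h>

// Hypothetical RAII wrapper: synchronous cleanup only.
struct uv_file_handle
{
  uv_file file = -1;

  ~uv_file_handle()
  {
    if (file >= 0)
    {
      uv_fs_t closereq;
      // A null callback makes uv_fs_close execute synchronously; a blocking
      // call like this is the only kind of cleanup a destructor can do.
      uv_fs_close(uv_default_loop(), &closereq, file, nullptr);
      uv_fs_req_cleanup(&closereq);
    }
  }
};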

What if, instead of destroying the coroutine frame when the async_generator is destroyed, we resumed the generator coroutine but made the co_yield expression return an error code (or throw a generation_cancelled exception)? This would allow the coroutine to respond to the cancellation request and perform any cleanup operations.

For example, if the co_yield expression were to return an enum value of generator_op::move_next or generator_op::cancel, then the main loop of the above snippet could be modified as follows:

auto async_file_chunk( const std::string& str ) -> async_generator<static_buf_t<1024>&>
{
  static_buf_t<1024> buffer;
 
  fs_t openreq;
  uv_file file = co_await fs_open(uv_default_loop(), &openreq, str.c_str(), O_RDONLY, 0);
  if (file > 0)
  {
    while (1)
    {
      fs_t readreq;
      int result = co_await fs_read(uv_default_loop(), &readreq, file, &buffer, 1, -1);
      if (result <= 0)
        break;

      buffer.len = result;

      // The next two lines are the only ones that have changed.
      generator_op yieldResult = co_yield buffer;
      if (yieldResult == generator_op::cancel) break;
    }
    fs_t closereq;
    (void) co_await fs_close(uv_default_loop(), &closereq, file);
  }
}

If the coroutine subsequently tried to execute another co_yield expression after it had been cancelled, the co_yield expression would again complete immediately with the generator_op::cancel result.
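A rough sketch of how the promise side might implement this behaviour (the yield_awaiter type and the cancelled flag are hypothetical, not cppcoro's actual implementation): the async_generator destructor would set a flag and resume the producer instead of calling destroy(), and the awaiter returned from yield_value would check that flag.

#include <coroutine>  // <experimental/coroutine> at the time of writing

enum class generator_op { move_next, cancel };

struct yield_awaiter
{
  bool& cancelled;  // set by ~async_generator() before resuming the producer

  bool await_ready() const noexcept
  {
    // Once cancelled, every subsequent co_yield completes immediately.
    return cancelled;
  }

  void await_suspend(std::coroutine_handle<>) noexcept
  {
    // Otherwise stay suspended until the consumer advances the iterator
    // or destroys the generator.
  }

  generator_op await_resume() const noexcept
  {
    return cancelled ? generator_op::cancel : generator_op::move_next;
  }
};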

There are a couple of issues with this approach, however:

  1. The syntax for writing a correct producer coroutine becomes more complicated: you need to check the return value of each co_yield expression. This could be somewhat alleviated by throwing an exception instead (which I'm not keen on either, since I don't like using exceptions for expected/normal control flow), or by making the continue-after-cancellation behaviour opt-in (e.g. by constructing some scoped object on the stack, or otherwise communicating to the promise that you want to continue after a cancellation request).
  2. The continued execution of the coroutine now represents a detached computation that you cannot synchronise against or otherwise know when it will complete. E.g. if it is closing a file asynchronously, I may still want to know when the file has been closed so that I can delete it or open it again once the file lock has been released (a sketch of this hazard follows below).

The second issue is perhaps the greater one here.
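To make the hazard concrete, assume a hypothetical fs_unlink awaitable wrapper in the same style as the fs_open/fs_read/fs_close wrappers above:

task<> consume_then_delete()
{
  {
    auto chunks = async_file_chunk("/path/to/file");
    // ... consume a few chunks, then let 'chunks' go out of scope early ...
  }

  // The producer may still be co_await'ing fs_close at this point, and
  // there is nothing the caller can co_await to find out when it finishes,
  // so this delete can race with the in-flight close.
  co_await fs_unlink(uv_default_loop(), "/path/to/file");
}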

This could possibly be mitigated by requiring the caller to explicitly start the production of the generator by awaiting a task<> returned by the async_generator<T>::produce() member function. With such a task, the generator would not start executing until both the produce() task and the task returned by begin() had been awaited. The caller would need to use something like when_all() to concurrently execute both tasks.

The producer task would not complete until the generator coroutine has run to completion. The consumer task is free to destroy the async_generator and complete early without consuming the entire sequence. This would send a cancel request to the producer task so it can terminate quickly.

There are potential issues with composability of such an async_generator that I haven't yet worked through.

lewissbaker · Aug 28 '17 12:08

> This could possibly be mitigated by requiring the caller to explicitly start the production of the generator by awaiting a task<> returned by the async_generator<T>::produce() member function. With such a task, the generator would not start executing until both the produce() task and the task returned by begin() had been awaited. The caller would need to use something like when_all() to concurrently execute both tasks.

What is produce() and how is it different from begin()?

ericniebler · Aug 28 '17 16:08

The idea behind produce() is that it returns a task that represents the execution of the generator coroutine body, i.e. the producer task. The coroutine body will not start until both the producer task is awaited AND the begin() awaitable is awaited. This ensures that an async continuation is already attached that will wait for async cleanup before the generator even starts, which is required to guarantee that there are no dangling threads of execution.
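One way to picture the "starts only when both are awaited" rule is as a two-party latch in the promise (purely a sketch with hypothetical names, not a proposed API): the body is resumed only on the second attachment, so by the time any producer code runs there is a continuation attached that will wait for the async cleanup to finish.

#include <atomic>
#include <coroutine>  // <experimental/coroutine> at the time of writing

// Hypothetical two-party start latch living in the generator's promise.
struct start_latch
{
  std::atomic<int> attached{0};
  std::coroutine_handle<> body;  // the initially-suspended generator body

  // Called once when the produce() task is awaited and once when the
  // begin() awaitable is awaited.
  void arrive() noexcept
  {
    // Only the second arrival resumes the body, so both the producer-side
    // continuation and the consumer are known to exist before it runs.
    if (attached.fetch_add(1, std::memory_order_acq_rel) == 1)
      body.resume();
  }
};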

This treats the async_generator as two tasks (a producer and a consumer) synchronising such that only one of them runs at a time. However, once the consumer requests cancellation of the sequence by destroying the async_generator, the two tasks diverge and run concurrently.

An example to show the concept (I'm not that happy with the syntax yet):

async_generator<static_buf_t<1024>> async_file_chunk(const std::string& str);

task<> consumer(async_generator<static_buf_t<1024>> fileChunks)
{
  for co_await (auto& chunk : fileChunks)
  {
     // Might terminate early
     if (someCond) break;
  }
}

task<> usage()
{
  auto chunks = async_file_chunk("/path/to/file");

  // Get the task that produces the sequence.
  task<> producerTask = chunks.produce();
  co_await when_all(std::move(producerTask), consumer(std::move(chunks)));
}

lewissbaker · Aug 28 '17 21:08