asio icon indicating copy to clipboard operation
asio copied to clipboard

boost asio parallel_group per operation cancellation for channels

Open vkhristenko opened this issue 2 years ago • 1 comments

just reposting from here

https://stackoverflow.com/questions/72741428/boost-asio-parallel-group-per-operation-cancellation


below is a snippet of small producer/consumer example

#include <iostream>

#include <boost/asio/use_awaitable.hpp>
#include <boost/system/detail/generic_category.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio.hpp>

#include <boost/asio/experimental/as_tuple.hpp>

using namespace boost::asio::experimental::awaitable_operators;

template<typename T>
struct Channel : public boost::asio::experimental::channel<void(boost::system::error_code, T)> {
    using boost::asio::experimental::channel<void(boost::system::error_code, T)>::channel;};

boost::asio::awaitable<void> consumer(Channel<int>& ch1, Channel<int>& ch2, 
        int nexpected) {
    int num = 0;
    for (;;) {
        auto [order, ex0, r0, ex1, r1] = co_await boost::asio::experimental::make_parallel_group(
            [&ch1](auto token) {
                return ch1.async_receive(std::move(token));
            },
            [&ch2](auto token) {
                return ch2.async_receive(std::move(token));
            }
        ).async_wait(
            boost::asio::experimental::wait_for_one{}, 
            boost::asio::use_awaitable);

        std::cout << "num = " << num << std::endl;
        num++;
        if (num == nexpected) {
            std::cout << "consumer is all done" << std::endl;
            break;
        }
    }

    assert(num == nexpected && "sent must be equal received");
}

boost::asio::awaitable<void> producer(Channel<int>& ch, int const n, int const id) {
    for (int i=0; i<n; i++) {
        auto value = id == 1 ? i : -i;
        std::cout << "producer " << id << ": sending " << value << std::endl;
        auto const [ec] = co_await ch.async_send(
            boost::system::error_code{}, 
            value, 
            boost::asio::experimental::as_tuple(boost::asio::use_awaitable)); 
        if (ec) std::cout << "error!" << std::endl;
    }

    co_return;
}

void test0() {
    auto ctx = boost::asio::io_context{};
    std::size_t const n = 10;
    auto ch1 = Channel<int>{ctx, 10};
    auto ch2 = Channel<int>{ctx, 10};
    
    boost::asio::co_spawn(
        ctx,
        producer(ch2, 100, 2),
        boost::asio::detached
    );

   
    boost::asio::co_spawn(
        ctx,
        producer(ch1, 100, 1),
        boost::asio::detached
    );
    
    boost::asio::co_spawn(
        ctx,
        consumer(ch1, ch2, 200),
        boost::asio::detached
    );

    ctx.run();
}

int main() {
    test0();

    return 0;
}

In short, there are 2 boost asio experimental channels. there are 2 producers and 1 consumer. consumer reads from either one of these channels. I"m using make_parallel_group with wait_for_one, which waits for one and cancels the others.

When running the program, I observe that one async_receive completes, the other is cancelled and the async_send is somehow cancelled without error code stating that it was cancelled. Basically that means that consumer sees only 100 values. i expect to see all 200 values.

questions:

I'm expecting per operation cancellation here. async_receive cancelled does not force cancelling of async_send. looking at the source code of parallel_group (detail namespace), I do not see calls to bind_cancellation_slot one a per operation basis... am i missing something? thanks

vkhristenko avatar Jun 24 '22 09:06 vkhristenko

Your expectation wrt to per operation cancellation is correct. Note, however, that for any given set of operations, there's a chance both operations may succeed (if the results are already available). In your case, both async_receive will return the values from the channel. You can check this by outputting your results:

std::cout << "num = " << num << "; " << r0 << "; " << r1 << std::endl;

andrei-datcu avatar Jul 01 '22 14:07 andrei-datcu