boost asio parallel_group per operation cancellation for channels
Reposting from here:
https://stackoverflow.com/questions/72741428/boost-asio-parallel-group-per-operation-cancellation
Below is a small producer/consumer example:
#include <cassert>   // assert() is used in consumer()
#include <iostream>
#include <boost/asio/use_awaitable.hpp>
#include <boost/system/detail/generic_category.hpp>
#include <boost/asio/experimental/channel.hpp>
#include <boost/asio/experimental/awaitable_operators.hpp>
#include <boost/asio.hpp>
#include <boost/asio/experimental/as_tuple.hpp>
using namespace boost::asio::experimental::awaitable_operators;
template<typename T>
struct Channel : public boost::asio::experimental::channel<void(boost::system::error_code, T)> {
    using boost::asio::experimental::channel<void(boost::system::error_code, T)>::channel;
};
boost::asio::awaitable<void> consumer(Channel<int>& ch1, Channel<int>& ch2,
                                      int nexpected) {
    int num = 0;
    for (;;) {
        auto [order, ex0, r0, ex1, r1] = co_await boost::asio::experimental::make_parallel_group(
            [&ch1](auto token) {
                return ch1.async_receive(std::move(token));
            },
            [&ch2](auto token) {
                return ch2.async_receive(std::move(token));
            }
        ).async_wait(
            boost::asio::experimental::wait_for_one{},
            boost::asio::use_awaitable);
        std::cout << "num = " << num << std::endl;
        num++;
        if (num == nexpected) {
            std::cout << "consumer is all done" << std::endl;
            break;
        }
    }
    assert(num == nexpected && "sent must be equal received");
}
boost::asio::awaitable<void> producer(Channel<int>& ch, int const n, int const id) {
    for (int i = 0; i < n; i++) {
        auto value = id == 1 ? i : -i;
        std::cout << "producer " << id << ": sending " << value << std::endl;
        auto const [ec] = co_await ch.async_send(
            boost::system::error_code{},
            value,
            boost::asio::experimental::as_tuple(boost::asio::use_awaitable));
        if (ec) std::cout << "error!" << std::endl;
    }
    co_return;
}
void test0() {
    auto ctx = boost::asio::io_context{};
    std::size_t const n = 10;
    auto ch1 = Channel<int>{ctx, 10};
    auto ch2 = Channel<int>{ctx, 10};
    boost::asio::co_spawn(
        ctx,
        producer(ch2, 100, 2),
        boost::asio::detached
    );
    boost::asio::co_spawn(
        ctx,
        producer(ch1, 100, 1),
        boost::asio::detached
    );
    boost::asio::co_spawn(
        ctx,
        consumer(ch1, ch2, 200),
        boost::asio::detached
    );
    ctx.run();
}
int main() {
    test0();
    return 0;
}
In short, there are two Boost.Asio experimental channels, two producers, and one consumer. The consumer reads from whichever channel has a value. I'm using make_parallel_group with wait_for_one, which waits for one operation to complete and cancels the others.
When running the program, I observe that one async_receive completes, the other is cancelled, and the async_send is somehow cancelled too, without an error code stating that it was cancelled. As a result the consumer sees only 100 values; I expect to see all 200.
Question:
I'm expecting per-operation cancellation here: cancelling an async_receive should not force the matching async_send to be cancelled. Looking at the source code of parallel_group (detail namespace), I do not see calls to bind_cancellation_slot on a per-operation basis. Am I missing something? Thanks.
Your expectation with regard to per-operation cancellation is correct. Note, however, that for any given set of operations there is a chance that more than one operation succeeds (if the results are already available). In your case, both async_receive operations will return values from their channels, but the loop counts them as a single receive. You can check this by printing the results:
std::cout << "num = " << num << "; " << r0 << "; " << r1 << std::endl;
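If that is what is happening, one possible adjustment is to count every result whose error code is clear, rather than incrementing once per loop iteration. The following is only a minimal sketch of that counting logic, written without Boost so it stands alone: `GroupResult` and `received_values` are hypothetical names, `std::error_code` stands in for `boost::system::error_code`, and `std::errc::operation_canceled` is merely a placeholder for whatever error the cancelled receive actually reports.

```cpp
#include <system_error>
#include <vector>

// Hypothetical stand-in for the values a two-operation parallel_group
// yields (without the completion order). std::error_code is used in place
// of boost::system::error_code so the sketch builds without Boost.
struct GroupResult {
    std::error_code ex0;
    int r0;
    std::error_code ex1;
    int r1;
};

// Collect every value whose operation finished without an error.
// With wait_for_one, zero, one, or both slots may carry a value.
std::vector<int> received_values(const GroupResult& g) {
    std::vector<int> out;
    if (!g.ex0) out.push_back(g.r0);  // first receive succeeded
    if (!g.ex1) out.push_back(g.r1);  // second receive succeeded
    return out;
}
```

In the consumer loop, the same check would amount to something like `if (!ex0) num++; if (!ex1) num++;` in place of the single `num++`, with the exit test changed to `num >= nexpected` if overshooting is possible.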