kelcoro
kelcoro copied to clipboard
C++20 coroutine library
KelCoro - C++20 coroutine library for C++
How to build?
Module kel.coro includes:
- Coroutine types
generator<T>logical_threadjobasync_task<T>channel<T>task<T>
- Functions or seems like functions
jump_on(Executor)stop(Args...)invoked_in(Foo)yield
- Event system
event_tevery_event_twhen_allwhen_any
- Concepts
co_awaiterco_awaitableexecutor
Class Details
generator<T>
interface:
// not const! Calculates first value when called!
iterator begin() &;
// generator can be borrowed range, if you explicitly ask
iterator_owner begin() &&;
static std::default_sentinel_t end();
// maybe you need it for something
value_type& next();
bool has_next() const;
// for using generator as borrowed range
// (std::ranges do not forward its arguments, so begin&& will not be called
// on rvalue generator in Gen() | std::views*something*) (compilation error)
// but Get().view() | ... works perfectly
auto view() &&;
- Calculates first value when.begin() / .next() called
- It is an input AND output range(co_yielded lvalues can be safely changed from consumer side), which means every value from generator will appear only once
Lifetime: frame dies with coroutine object (or special owner iterator if you call.begin() on rvalue generator)
example:
generator<int> Gen() {
for (size_t i = 0;; ++i)
co_yield i;
}
void GeneratorUser() {
for (auto i : Gen())
;// do smth with i
}
logical_thread
interface(same as jthread):
bool joinable();
void join(); // block until coroutine is done
std::stop_source detach();
handle_type native_handle(); // returns coroutine_handle
bool request_stop();
std::stop_token get_token();
std::stop_source get_stop_source();
It is cancellable coroutine, which behavior similar to https://en.cppreference.com/w/cpp/thread/jthread (can be .request_stop(), automatically requested for stop and joined in destructor or when move assigned etc)
Execution starts immediately when logical_thread created. If unhandled exception happens in logical_thread std::terminate is called
Lifetime: If not .detach(), then coroutine frame dies with coroutine object, else (if.detach() called) frame dies after co_return (more precisely after final_suspend)
example :
kel::logical_thread Bar() {
// imagine that already C++47 and networking in the standard
auto socket = co_await async_connect(endpoint);
auto token = co_await this_coro::stop_token; // cancellable coroutine can get stop token associated with it
while (!token.stop_requested()) {
auto write_info = co_await socket.async_write("Hello world");
auto read_result = co_await socket.async_read();
std::cout << read_result;
}
}
job
behaves as always detached logical_thread, but more lightweight, has no methods, because always detached == caller have no guarantees about possibility to manipulate coro
Lifetime: same as detached kel::logical_thread
async_task<T>
interface:
void wait() const; // blocks until result is ready
Result get(); // waits result and then returns it
If unhandled exception happens in async_task std::terminate is called Execution starts immediately when async_task created.
Result can be ignored, it is safe.
example:
task<std::string> DoSmth() {
co_await jump_on(another_thread);
co_return "hello from task";
}
async_task<void> TasksUser() {
std::vector<task<std::string>> vec;
for (int i = 0; i < 10; ++i)
vec.emplace_back(DoSmth());
for (int i = 0; i < 8; ++i) {
std::string result = co_await vec[i];
verify(result == "hello from task");
}
co_return;
}
Symmetric transfer between coroutines
What is symmetric transfer? this is when the coroutine does not just suspended, but transfers control to another coroutine. Then, in the future, the coroutine will be resumed. Thus, relative to the coroutine, this code looks absolutely synchronous, there is no synchronization and any overhead, but in fact, the coroutine to which control was transferred can be suspended and not return control. This allows implementing easy-to-use channels (see examples)
channel<T>
interface: only operator co_await() ! This means channel may be used only in another coroutine. co_await channel returns pointer to value co_yielded from channel(nullptr == end), it is pointer to exactly value, so it is two-way channel(consumer can change value via pointer)
example:
channel<int> CreateChannel() {
for (int i = 0; i < 100; ++i) {
co_await jump_on(another_thread);
// any hard working for calculating i
co_yield i; // control returns to caller ONLY after co_yield!
}
}
async_task<void> ChannelUser() {
auto my_channel = CreateChannel();
int i = 0;
// here we in cycle transfer control to channel,
// then it calculates value and returns control to us
while (auto* v = co_await my_channel) {
verify(*v == i);
++i;
}
}
task<T>
interface: only operator co_await ! This means task may be used only in another coroutine. It is just a channel between two coroutines for exactly one value(returned by co_return)
task is lazy (starts only when co_awaited)
example: see async_task example =)
Memory allocations for coroutines
Every coroutine supports allocators, (its last template argument of coroutine), by default it is default constructed, but you can use any memory resource with leading allocator convention or trailing allocator convention
example:
template <typename Resource>
task<int, Resource> SmartTask(std::allocator_arg_t, Resource /* i dont use it, coro frame uses it!*/)
// this task will use Resource for allocations! (it can be allocator, but also can be any with .allocate(size_t) / .deallocate(void*, size_t) methods)
Usefull
jump_on
co_await jump_on(Executor) equals to suspending coroutine and resume it on Executor(with .execute method), for example it can be thread pool or kel::this_thread_executor(executes all on this thread) / kel::noop_executor etc
stop
more effective way to stop(request_stop + join) for many stopable arguments or range of such type.
yield
co_yield X; is equal to co_await yield(X);
But in yield you can pass more then one argument to construct value in place
invoked_in
okay its just magic for functions which want a callback example:
// some function which accepts callback
bool Foo(int a, std::function<bool(int&, int)> f, int c) {
int value = 10;
// POINT 2: invoke a Foo from co_await call
auto result = f(value, a + b + c);
assert(value == 20);
// POINT 5: after coroutine suspends we are here
assert(result == true);
return result;
}
// usually it used for async callbacks, but it is possible to use for synchronous callbacks too
kel::job Bar() {
// POINT 1: entering
// value / summ is a references(YES even with auto) to arguments in Foo,
// ret is a result of callback(not represented if signature<void(...)>)
auto [value, summ, ret] = co_await this_coro::invoked_in(Foo, 10, signature<bool(int&, int)>{}, 20);
assert(value == 10);
value = 20;
assert(summ == 45);
// POINT 3: part of the coroutine(from co_await this_coro::invoked_in to next suspend) becomes callback for Foo
ret = true; // setting return value, because Foo expects it from callback
// POINT 4: first coroutine suspend after co_await
}
Event system
event_t
interface:
// Executor - who will execute tasks. input type by default *nothing*,
// but is can be customized with specialization event_traits<your_type>
// or by using input_type = your_type;
// returns false if no one has been woken up
template <executor Executor>
bool notify_all(Executor&& exe, input_type input);
// subscribe for not coroutines(allocator for task allocating)
template <typename Alloc = std::allocator<std::byte>, typename F>
void set_callback(F f, Alloc alloc = Alloc{});
// subscribe, but only for coroutines
auto operator co_await();
every_event_t
Interface: same as event_t, but have guarantee, that all calls .notify_all() will not be lost. This means:
struct MyTag {};
job Foo() {
// this code is NOT a race condition regardless of the order of .notify_all() calls
co_await every_event<MyTag>;
// if notify_all() was here on other thread AND we use event<MyTag> we can lose event,
// but with every_event<MyTag> we cant lose it
co_await every_event<MyTag>;
}
Thats why every_event.notify_all() returns void(notifies always will be handled)
Note: if you use notify on event<X>, then you must use co_await on event<X> too, and if you use notify on every_event<X>, then you must use co_await on every_event<X> Note: every_event<X> do not support input types(ill-formed (compilation error))
when_all
template <typename... Events>
auto when_all() ->
*awaiter which will return control to coroutine and inputs for every event when all event happens*
when_any
template <typename... Events>
auto when_any() ->
*awaiter which returns control to coroutine and variant with input(and info what happens) when any of Events happen*
example:
// just an event tags
struct one {};
struct two {
using input_type = int;
};
struct three {
using input_type = std::vector<std::string>;
};
async_task<void> waiter_all() {
co_await jump_on(another_thread);
for (int i : std::views::iota(0, 100)) {
auto tuple = co_await when_all<one, two, three>(::test::test_selector{});
std::cout << "All three events happen, i accept a tuple with all inputs!\n";
}
}
template <typename Event>
logical_thread notifier(auto& pool, auto input) {
co_await jump_on(another_thread);
while (true) {
// copy for all events, so no move
pool.notify_all<Event>(input);
// check if caller .request_stop()? then suspend
co_await quit_if_requested;
}
}
void WhenAllExample() {
event_pool<this_thread_executor> pool;
auto _1 = notifier<one>(pool);
auto _2 = notifier<two>(pool, 5);
auto _3 = notifier<three>(pool, std::vector<std::string>(3, "hello world"));
auto taskl = waiter_all(flag);
taskl.wait();
stop(_1, _2, _3); // .request_stop() and .join() for all
pool.notify_all<one>();
pool.notify_all<two>(5);
pool.notify_all<three>(std::vector<std::string>(3, "hello world"));
}
Concepts
co_awaiter
template <typename T>
concept co_awaiter = requires(T value) {
{ value.await_ready() } -> std::same_as<bool>;
value.await_resume();
};
co_awaitable
template <typename T>
concept co_awaitable = has_member_co_await<T> || has_global_co_await<T> || co_awaiter<T>;
executor
something with .execute([]{}) method
build
git clone https://github.com/kelbon/kelcoro
cd kelcoro
cmake . -B build OR cmake . -DENABLE_TESTS=ON -B build # with tests and main.cpp to try something
cmake --build build
Note cmake now may not support C++ modules(and clang...)