[work in progress] Coroutines the Third
Third time's the charm, right?
This is an evolution of #11554. The core state machine transformation has remained pretty much untouched from that branch, but the surrounding scaffolding code has been changed to more closely align with Kotlin's coroutine implementation. This should hopefully address the issues around continuations, scheduling, and a few others which were discovered in the other PR.
Changes
Coroutine functions are now transformed to accept a `Continuation<Any>` argument, return `Any`, and contain the state machine transformation. The continuation argument either holds the state of a paused execution of the coroutine function, or what will be invoked once the coroutine completes. The `Any` return value is either the result of the coroutine's execution, or a special token indicating that its execution has been suspended.
```haxe
@:coroutine function foo(i:Int):String {}

function foo(i:Int, _hx_continuation:IContinuation<Any>):Any {}
```
In addition, each coroutine function has a class generated for it. This class implements `IContinuation<Any>` and stores data related to the execution of a coroutine: the current state, the result or error, variables accessed across states, and a reference to the continuation which will be invoked once this coroutine completes, essentially creating a chain. The continuation interface contains a `resume` function which, when called, resumes execution of the coroutine and is responsible for scheduling that execution onto an appropriate thread.
```haxe
class HxCoro__Main_foo implements IContinuation<Any> {
	public var _hx_result:Any;
	public var _hx_error:Exception;
	public var _hx_state:Int;
	public var _hx_context:CoroutineContext;
	public var _hx_completion:IContinuation<Any>;

	public function resume(result:Any, error:Exception) {
		_hx_result = result;
		_hx_error = error;
		_hx_context.scheduler.schedule(() -> {
			try {
				var result = Main.foo(0, this);
				if (result is Primitive) {
					return;
				} else {
					_hx_completion.resume(result, null);
				}
			} catch (exn:Exception) {
				_hx_completion.resume(null, exn);
			}
		});
	}
}
```
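For intuition, the body of the transformed function itself is essentially a jump table over `_hx_state`. The following hand-written sketch shows the rough shape for a coroutine with a single suspension point; the helper `somethingSuspending` and the exact `Primitive` handling are illustrative only, not the actual generated code:

```haxe
// Illustrative sketch only; the real generated code differs in detail.
static function foo(i:Int, _hx_continuation:IContinuation<Any>):Any {
	// Reuse the state object when we are being resumed; create it on first entry.
	var sm = _hx_continuation is HxCoro__Main_foo
		? (cast _hx_continuation : HxCoro__Main_foo)
		: new HxCoro__Main_foo(_hx_continuation);
	while (true) {
		switch sm._hx_state {
			case 0:
				sm._hx_state = 1;
				// Pass our state object as the callee's continuation; if the
				// callee suspends, propagate the suspension token upwards.
				final result = somethingSuspending(i, sm);
				if (result is Primitive) {
					return result;
				}
				sm._hx_result = result;
			case 1:
				// Resumed (or fell through): produce the final value.
				return 'result: ' + Std.string(sm._hx_result);
			default:
				throw new Exception('Invalid coroutine state');
		}
	}
}
```

The loop allows fall-through between states when a callee completes synchronously without suspending, avoiding a round trip through the scheduler in that case.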
Coroutines can be started with the blocking function `Coroutine.run`, which blocks until the coroutine completes. Internally it has its own event loop onto which all coroutine continuations are scheduled.
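A rough sketch of what such a blocking entry point could look like, assuming a hypothetical `makeRootContinuation` helper that records the outcome (the actual implementation in `haxe.coro.Coroutine` will differ):

```haxe
// Hypothetical sketch; names and types are illustrative.
static function run<T>(coro:IContinuation<Any>->Any):T {
	final loop = new sys.thread.EventLoop();
	var done = false;
	var result:Any = null;
	var error:Exception = null;
	// Root of the continuation chain: invoked when the coroutine completes.
	final completion = makeRootContinuation(loop, (r, e) -> {
		result = r;
		error = e;
		done = true;
	});
	final first = coro(completion);
	if (!(first is Primitive)) {
		// Completed synchronously without ever suspending.
		return first;
	}
	while (!done) {
		loop.progress(); // pump scheduled continuations until completion
	}
	if (error != null) {
		throw error;
	}
	return result;
}
```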
A quick hello world example is provided below. It should work on all sys targets, but some targets might need their generators updated.
```haxe
import haxe.coro.Coroutine;
import haxe.coro.Coroutine.delay;

@:coroutine function test() {
	fancyPrint('hello');
	delay(1000);
	fancyPrint('world');
}

@:coroutine function fancyPrint(text:String):Void {
	for (i in 0...text.length) {
		Sys.print(text.charAt(i));
		delay(100);
	}
	Sys.print('\n');
}

function main() {
	Coroutine.run(test);
}
```
Resources
I'm putting a few links below, specifically related to Kotlin's coroutines, which helped the puzzle come together a bit for me.
https://kt.academy/article/cc-under-the-hood
That is a good overview of the underlying machinery of Kotlin's coroutines. Jumping straight into decompiled Kotlin is very daunting, as there are many, many layers of abstraction due to optimisations, features, etc. That article cuts through all of it and gets right to the core of what's happening. I pretty much copied that approach for this coroutine attempt.
https://www.droidcon.com/2022/09/22/design-of-kotlin-coroutines/
This then goes one layer deeper; it covers a lot of those layers you see in Kotlin's implementation and explains what they're there for.
https://github.com/JetBrains/kotlin/blob/master/compiler/backend/src/org/jetbrains/kotlin/codegen/coroutines/coroutines-codegen.md
Finally, the mega document. It goes in depth into the JVM codegen of Kotlin's coroutines.
I noticed that we were stuck on an outdated hxcpp branch. After updating this we now have green CI for cpp, which is a good baseline.
As the next step I'd like to get the JVM tests green. And afterwards we'll have to figure out how to make all this work on non-sys targets, because our coroutine execution currently relies on sys.thread.EventLoop.
Update:
- cpp still works
- jvm works as well now after fixing a crazy inference problem
- On js, the coro tests are passing too after Aidan implemented a non-sys EventLoop. CI is still red here because the server tests will need an updated version of what @kLabz did in https://github.com/kLabz/utest/commit/1b9e193e9516af3cd3579ecbbbd4509fa0a7f534
- eval runs into #12176
- neko and python do not run coro tests yet
- flash, php and lua run into https://github.com/Aidan63/haxe/issues/7, and also don't run the coro tests yet
- hl has two failing tests:

```
TestBasic
	testResumeWithError: FAILURE
		line: 30, exception of type haxe.Exception not raised
	testSimple: FAILURE
		line: 6, expected 42 but it is 330584
```
Maybe @yuxiaomao could take a look at that some time. We also don't seem to run these tests on CI yet, given that HL is showing green there.
Update:
- cpp, jvm and js still work.
- hl works now too after reworking our data design in https://github.com/Aidan63/haxe/pull/33
- eval still fails until someone looks at #12176
- https://github.com/Aidan63/haxe/issues/7 has been resolved and so the CI for the affected targets is green now.
- neko, python, php and lua don't run the coro tests yet and probably all have some problem to sort out.
- flash will not be updated to support coroutines unless somebody steps up to do so.
Easter update:
- cpp, jvm, js and hl still work
- neko, php and lua work too now after fixing minor issues for each of them, and python just worked out of the box
- eval still fails until someone looks at #12176
- We have refined our data design again in https://github.com/Aidan63/haxe/pull/40 by making coroutines exception-free. Users can still work with exceptions normally, but internally there are no exceptions being thrown around. Our hope is that this will make various aspects easier to reason about down the line.
Since I'm accidentally opening PRs from the coroutine branch already anyway, I thought I should give an update on the state of affairs!
Lower level
The lower level, which is the transformation of @:coroutine marked functions to state machines that support suspension, has been pretty stable for a few weeks now. We still occasionally come across bugs in the transformer, but the overall architecture is quite solid and works well on all targets.
There currently isn't much optimization (by which I mean it is outright disabled) because we want to focus on stability first. But even without that, the performance we're seeing is quite good.
Structured concurrency
We've been looking into the higher level in the form of "coroutine tasks", with a strong focus on structured concurrency. This is best explained as a code example:
```haxe
import haxe.Timer;
import hxcoro.CoroRun;
import hxcoro.Coro.*;

function main() {
	final startTime = Timer.stamp();
	final result = CoroRun.runScoped(node -> {
		final infiniteTicker = node.async(_ -> {
			var i = 0;
			while (true) {
				trace(i++ * 200);
				delay(200);
			}
		});
		final lazyTask = node.lazy(_ -> trace("We're done"));
		final slowTask = node.async(node -> {
			trace("slowly doing something");
			var aggregateValue = 0;
			var completedChildren = 0;
			for (_ in 0...500) {
				node.async(_ -> {
					while (Math.random() > 0.01) {
						yield();
						aggregateValue++;
					}
					completedChildren++;
				});
			}
			delay(1500);
			trace('$completedChildren children completed');
			return aggregateValue;
		});
		final result = slowTask.await();
		infiniteTicker.cancel();
		lazyTask.start();
		return result;
	});
	trace('done after ${Timer.stamp() - startTime} seconds, aggregate value is $result');
}
```
This creates a task tree which looks like this:

```
top
├── infiniteTicker
├── lazyTask
└── slowTask
    └── 500 child tasks
```
Even though there's no actual threading yet (there will be), structured concurrency allows us to run multiple tasks "at the same time", which we can see by the overall run-time of the program being just slightly above the 1500ms from the top task's delay. The example also shows that we support cooperative cancellation (`infiniteTicker.cancel()`) and lazy tasks (`lazyTask.start()`).
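Distilled from the example above, cooperative cancellation on its own looks like this; the ticker is stopped at its next suspension point rather than pre-empted:

```haxe
import hxcoro.CoroRun;
import hxcoro.Coro.*;

function main() {
	CoroRun.runScoped(node -> {
		// A child task that would never finish on its own...
		final ticker = node.async(_ -> {
			while (true) {
				delay(100);
			}
		});
		delay(350);
		// ...is cancelled cooperatively at its next suspension point,
		// allowing the scope (and the program) to complete.
		ticker.cancel();
	});
}
```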
Data structures
Something else we've started working on is coroutine-aware data structures. For now we only have `hxcoro.ds.Channel`, a queue that allows asynchronous reading and writing like this:
```haxe
import hxcoro.ds.Channel;
import haxe.Timer;
import hxcoro.CoroRun;
import hxcoro.Coro.*;

function main() {
	final startTime = Timer.stamp();
	var aggregateValue = 0;
	CoroRun.runScoped(node -> {
		final channel = new Channel();
		final numTasks = 10000;
		// set up writers
		for (_ in 0...numTasks) {
			node.async(_ -> {
				delay(Std.random(1000));
				channel.write(1);
			});
		}
		// set up readers
		for (_ in 0...numTasks) {
			node.async(_ -> {
				delay(Std.random(1000));
				aggregateValue += channel.read();
			});
		}
	});
	trace('done after ${Timer.stamp() - startTime} seconds, aggregate value is $aggregateValue');
}
```
Compiling this to JavaScript, we get an outcome like this:
```
source/Main.hx:27: done after 1.1163641000166535 seconds, aggregate value is 10000
```
The result value being 10000 confirms that all writers and readers did indeed run, and the overall execution time shows that we have low overhead (we can expect at least one task to delay to the full 1000ms, so that's the baseline). This also scales quite well, though how far we can take this still depends largely on the target:
| numTasks | hl (s) | cpp (s) | js (s) | jvm (s) |
|---|---|---|---|---|
| 1000 | 1.062 | 1.003 | 1.022 | 1.026 |
| 10000 | 1.589 | 1.087 | 1.116 | 1.037 |
| 100000 | 8.618 | 1.878 | 1.638 | 1.266 |
| 1000000 | 78.568 | crash | oom | 4.045 |
Looks like there's more to optimize here yet!
hxcoro
We have decided to keep the lower level in Haxe's std, but move everything that's not strictly related to it into a hxcoro haxelib. For the time being, the sources for this haxelib are still in std itself because that simplifies development a lot, but this will eventually be moved outside.
Is there any plan for the https://github.com/HaxeFoundation/haxe/issues/12406 issue? This is a big limitation for creating production-ready HTTP servers.
Is a ThreadLocal-like implementation possible for coroutines?
https://github.com/hxwell/hxwell/blob/f0723379b43a9d97c25088fbbb50ca07aba640ed/src/haxe/ThreadLocal.hx