[work in progress] Coroutines the Third

Open Aidan63 opened this issue 1 year ago • 5 comments

Third time's the charm, right?

This is an evolution of #11554. The core state machine transformation has remained pretty much untouched from that branch, but the surrounding scaffolding code has been changed to align more closely with Kotlin's coroutine implementation. This should hopefully address the issues surrounding continuation, scheduling, and a few others which were discovered in the other PR.

Changes

Coroutine functions are now transformed to accept a Continuation<Any> argument, return Any, and contain the state machine transformation. The continuation argument holds either the state of a paused execution of the coroutine function, or what will be invoked once the coroutine completes. The Any return value is either the result of the coroutine's execution, or a special token indicating that its execution has been suspended.

@:coroutine function foo(i:Int):String {}

function foo(i:Int, _hx_continuation:IContinuation<Any>):Any {}

In addition, each coroutine function has a class generated for it. This class implements IContinuation<Any> and stores data related to the execution of a coroutine: the current state, the result or error, variables accessed across states, and a reference to the continuation which will be invoked once this coroutine completes, essentially creating a chain. The continuation interface contains a resume function which, when called, resumes execution of the coroutine and is responsible for scheduling that execution onto an appropriate thread.

class HxCoro__Main_foo implements IContinuation<Any> {
	public var _hx_result : Any;

	public var _hx_error : Exception;

	public var _hx_state : Int;

	public var _hx_context : CoroutineContext;

	public var _hx_completion : IContinuation<Any>;

	public function resume(result:Any, error:Exception) {
		_hx_result = result;
		_hx_error  = error;

		_hx_context.scheduler.schedule(() -> {
			try {
				var result = Main.foo(0, this);
				if (result is Primitive) {
					return;
				} else {
					_hx_completion.resume(result, null);
				}
			} catch (exn:Exception) {
				_hx_completion.resume(null, exn);
			}
		});
	}
}
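To make the moving parts above a bit more concrete, here is a hand-written model of the transformation in plain Haxe that runs without the coroutine branch. It is only a sketch: every name in it (SketchContinuation, Suspended, Pending, FooState, Sketch, Capture) is made up for illustration, it uses Dynamic where the real code uses Any, the "scheduler" is just a queue, and the compiler's actual output will differ.

```haxe
import haxe.Exception;

// Every name in this file is hypothetical; none of it is real compiler output.
interface SketchContinuation {
	function resume(result:Dynamic, error:Null<Exception>):Void;
}

// Stand-in for the special "suspended" token.
class Suspended {
	public static final token = new Suspended();

	function new() {}
}

// Stand-in for a scheduler plus one suspending primitive.
class Pending {
	static final queue:Array<() -> Void> = [];

	// Always suspends: queues the resumption and hands back the token.
	public static function yieldValue(value:Dynamic, k:SketchContinuation):Dynamic {
		queue.push(() -> k.resume(value, null));
		return Suspended.token;
	}

	// Plays the role of the event loop inside a blocking run function.
	public static function drain() {
		while (queue.length > 0) {
			final job = queue.shift();
			if (job != null) job();
		}
	}
}

// Per-function state object: current state, last result or error, completion.
class FooState implements SketchContinuation {
	public var state = 0;
	public var result:Dynamic = null;
	public var error:Null<Exception> = null;
	public final completion:SketchContinuation;

	public function new(completion:SketchContinuation) {
		this.completion = completion;
	}

	public function resume(result:Dynamic, error:Null<Exception>) {
		this.result = result;
		this.error = error;
		try {
			// The Int argument is ignored on re-entry; the state object carries the data.
			final r:Dynamic = Sketch.foo(0, this);
			if (r != Suspended.token) completion.resume(r, null);
		} catch (e:Exception) {
			completion.resume(null, e);
		}
	}
}

class Sketch {
	// Models a coroutine whose body is:
	//   var a = yieldValue(i + 1); var b = yieldValue(a * 2); return 'result: $b';
	public static function foo(i:Int, continuation:SketchContinuation):Dynamic {
		// A FooState means "resume me"; anything else is the caller's completion.
		var self = Std.downcast(continuation, FooState);
		if (self == null) self = new FooState(continuation);
		if (self.error != null) throw self.error;
		switch (self.state) {
			case 0:
				self.state = 1;
				return Pending.yieldValue(i + 1, self);
			case 1:
				final a:Int = self.result;
				self.state = 2;
				return Pending.yieldValue(a * 2, self);
			case 2:
				final b:Int = self.result;
				return 'result: $b';
			default:
				throw new Exception("invalid state");
		}
	}
}

// Completion at the end of the chain: records the final outcome.
class Capture implements SketchContinuation {
	public var value:Dynamic = null;

	public function new() {}

	public function resume(result:Dynamic, error:Null<Exception>) {
		value = error != null ? error.message : result;
	}
}

class Main {
	static function main() {
		final done = new Capture();
		Sketch.foo(1, done); // returns Suspended.token; nothing has completed yet
		Pending.drain();
		trace(done.value); // value is "result: 4"
	}
}
```

The first call suspends at state 0. Each queued job then re-enters foo through FooState.resume, and once state 2 returns an ordinary value the chain is completed by calling the stored completion.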

Coroutines can be started with the blocking function Coroutine.run, which blocks until a coroutine completes. Internally it has its own event loop which all coroutine continuations are scheduled onto.

A quick hello world example is provided below. It should work on all sys targets, but some targets might need their generators updated.

import haxe.coro.Coroutine;
import haxe.coro.Coroutine.delay;

@:coroutine function test() {
    fancyPrint('hello');

    delay(1000);

    fancyPrint('world');
}

@:coroutine function fancyPrint(text:String):Void {
    for (i in 0...text.length) {
        Sys.print(text.charAt(i));

        delay(100);
    }

    Sys.print('\n');
}

function main() {
    Coroutine.run(test);
}

Resources

I'm putting a few links below, specifically related to Kotlin's coroutines, which helped the puzzle come together a bit for me.

https://kt.academy/article/cc-under-the-hood

That is a good overview of the underlying machinery of Kotlin's coroutines. Jumping straight into decompiled Kotlin is very daunting as there are many, many layers of abstraction due to optimisations, features, etc. That article cuts through all of it and gets right to the core of what's happening. I pretty much copied that for this coroutine attempt.

https://www.droidcon.com/2022/09/22/design-of-kotlin-coroutines/

This then goes one layer deeper: it covers a lot of those layers you see in Kotlin's implementation and explains what they're there for.

https://github.com/JetBrains/kotlin/blob/master/compiler/backend/src/org/jetbrains/kotlin/codegen/coroutines/coroutines-codegen.md

Finally, the mega document. It goes in depth into the JVM codegen of Kotlin's coroutines.

Aidan63 avatar Apr 12 '25 14:04 Aidan63

I noticed that we were stuck on an outdated hxcpp branch. After updating this we now have green CI for cpp, which is a good baseline.

As the next step I'd like to get the JVM tests green. And afterwards we'll have to figure out how to make all this work on non-sys targets, because our coroutine execution currently relies on sys.thread.EventLoop.

Simn avatar Apr 14 '25 06:04 Simn

Update:

  • cpp still works
  • jvm works as well now after fixing a crazy inference problem
  • On js, the coro tests are passing too after Aidan implemented a non-sys EventLoop. CI is still red here because the server tests will need an updated version of what @klabz did in https://github.com/kLabz/utest/commit/1b9e193e9516af3cd3579ecbbbd4509fa0a7f534
  • eval runs into #12176
  • neko and python do not run coro tests yet
  • flash, php and lua run into https://github.com/Aidan63/haxe/issues/7, and also don't run the coro tests yet
  • hl has two failing tests:
TestBasic
  testResumeWithError: FAILURE F
    line: 30, exception of type haxe.Exception not raised
  testSimple: FAILURE F
    line: 6, expected 42 but it is 330584

Maybe @yuxiaomao could take a look at that some time. We also don't seem to run these tests on CI yet because HL is green.

Simn avatar Apr 15 '25 06:04 Simn

Update:

  • cpp, jvm and js still work.
  • hl works now too after reworking our data design in https://github.com/Aidan63/haxe/pull/33
  • eval still fails until someone looks at #12176
  • https://github.com/Aidan63/haxe/issues/7 has been resolved and so the CI for the affected targets is green now.
  • neko, python, php and lua don't run the coro tests yet and probably all have some problem to sort out.
  • flash will not be updated to support coroutines unless somebody steps up to do so.

Simn avatar Apr 17 '25 14:04 Simn

Easter update:

  • cpp, jvm, js and hl still work
  • neko, python, php and lua work too now after fixing minor issues for each of them (except python which just worked)
  • eval still fails until someone looks at #12176
  • We have refined our data design again in https://github.com/Aidan63/haxe/pull/40 by making coroutines exception-free. Users can still work with exceptions normally, but internally there are no exceptions being thrown around. Our hope is that this will make various aspects easier to reason about down the line.

Simn avatar Apr 20 '25 18:04 Simn

Since I'm accidentally opening PRs from the coroutine branch already anyway, I thought I should give an update on the state of affairs!

Lower level

The lower level, which is the transformation of @:coroutine marked functions to state machines that support suspension, has been pretty stable for a few weeks now. We still occasionally come across bugs in the transformer, but the overall architecture is quite solid and works well on all targets.

There currently isn't much optimization (by which I mean it is outright disabled) because we want to focus on stability first. But even without that, the performance we're seeing is quite good.

Structured concurrency

We've been looking into the higher level in the form of "coroutine tasks", with a strong focus on structured concurrency. This is best explained as a code example:

import haxe.Timer;
import hxcoro.CoroRun;
import hxcoro.Coro.*;

function main() {
	final startTime = Timer.stamp();
	final result = CoroRun.runScoped(node -> {
		final infiniteTicker = node.async(_ -> {
			var i = 0;
			while (true) {
				trace(i++ * 200);
				delay(200);
			}
		});

		final lazyTask = node.lazy(_ -> trace("We're done"));

		final slowTask = node.async(node -> {
			trace("slowly doing something");
			var aggregateValue = 0;
			var completedChildren = 0;
			for (_ in 0...500) {
				node.async(_ -> {
					while (Math.random() > 0.01) {
						yield();
						aggregateValue++;
					}
					completedChildren++;
				});
			}
			delay(1500);
			trace('$completedChildren children completed');
			return aggregateValue;
		});

		final result = slowTask.await();
		infiniteTicker.cancel();
		lazyTask.start();
		return result;
	});
	trace('done after ${Timer.stamp() - startTime} seconds, aggregate value is $result');
}

This creates a task tree which looks like this:

top
    infiniteTicker
    lazyTask
    slowTask
        500 child tasks

And it behaves like this:

[screen recording of the program's console output: cmd_0QvzRyLI9W]

Even though there's no actual threading yet (there will be), structured concurrency allows us to run multiple tasks "at the same time", which we can see by the overall run-time of the program being just slightly above the 1500 ms of slowTask's delay. The example also shows that we support cooperative cancellation (infiniteTicker.cancel()) and lazy tasks (lazyTask.start()).
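For readers wondering how tasks can interleave on a single thread at all, the idea can be modelled in plain Haxe with tasks that do one slice of work per call. This is a rough sketch under stated assumptions, not how hxcoro schedules anything: StepTask and RoundRobin are hypothetical names, and real coroutines suspend at delay() or yield() rather than returning from a step function.

```haxe
// Hypothetical names throughout; this is not hxcoro's scheduler.
class StepTask {
	public final name:String;
	public var cancelled = false;

	final step:() -> Bool;

	public function new(name:String, step:() -> Bool) {
		this.name = name;
		this.step = step;
	}

	// Runs one slice of work. Returns false once the task has finished or
	// has been cancelled; cancellation is only observed between slices.
	public function poll():Bool {
		return !cancelled && step();
	}
}

class RoundRobin {
	// Polls every live task in turn until none are left.
	public static function run(tasks:Array<StepTask>) {
		var active = tasks.copy();
		while (active.length > 0) {
			active = active.filter(task -> task.poll());
		}
	}
}

class Main {
	public static function demo():Array<String> {
		final log:Array<String> = [];
		var ticks = 0;
		final ticker = new StepTask("ticker", () -> {
			log.push('tick ${ticks++}');
			return true; // never finishes on its own
		});
		var remaining = 3;
		final worker = new StepTask("worker", () -> {
			log.push('work $remaining');
			remaining--;
			if (remaining == 0) {
				ticker.cancelled = true; // cooperative cancellation
				return false;
			}
			return true;
		});
		RoundRobin.run([ticker, worker]);
		return log;
	}

	static function main() {
		trace(demo().join(", "));
	}
}
```

The log comes out as tick 0, work 3, tick 1, work 2, tick 2, work 1. The two tasks alternate, and the endless ticker stops at its next slice boundary after the worker flags it, which is what "cooperative" means here.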

Data structures

We've also started working on coroutine-aware data structures. For now we only have hxcoro.ds.Channel, which is a queue that allows asynchronous reading and writing like this:

import hxcoro.ds.Channel;
import haxe.Timer;
import hxcoro.CoroRun;
import hxcoro.Coro.*;

function main() {
	final startTime = Timer.stamp();
	var aggregateValue = 0;
	CoroRun.runScoped(node -> {
		final channel = new Channel();
		final numTasks = 10000;
		// set up writers
		for (_ in 0...numTasks) {
			node.async(_ -> {
				delay(Std.random(1000));
				channel.write(1);
			});
		}
		// set up readers
		for (_ in 0...numTasks) {
			node.async(_ -> {
				delay(Std.random(1000));
				aggregateValue += channel.read();
			});
		}
	});
	trace('done after ${Timer.stamp() - startTime} seconds, aggregate value is $aggregateValue');
}

Compiling this to JavaScript, we get an outcome like this:

source/Main.hx:27: done after 1.1163641000166535 seconds, aggregate value is 10000

The result value being 10000 confirms that all writers and readers did indeed run, and the overall execution time shows that we have low overhead (we can expect at least one task to delay for close to the full 1000 ms, so that's the baseline). This also scales quite well, though how far we can take this still depends largely on the target:

numTasks    hl (s)    cpp (s)   js (s)    jvm (s)
1000        1.062     1.003     1.022     1.026
10000       1.589     1.087     1.116     1.037
100000      8.618     1.878     1.638     1.266
1000000     78.568    crash     oom       4.045

Looks like there's more to optimize here yet!
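As a rough mental model of what a channel like this has to do, here is a callback-based sketch in plain Haxe. It is not hxcoro.ds.Channel: SketchChannel is a hypothetical name, the buffer is assumed to be unbounded, and a waiting reader is represented by a stored callback where the real thing would hold a suspended coroutine.

```haxe
// Hypothetical model of a channel; not the hxcoro.ds.Channel implementation.
class SketchChannel<T> {
	final values:Array<T> = []; // written but not yet read
	final readers:Array<T->Void> = []; // readers waiting for a value

	public function new() {}

	// Hands the value to the oldest waiting reader, or buffers it.
	public function write(value:T) {
		final reader = readers.shift();
		if (reader != null) {
			reader(value);
		} else {
			values.push(value);
		}
	}

	// Delivers a buffered value right away, or parks the reader.
	public function read(onValue:T->Void) {
		if (values.length > 0) {
			onValue(values.shift());
		} else {
			readers.push(onValue);
		}
	}
}

class Main {
	public static function demo():Int {
		final channel = new SketchChannel<Int>();
		var sum = 0;
		channel.read(v -> sum += v); // no value yet: the reader is parked
		channel.write(1); // handed straight to the parked reader
		channel.write(2); // no reader waiting: the value is buffered
		channel.read(v -> sum += v); // served from the buffer immediately
		return sum;
	}

	static function main() {
		trace(demo());
	}
}
```

demo() returns 3 regardless of whether a read or a write arrives first. That is the same property the aggregate value of 10000 demonstrates in the benchmark above.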

hxcoro

We have decided to keep the lower level in Haxe's std, but move everything that's not strictly related to it into a hxcoro haxelib. For the time being, the sources for this haxelib are still in std itself because that simplifies development a lot, but this will eventually be moved outside.

Simn avatar Jun 01 '25 07:06 Simn

Is there any plan for issue https://github.com/HaxeFoundation/haxe/issues/12406?

This is a big limitation for creating production-ready HTTP servers.

barisyild avatar Nov 20 '25 14:11 barisyild

Is a ThreadLocal-like implementation possible for coroutines?

https://github.com/hxwell/hxwell/blob/f0723379b43a9d97c25088fbbb50ca07aba640ed/src/haxe/ThreadLocal.hx

barisyild avatar Nov 20 '25 14:11 barisyild