[work in progress] Coroutines for Haxe

Open Simn opened this issue 1 year ago • 54 comments

Here's a different approach to handling the internal state for coroutines. This is pretty much #10128 but without the additional argument to TFun. The idea is this:

  • Instead of hijacking the typeloader upon encountering the "Coroutine" path, we let it load the abstract. The function type is carried in the type parameter.
  • We add com.basic.tcoro which works like tnull and creates Coroutine<Arguments -> Return>. This is used when typing @:coroutine functions.
  • Any place in the compiler that cares about coroutines calls follow_with_coro, which gives either a Coro(args, ret) or a NotCoro(t). This allows all code paths that are interested in coroutines (of which there are only about 10 in the entire compiler) to check for them, but still deal with the non-coro behavior easily.

This gives us a pretty light diff here, which is always good. There's a chance that I'm missing something important because at the moment we only have the few tests in misc/coroutines.
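As a rough illustration of the follow_with_coro idea from the list above, here is a toy model written in Haxe rather than in the compiler's OCaml. Everything in it (Ty, CoroKind, tcoro, followWithCoro, describe) is a hypothetical stand-in and far simpler than the real type representation; it only shows how a caller can branch on Coro/NotCoro while the function type travels in the type parameter.

```haxe
// Toy model only: these types are hypothetical and not the compiler's own.
enum Ty {
	TInt;
	TString;
	TFun(args:Array<Ty>, ret:Ty);
	// Stands in for the abstract Coroutine<T>; the function type is the parameter.
	TCoroutine(param:Ty);
}

enum CoroKind {
	Coro(args:Array<Ty>, ret:Ty);
	NotCoro(t:Ty);
}

class Main {
	// Same idea as tcoro in the description: wrap a function type in Coroutine<...>.
	public static function tcoro(args:Array<Ty>, ret:Ty):Ty {
		return TCoroutine(TFun(args, ret));
	}

	// Callers that care about coroutines go through this single check.
	public static function followWithCoro(t:Ty):CoroKind {
		return switch t {
			case TCoroutine(TFun(args, ret)): Coro(args, ret);
			case other: NotCoro(other);
		}
	}

	public static function describe(t:Ty):String {
		return switch followWithCoro(t) {
			case Coro(args, _): 'coroutine with ${args.length} argument(s)';
			case NotCoro(_): "not a coroutine";
		}
	}

	static function main() {
		trace(describe(tcoro([TInt], TString)));
		trace(describe(TFun([TInt], TString)));
	}
}
```

The point of the two-constructor result is that the non-coroutine branch gets the original type back unchanged, so existing code paths need no special handling.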

Simn avatar Feb 06 '24 14:02 Simn

I'm making this the main coroutine PR with the following changes:

  • The logic from analyzerCoro.ml in #10128 has been moved and slightly cleaned up to coroToTexpr.ml.
  • The new module coroFromTexpr.ml generates the data structure that ^ works with.
  • That transformation is still very simplistic and basically only works with cases that we're testing. The overall structure should allow us to improve on this easily though.
  • Coroutines are now processed right after a function has been typed. I see no reason to wait until later, we already know that we're dealing with a coroutine anyway.
  • As a consequence, we now run all the normal filters on the transformed expression structure. This will deal with all the problems in #10128 related to capture variables and whatnot.
  • Doing that currently breaks the test in testTryCatchFail. This is related to #11574 and the problem is that the coroutine transformer eliminates the TTry, which causes the exception handler to not wrap the caught values. Running the expression wrapping immediately will fix this.
  • I tried testing this with C++ but ran into some problem with HX_STACKFRAME and '_hx_stateMachine': undeclared identifier. The related dump output looks clean, so this might be an entirely separate problem. Maybe @Aidan63 can give it a try!

Still a million things to figure out, but I'm slowly starting to understand how all this is supposed to work!

Simn avatar Feb 14 '24 06:02 Simn

Some notes on the transformation:

If we have this:

@:coroutine
@:coroutine.debug
private function mapCalls<TArg, TRet>(args:Array<TArg>, f:Coroutine<TArg->TRet>):Array<TRet> {
	return [for (arg in args) f(arg)];
}

The transformer looks at this:

function(args:Array<mapCalls.TArg>, f:Coroutine<mapCalls.TArg -> mapCalls.TRet>) {
        return {
                var ` = [];
                {
                        var ` = 0;
                        while (` < args.length) {
                                var arg = args[`];
                                ++ `;
                                `.push(f(arg));
                        };
                };
                `;
        };
  }

And generates this:

function(args:Array<mapCalls.TArg>, f:Coroutine<mapCalls.TArg -> mapCalls.TRet>, _hx_continuation:(Array<mapCalls.TRet>, Dynamic) -> Void) {
        var ` = null;
        var ` = 0;
        var _hx_state = 0;
        var _hx_exceptionState = 10;
        var _hx_stateMachine = function(_hx_result:Dynamic, _hx_error:Dynamic) {
                if (_hx_error != null) _hx_state = _hx_exceptionState;
                do (try switch (_hx_state) {
                        case 0: {
                                _hx_state = 1;
                        };
                        case 1: {
                                `;
                                ` = [];
                                _hx_state = 3;
                        };
                        case 2: {
                                _hx_state = -1;
                                _hx_continuation(null, null);
                                return;
                        };
                        case 3: {
                                `;
                                ` = 0;
                                _hx_state = 5;
                        };
                        case 4: {
                                _hx_state = -1;
                                _hx_continuation(`, null);
                                return;
                        };
                        case 5: {
                                if (! (` < args.length)) _hx_state = 7 else _hx_state = 8;
                        };
                        case 6: {
                                _hx_state = 4;
                        };
                        case 7: {
                                _hx_state = 6;
                        };
                        case 8: {
                                var arg;
                                arg = args[`];
                                ++ `;
                                _hx_state = 9;
                                f(arg, _hx_stateMachine)(null, null);
                                return;
                        };
                        case 9: {
                                `.push(cast _hx_result);
                                _hx_state = 5;
                        };
                        case 10: throw _hx_error;
                        default: {
                                _hx_state = 10;
                                throw "Invalid coroutine state";
                        }
                } catch (e:Dynamic) if (_hx_state == 10) {
                        _hx_exceptionState = 10;
                        _hx_continuation(null, e);
                        return;
                } else {
                        _hx_state = _hx_exceptionState;
                        _hx_error = e;
                }) while(true);
        };
        return _hx_stateMachine;
  }

For one, it would be nice to eliminate the forwarding states 0, 6 and 7. Ideally in a way that would still give us the nice state numbering without any holes.

The other thing that looks suspicious to me is that we have do try instead of try do. The only path back into the loop is from the else case in the catch, but it looks to me like we can instead recurse upon our function again after setting _hx_error = e. The function entry checks for if (_hx_error != null) _hx_state = _hx_exceptionState; already, which will give us the exact same state.

Perhaps this could even be optimized for when there's only a single exception state (which I think is the vast majority of coros) because then we can just call the continuation, which we would otherwise do after catching the throw _hx_error in state 10.

Yes I know, premature optimization and all that, but I find this stuff easier to debug if the code looks cleaner!
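For readers who want to poke at the shape of this without the compiler, here is a hand-written sketch of the same pattern. It is an assumption-laden miniature, not the transformer's output: sumCalls, twice, Continuation and SuspendingIntFn are made-up names, and error handling is reduced to forwarding the error to the continuation. What it keeps from the dump is the calling convention (call with the arguments plus a continuation, get a state machine back, start it with (null, null)) and the rule that a state sets the resume state and returns right after a suspending call.

```haxe
typedef Continuation = (result:Dynamic, error:Dynamic) -> Void;
typedef SuspendingIntFn = (arg:Int, cont:Continuation) -> Continuation;

class Main {
	// Hand-written equivalent of:
	//   var total = 0; for (i in 0...n) total += f(i); return total;
	public static function sumCalls(n:Int, f:SuspendingIntFn, continuation:Continuation):Continuation {
		var i = 0;
		var total = 0;
		var state = 0;
		var stateMachine:Continuation = null;
		stateMachine = function(result:Dynamic, error:Dynamic) {
			if (error != null) {
				// Simplification: no exception states, just forward the error.
				state = -1;
				continuation(null, error);
				return;
			}
			while (true) {
				switch state {
					case 0: // loop condition
						state = (i < n) ? 1 : 3;
					case 1: // suspending call: set the resume state, call, return
						state = 2;
						var arg = i++;
						f(arg, stateMachine)(null, null);
						return;
					case 2: // resumed with the result of the call
						total += (result : Int);
						state = 0;
					case 3: // done
						state = -1;
						continuation(total, null);
						return;
					default:
						throw "Invalid coroutine state";
				}
			}
		};
		return stateMachine;
	}

	// A trivial stand-in "coroutine" that completes immediately with arg * 2.
	public static function twice(arg:Int, cont:Continuation):Continuation {
		return (_, _) -> cont(arg * 2, null);
	}

	static function main() {
		// 0 * 2 + 1 * 2 + 2 * 2, so the continuation receives 6.
		sumCalls(3, twice, (result, error) -> trace(result))(null, null);
	}
}
```

Because twice resumes synchronously, each resumption re-enters the state machine on the same stack, which is exactly the situation where the return after the suspending call matters.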

Simn avatar Feb 14 '24 07:02 Simn

I got the basics working on the JVM target. This increasingly feels like I have no idea what I'm doing, but that may or may not be normal when implementing something like coroutines.

The generator tests don't work yet because I still don't know how to actually implement Coroutine.suspend properly.

For now, I'm dealing with the Coroutine special cases in genjvm directly. I don't know if we should map this away in a distinct pass instead - seems a bit annoying because the Coroutine type could "leak" to various places. Maybe simply dealing with this in the generators is acceptable.

Simn avatar Feb 14 '24 13:02 Simn

Got the suspend thing working as well. This feels pretty hacky to do it like that, but I suppose it's not unexpected with these two different signatures going on.

@Aidan63 How did you do this for C++? I managed to include your Coroutine implementation but I can't seem to get the function right.

Simn avatar Feb 14 '24 15:02 Simn

Just gave this a try and it seems to work as it did before for my quick asys bodge, but without needing any of my additions. I added @:native('::hx::Coroutine::suspend') to the extern suspend function to have it call the function in the coroutine header. I just pushed the hxcpp changes to my hxcpp asys branch if you want to confirm things.

I agree that suspend is very confusing; every time I think I understand it, I look at it again and feel like I'm back at square one. It's very odd that it takes two arguments and then does...nothing? with them. Is there eventually going to be some advanced functionality to it, or is that purely to make it fit the "result and error" function signature?

I'm still seeing C++ errors with non-static coroutines, stuff about __this again.

[image: C++ compiler errors mentioning __this]

I'll have to run the coroutine tests instead of just my quick asys bodge to see what else currently breaks.

Aidan63 avatar Feb 14 '24 19:02 Aidan63

@:native('::hx::Coroutine::suspend')

Hmm, that's what I tried as well but it generates this which the C++ compiler doesn't appreciate:

 ::Dynamic Coroutine_Impl__obj::_hx_f0xd5f8b494( ::Dynamic f, ::Dynamic cont) {
	return ::::hx::Coroutine::suspend(f,cont);
}

Simn avatar Feb 14 '24 19:02 Simn

That quad colon is suspicious. If @:native is working, I wouldn't expect it to also prefix :: (I don't know off the top of my head in which cases gencpp prefixes ::). I also don't see any mention of the suspend function in the generated Coroutine_Impl__obj on my end.

I just tried compiling misc/coroutine for cpp and get another weird error, haxe::Exception has not been declared.

[image: C++ compiler error, haxe::Exception has not been declared]

Looking at that header file, it does not have a forward declaration of the exception type for some reason.

Aidan63 avatar Feb 14 '24 19:02 Aidan63

I can reproduce the Exception problem as well. Don't know yet what that is about, #11574 doesn't help with that either.

Another problem afterwards is this:

@:coroutine function error() {
	throw "nope";
}

function main() {
	error.start((result, error) -> {
		trace(error);
	});
}

This fails with 'result': illegal use of type 'void' from the generated void _hx_run(void result, ::Dynamic error){. I suppose it should infer Dynamic instead of Void in such cases, or maybe haxe.Unit once #11563 is done.

Simn avatar Feb 14 '24 20:02 Simn

Alright, good news: with the void problem out of the way and the Exception problem ignored, we can now successfully compile the coroutine tests to C++. CI should fail because it needs your Coroutine class, I guess we can make a hxcpp branch for that and use it here.

Could you tell me how to reproduce the __this problem? That's one I haven't seen yet.

Simn avatar Feb 14 '24 20:02 Simn

Hmm, interesting that it doesn't occur with the existing test suite. I initially assumed it was all non-static functions, but that's not the case. I haven't figured out a minimal reproduction yet, but the following code snippet causes it:

import haxe.NoData;
import haxe.coro.Coroutine;
import haxe.io.Bytes;
import asys.native.filesystem.File;
import asys.native.filesystem.FilePath;
import asys.native.filesystem.FileSystem;
import asys.native.filesystem.FileOpenFlag;

class CoroFileSystem {
	@:coroutine static public function openFile<T>(path:FilePath) : CoroFile {
		return Coroutine.suspend(cont -> {
			FileSystem.openFile(path, FileOpenFlag.Append, (file, error) -> {
				switch error {
					case null:
						cont(new CoroFile(file), null);
					case exn:
						cont(null, exn);
				}
			});
		});
	}
}

class CoroFile {
	public final file : FileAppend;

	public function new(file) {
		this.file = file;
	}

	@:coroutine public function write(text:String) : Int {
		return Coroutine.suspend(cont -> {
			file.write(Bytes.ofString(text), 0, text.length, (count, error) -> {
				switch error {
					case null:
						cont(count, null);
					case exn:
						cont(0, exn);
				}
			});
		});
	}
}

@:coroutine function listen() : String {
	final file = CoroFileSystem.openFile("test.txt");

	var count = 0;

	count += file.write("Hello, ");

	return file.file.path;
}

function main() {
	listen.start((name, error) -> {
		trace(name);
	});
}

My thinking regarding non-static functions was that if you turn that write function into a static function which accepts CoroFile as its first argument, it works.

__this appears in CoroFile.cpp

[image: __this error in CoroFile.cpp]

From what I recall from my recent typed function changes, __this is defined in hx::LocalFunc, which all closure classes inherit, and it stores the object which defined the closure. For __this to not be defined, it seems like it thinks it's in a closure context but is not?

Aidan63 avatar Feb 14 '24 21:02 Aidan63

Do you have an asys + coro branch? I can't merge hxcpp_asys because git says something about unrelated histories.

Simn avatar Feb 14 '24 21:02 Simn

I've just been using my hxcpp asys branch (https://github.com/Aidan63/hxcpp/tree/asys) which has the coroutine header in it as well as adding a -p to src from this repo https://github.com/Aidan63/hxcpp_asys which contains all the haxe stuff.

Aidan63 avatar Feb 14 '24 21:02 Aidan63

Uhm right, I somehow thought hxcpp_asys was a Haxe branch. I can reproduce the __this problem now, thanks!

Simn avatar Feb 14 '24 21:02 Simn

class CoroFile {
	public final file:Dynamic;

	public function new(file) {
		this.file = file;
	}

	@:coroutine public function write() {
		function f() {
			trace(file);
		}
	}
}

function main() {
	var file = new CoroFile(null);
	file.write.start((_, _) -> {});
}

This is a bit dumb, the compiler normally generates var _gthis = this outside the closure, but the coroutine closure eats it and then we end up with this itself inside the closure, which is what this whole thing is trying to avoid.

Simn avatar Feb 14 '24 22:02 Simn

This works now. I realized that it was broken even in the case where this wasn't already captured, because it would be captured after the coroutine transformation. We now treat it as the special case that it is.

Let me know what else you come across that's broken! I'm happy to reproduce problems with hxcpp_asys. I'll also keep that Exception problem in mind, but it appears to be some rather particular special case.

Simn avatar Feb 15 '24 05:02 Simn

@RealyUniqueName Hi! I could use some advice for an exception-related problem I'm having here. Given a coroutine like this:

@:coroutine function tryCatch(e:haxe.Exception) {
	try {
		throw e;
	} catch (e:E1) {
		return "e1";
	} catch (e:E2) {
		return "e2";
	}
	return "none";
}

The transformed state-machine will have (among other things) these cases:

case 2: {
        if (Std.isOfType(_hx_error, _Main.E2)) _hx_state = 6 else if (Std.isOfType(_hx_error, _Main.E1)) _hx_state = 5 else throw _hx_error;
};
case 4: {
        throw _hx_error;
};

And the generated output is then this:

case 2:
        if(((_hx_error) instanceof _$Main_E2)) {
                _hx_state = 6;
        } else if(((_hx_error) instanceof _$Main_E1)) {
                _hx_state = 5;
        } else {
                throw haxe_Exception.thrown(_hx_error);
        }
        break;
case 4:
        throw haxe_Exception.thrown(_hx_error);

The problem appears to be that the throw is still a real throw and gets the wrapper treatment, but the catches are gone from the typed expression and so the _hx_error value is never unwrapped.

I'm unsure how to approach this. In #11574 I'm trying to run the exception changes immediately during typing, but that's not entirely trivial and currently causes some null-safety issues. It would be better to keep exceptions as a filter after all, but I struggle to find a good solution for this problem here. Any ideas?
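For context on the "wrapper treatment", the following standalone snippet shows the wrap/unwrap pairing in ordinary (non-coroutine) code. It uses only haxe.Exception, haxe.ValueException and Exception.unwrap() from the standard library; roundTrip is a made-up helper name. A thrown non-Exception value is seen as a ValueException when caught as haxe.Exception, and unwrap() returns the original value. The state machine above does the throwing half but has lost the catching half, which is why _hx_error stays wrapped.

```haxe
import haxe.Exception;
import haxe.ValueException;

class Main {
	// Throws an arbitrary value, catches it through the haxe.Exception
	// wrapper and hands back what unwrap() reports.
	public static function roundTrip(value:Dynamic):Dynamic {
		try {
			throw value;
		} catch (e:Exception) {
			return e.unwrap();
		}
	}

	static function main() {
		try {
			throw "nope"; // not a haxe.Exception
		} catch (e:Exception) {
			// The catch side sees a wrapper around the thrown string...
			trace(Std.isOfType(e, ValueException));
			// ...and unwrap() recovers the original value.
			var original:String = e.unwrap();
			trace(original);
		}
		trace(roundTrip(42));
	}
}
```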

Simn avatar Feb 15 '24 06:02 Simn

Figured out what was going on with the exception stuff late last night: a haxe.coro.Coroutine was making its way through to the generator, and gencpp doesn't visit TAbstract parameters when figuring out what needs to be forward declared, as they're usually all erased with Dynamic.

I added the following new function just below visit_type in gencpp line 4874.

      let is_coroutine_abstract a =
         match a.a_path with
         | (["haxe";"coro"],"Coroutine") -> true
         | _ -> false in

and added a special TAbstract case a few lines down.

         | TAbstract (a,params) when is_coroutine_abstract a ->
            add_extern_type (TAbstractDecl a);
            List.iter visit_type params;

With these changes the disabled exception tests compile and pass on cpp.

I see there's a comment in mk_suspending_call mentioning that it removes those Coroutine<T>, so is the actual issue that they can still make their way through to the generator and the above is just a workaround for that?

Aidan63 avatar Feb 15 '24 11:02 Aidan63

Ah nice, thanks! I've added this to gencpp for now. Will have to review all the generator changes at the end anyway.

And yes, suspending calls are changed to lose the Coroutine type, but it's still going to be around for various types. This seems difficult to manage because with type inference it could appear in very many places, so mapping it away would be quite troublesome. It's easier if all generators respect it in places where it matters.

Perhaps we can change follow_with_abstracts to also expand the coroutine type. This is going to require some refactoring though because we need to get our hands on tvoid somehow.

Simn avatar Feb 15 '24 12:02 Simn

I've made some changes to the output I talked about in this comment. First of all, here's the mandatory dot-graph of the initial transformation:

[image: dot graph of the initial transformation]

The second half of the converter now does two things:

  1. It skips next-blocks if a sub-block terminates (the 1 -> 0 edge).
  2. It follows through empty blocks which would only change the state and loop (blocks 4 and 6).

I have also inverted the do try to try do because in my mind that's more performant. We re-enter the loop from the exception state by recursing upon the function again.

The output now looks like this:

function(args:Array<mapCalls.TArg>, f:haxe.coro.Coroutine<mapCalls.TArg -> mapCalls.TRet>, _hx_continuation:(Array<mapCalls.TRet>, Dynamic) -> Void) {
        var ` = null;
        var ` = 0;
        var _hx_state = 2;
        var _hx_exceptionState = 10;
        var _hx_stateMachine;
        _hx_stateMachine = function(_hx_result:Dynamic, _hx_error:Dynamic) {
                if (_hx_error != null) _hx_state = _hx_exceptionState;
                try do (switch (_hx_state) {
                        case 2: {
                                _hx_state = 3;
                                `;
                                ` = [];
                        };
                        case 3: {
                                _hx_state = 5;
                                `;
                                ` = 0;
                        };
                        case 5: {
                                if (! (` < args.length)) _hx_state = 9 else _hx_state = 7;
                        };
                        case 7: {
                                _hx_state = 8;
                                var arg;
                                arg = args[`];
                                ++ `;
                                f(arg, _hx_stateMachine)(null, null);
                                return;
                        };
                        case 8: {
                                _hx_state = 5;
                                `.push(cast _hx_result);
                        };
                        case 9: {
                                _hx_state = -1;
                                _hx_continuation(`, null);
                                return;
                        };
                        case 10: {
                                throw _hx_error;
                        };
                        default: {
                                _hx_state = 10;
                                throw "Invalid coroutine state";
                        }
                }) while(true) catch (e:Dynamic) if (_hx_state == 10) {
                        _hx_exceptionState = 10;
                        _hx_continuation(null, e);
                        return;
                } else {
                        _hx_stateMachine(_hx_result, e);
                };
        };
        return _hx_stateMachine;
  }

And after the analyzer cleans it up a bit, the JS output is this:

function Main_mapCalls(args,f,_hx_continuation) {
	var _g = null;
	var _g1 = 0;
	var _hx_state = 2;
	var _hx_exceptionState = 10;
	var _hx_stateMachine = function(_hx_result,_hx_error) {
		if(_hx_error != null) {
			_hx_state = _hx_exceptionState;
		}
		try {
			while(true) switch(_hx_state) {
			case 2:
				_hx_state = 3;
				_g = [];
				break;
			case 3:
				_hx_state = 5;
				_g1 = 0;
				break;
			case 5:
				_hx_state = _g1 >= args.length ? 9 : 7;
				break;
			case 7:
				_hx_state = 8;
				var arg = args[_g1];
				_g1 += 1;
				(f(arg,_hx_stateMachine))(null,null);
				return;
			case 8:
				_hx_state = 5;
				_g.push(_hx_result);
				break;
			case 9:
				_hx_state = -1;
				_hx_continuation(_g,null);
				return;
			case 10:
				throw haxe_Exception.thrown(_hx_error);
			default:
				_hx_state = 10;
				throw haxe_Exception.thrown("Invalid coroutine state");
			}
		} catch( _g2 ) {
			var _g3 = haxe_Exception.caught(_g2).unwrap();
			if(_hx_state == 10) {
				_hx_exceptionState = 10;
				_hx_continuation(null,_g3);
				return;
			} else {
				_hx_stateMachine(_hx_result,_g3);
			}
		}
	};
	return _hx_stateMachine;
}

Simn avatar Feb 15 '24 14:02 Simn

I've been misunderstanding the exception problem this whole time because there were test classes which printed wrong names, which made me think that some instanceof checks were failing. The actual problem was that we didn't set our rethrow state before rethrowing an exception in case none of our catches match, which meant that we would rethrow exceptions eternally.

The test for testTryCatch_nested still fails. I'll have to figure out how this is supposed to work because setting the state to rethrow is not correct in such cases. I think there's already some sort of stack mechanism with exc_state_id_getter, but calling that doesn't give me the correct state either. This entire _hx_exceptionState business is giving me a headache.

Simn avatar Feb 16 '24 06:02 Simn

After giving it some thought I've come up with a slightly different approach to handling exceptions which I'll write down here before I start implementing it. Working with this as an example:

function tryCatch_nested(throwValue:Dynamic) {
	return sequence(@:coroutine.debug yield -> {
		dummy += '1';
		try {
			try {
				dummy += '2';
				throw throwValue;
				dummy += '3';
			} catch (e:Int) {
				dummy += '4';
				yield(10);
				dummy += '5';
			}
			dummy += '6';
		} catch (e:Dynamic) {
			dummy += '7';
			yield(20);
			dummy += '8';
		}
		dummy += '9';
	});
}

This is the CFG where I've added some "catch" edges:

[image: CFG of tryCatch_nested with added catch edges]

We can do this:

  1. Each block gets associated with zero or one catch blocks, which is the innermost (set of) catch that might affect it. In the example, 7 is the catch block of 9, 3 is the catch block of 5, 6, 8 and (importantly) 7, and the other blocks have no catch block.
  2. Instead of having _hx_exceptionState, we generate a switch-expression in our big catch:Dynamic which looks something like this for the given example:
	switch (_hx_state) {
		case 9:
			_hx_state = 7;
			_hx_stateMachine(_hx_result, _hx_error);
		case 5 | 6 | 7 | 8:
			_hx_state = 3;
			_hx_stateMachine(_hx_result, _hx_error);
		case _:
			_hx_state = 11; // uncaught exception state
			_hx_continuation(null, _hx_error);
			return;
	}
  3. The logic for the catch blocks still does its instanceof checks. If there's no catch-all, and a catch block exists, it sets _hx_state = catch_block and just loops. If there is no catch block, it sets _hx_state to the uncaught exception state and loops as well.

This should cover all cases. I'm ignoring the case of multiple catch clauses because in the logic this will just become an additional else if. We should probably generate this block in the initial transformation already though, so that the back-transformer doesn't have to fiddle with block IDs at all.
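The block-to-catch-block association from the list above can be tried out in isolation. This is only a sketch of the lookup, using the state numbers from the example; catchStateFor and UNCAUGHT are hypothetical names, and none of the actual state machine is modelled.

```haxe
class Main {
	// Hypothetical constant for the uncaught exception state of the example.
	static inline var UNCAUGHT = 11;

	// Maps a state to the state of its innermost catch block,
	// using the block numbers from the example CFG.
	public static function catchStateFor(state:Int):Int {
		return switch state {
			case 9: 7;
			case 5 | 6 | 7 | 8: 3;
			case _: UNCAUGHT;
		}
	}

	static function main() {
		// Follow the chain for an exception raised in block 9, assuming
		// every catch block along the way fails in turn.
		var state = 9;
		var path = [state];
		while (state != UNCAUGHT) {
			state = catchStateFor(state);
			path.push(state);
		}
		trace(path); // visits 9, 7, 3, then 11
	}
}
```

Block 7 mapping to 3 is the "importantly" part of the first list item: an exception thrown inside the inner catch body has to land in the outer catch.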

Simn avatar Feb 16 '24 07:02 Simn

Well, that worked:

function(yield:Yield<Int>, _hx_continuation:(Dynamic, Dynamic) -> Void) {
        var _hx_state = 1;
        var _hx_stateMachine;
        _hx_stateMachine = function(_hx_result:Dynamic, _hx_error:Dynamic) {
                if (_hx_error != null) _hx_state = 12;
                do (try switch (_hx_state) {
                        case 1: {
                                _hx_state = 6;
                                _Main.Main_Fields_.dummy += "1";
                        };
                        case 2: {
                                _hx_state = -1;
                                _Main.Main_Fields_.dummy += "9";
                                _hx_continuation(null, null);
                                return;
                        };
                        case 3: {
                                _hx_state = 4;
                                var e = _hx_error;
                                _Main.Main_Fields_.dummy += "7";
                                yield(20, _hx_stateMachine)(null, null);
                                return;
                        };
                        case 4: {
                                _hx_state = 2;
                                cast _hx_result;
                                _Main.Main_Fields_.dummy += "8";
                        };
                        case 5: {
                                _hx_state = 3;
                        };
                        case 6: {
                                _hx_state = 11;
                        };
                        case 7: {
                                _hx_state = 2;
                                _Main.Main_Fields_.dummy += "6";
                        };
                        case 8: {
                                _hx_state = 9;
                                var e = _hx_error;
                                _Main.Main_Fields_.dummy += "4";
                                yield(10, _hx_stateMachine)(null, null);
                                return;
                        };
                        case 9: {
                                _hx_state = 7;
                                cast _hx_result;
                                _Main.Main_Fields_.dummy += "5";
                        };
                        case 10: {
                                if (Std.isOfType(_hx_error, Int)) _hx_state = 8 else {
                                        _hx_state = 5;
                                };
                        };
                        case 11: {
                                _Main.Main_Fields_.dummy += "2";
                                throw throwValue;
                        };
                        case 12: {
                                throw _hx_error;
                        };
                        default: {
                                _hx_state = 12;
                                throw "Invalid coroutine state";
                        }
                } catch (e:Dynamic) switch (_hx_state) {
                        case 6, 8, 9, 7: {
                                _hx_state = 5;
                                _hx_error = e;
                        };
                        case 11: {
                                _hx_state = 10;
                                _hx_error = e;
                        };
                        default: {
                                _hx_state = 12;
                                _hx_continuation(null, e);
                                return;
                        }
                }) while(true);
        };
        return _hx_stateMachine;
  }

It generates way too many states now, but I don't want to deal with that until we have loads of tests. That should be easy to improve on by doing clever graph things.

I also had to go back to do try because we can't recurse on ourselves with a set _hx_error value after changing state, because the initial check would just set the state to the uncaught exception state again. It would be nice to do this a bit cleaner, but I don't yet know how to achieve that without goto-support.

Simn avatar Feb 16 '24 09:02 Simn

Here's another fun one for you. I came across this yesterday while messing around with a yield iterable backed by the asys batched directory reading. I've since been able to reproduce this without asys on cpp and jvm.

With the below sample you'd expect to see "foo", "bar", and then "done" printed in the console but instead a null pointer error occurs.

import sys.thread.Thread;
import haxe.coro.Coroutine;
import yield.Yield;

class CoroDirectory {
    var sent : Bool;

    function new() {
        sent = false;
    }

    @:coroutine public static function open():CoroDirectory {
        return Coroutine.suspend(cont -> {
            Thread.current().events.run(() -> {
                cont(new CoroDirectory(), null);
            });
        });
    }

    @:coroutine public function next():Array<String> {
        return Coroutine.suspend(cont -> {
            if (sent) {
                Thread.current().events.run(() -> cont([], null));
            } else {
                sent = true;
                
                Thread.current().events.run(() -> cont([ "foo", "bar" ], null));
            }
        });
    }
}

@:build(yield.YieldMacro.build())
class Main {
    @:yield static function read_dir():Iterator<String> {
        final dir = CoroDirectory.open();

        var entries = [];
        do {
            entries = dir.next();
    
            for (entry in entries) {
                @:yield return entry;
            }
        }
        while (entries.length > 0);
    }
    
    @:coroutine static function start() {
        for (entry in read_dir()) {
            trace(entry);
        }
    }
    
    static function main() {
        start.start((result, error) -> {
            switch error {
                case null:
                    trace('done');
                case exn:
                    trace('error $exn');
            }
        });
    }
}

What I've figured out so far is that in case 2 of read_dir's state machine the result argument is null so dir is always null, hence the exception.

[image: read_dir state machine, case 2, with a null result]

However, the open function callback never occurs. It seems to go straight from issuing the event loop delayed continuation call to state 2, which is why it's never assigned.

The deferred nature of things with Thread.current().events.run also appears to be important here. If you replace those with just the cont calls so everything happens immediately, then it works fine and you see "foo", "bar", "done".

class CoroDirectory {
    var sent : Bool;

    function new() {
        sent = false;
    }

    @:coroutine public static function open():CoroDirectory {
        return Coroutine.suspend(cont -> {
            cont(new CoroDirectory(), null);
        });
    }

    @:coroutine public function next():Array<String> {
        return Coroutine.suspend(cont -> {
            if (sent) {
                cont([], null);
            } else {
                sent = true;
                
                cont([ "foo", "bar" ], null);
            }
        });
    }
}

Non-deferred CoroDirectory, which works fine.

Aidan63 avatar Feb 16 '24 17:02 Aidan63

Thanks to @kLabz we now run our server tests using coroutines! For now there's a custom utest branch which we'll contribute to the main repository once everything has stabilized. This will be a good additional set of tests for when I start ~breaking~ optimizing everything.

@Aidan63 That looks like a headache... My only immediate idea is that a calling state machine might not return after a suspending call, which would then continue to execute. I'll have to look at this in detail.

Simn avatar Feb 16 '24 19:02 Simn

Having stepped through things a bit more in VS, I think the culprit is in the start function and its for loop over the generator. As you say, it's not actually waiting for any suspending calls and just immediately starts executing the next state's code, which is how it ends up calling next on dir, which has yet to be assigned by the deferred event loop function.

Aidan63 avatar Feb 16 '24 21:02 Aidan63

At least part of the problem here is that read_dir isn't actually a @:coroutine, so there's never any suspension. But even with that it's still failing elsewhere.

Simn avatar Feb 16 '24 21:02 Simn

I think this comes down to us not understanding how Coroutine.suspend is actually supposed to work. From the Kotlin design document:

When suspending function suspends coroutine, it returns a special marker value of COROUTINE_SUSPENDED (see more in coroutine intrinsics section). When a suspending function does not suspend the current coroutine but continues the coroutine's execution, it returns its result or throws an exception directly.

The actual implementation of the suspending function is not allowed to invoke the continuation in its stack frame directly because that may lead to stack overflow on long-running coroutines. The suspendCoroutine function in the standard library hides this complexity from an application developer by tracking invocations of continuations and ensures conformance to the actual implementation contract of the suspending functions regardless of how and when the continuation is invoked.

We don't do anything like that COROUTINE_SUSPENDED thing, and we also simply call the continuation on the current stack instead of doing something fancy. I don't know if this is deliberate design or an oversight, and nadako probably forgot at this point.
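To make the Kotlin contract quoted above a bit more concrete, here is a minimal sketch in plain Haxe (no @:coroutine involved). It is not how Coroutine.suspend works today; SafeSuspend, Cont and the SuspensionResult enum are hypothetical names used only for illustration. The wrapper tracks whether the continuation was invoked before the block returned: if so, it hands the outcome back as a return value instead of calling the continuation on the current stack, otherwise it reports Suspended and forwards the later call.

```haxe
// Hypothetical names throughout; this only illustrates the idea of tracking continuation invocations.
typedef Cont<T> = (T, Dynamic) -> Void;

enum SuspensionResult<T> {
	Suspended;
	Result(v:T);
	Error(e:Dynamic);
}

class SafeSuspend {
	public static function suspend<T>(block:Cont<T>->Void, cont:Cont<T>):SuspensionResult<T> {
		var returned = false; // becomes true once `block` has returned to us
		var early:Null<SuspensionResult<T>> = null; // outcome delivered before `block` returned
		block((value, error) -> {
			if (returned) {
				// Deferred resume: the caller already saw `Suspended`, so forward the outcome.
				cont(value, error);
			} else if (early == null) {
				// Synchronous resume: record the outcome instead of re-entering the caller.
				early = error != null ? Error(error) : Result(value);
			}
		});
		returned = true;
		return early != null ? early : Suspended;
	}
}

class Main {
	static function main() {
		final pending:Array<() -> Void> = []; // stand-in for an event loop

		// The continuation is invoked inside the block, so the outcome comes back as a value.
		final immediate = SafeSuspend.suspend((c:Cont<String>) -> c("now", null), (v, e) -> trace("not reached"));
		trace(immediate);

		// The continuation is invoked later, so the caller sees `Suspended` first.
		final deferred = SafeSuspend.suspend((c:Cont<String>) -> {
			pending.push(() -> c("later", null));
		}, (v, e) -> trace("resumed with " + v));
		trace(deferred);

		for (job in pending)
			job();
	}
}
```

The point of the sketch is only that the caller can tell the two cases apart from the return value, which is what the state machines currently have no way of doing.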

Simn avatar Feb 17 '24 07:02 Simn

Here's a simplified version with some nostalgic trace debugging:

import sys.thread.Thread;
import haxe.coro.Coroutine;
import yield.Yield;

class CoroDirectory {
	function new() {}

	// @:coroutine public static function open():CoroDirectory {
	// 	trace("calling Coroutine.suspend");
	// 	return Coroutine.suspend(cont -> {
	// 		trace("running continuation");
	// 		Thread.current().events.run(() -> {
	// 			trace("BEGIN", "cont(new CoroDirectory(), null)");
	// 			cont(new CoroDirectory(), null);
	// 			trace("  END", "cont(new CoroDirectory(), null)");
	// 		});
	// 	});
	// }

	@:coroutine public static function open():CoroDirectory {
		trace("calling Coroutine.suspend");
		return Coroutine.suspend(cont -> {
			trace("running continuation");
			trace("BEGIN", "cont(new CoroDirectory(), null)");
			cont(new CoroDirectory(), null);
			trace("  END", "cont(new CoroDirectory(), null)");
		});
	}
}

class Main {
	@:coroutine static function read_dir():Iterator<String> {
		trace("read_dir");
		return sequence(yield -> {
			trace("BEGIN", "CoroDirectory.open()");
			final dir = CoroDirectory.open();
			trace("  END", "CoroDirectory.open()", dir);
			yield("" + dir);
		});
	}

	@:coroutine
	static function start() {
		trace("start");
		for (entry in read_dir()) {
			trace("returning", entry);
			return entry;
		}
		return "oops";
	}

	static function main() {
		start.start((result, error) -> {
			trace(result, error);
		});
	}
}

The version without Thread gives us this, which looks perfectly normal:

source/Main.hx:44: start
source/Main.hx:33: read_dir
source/Main.hx:35: BEGIN, CoroDirectory.open()
source/Main.hx:21: calling Coroutine.suspend
source/Main.hx:23: running continuation
source/Main.hx:24: BEGIN, cont(new CoroDirectory(), null)
source/Main.hx:37:   END, CoroDirectory.open(), haxe.root.CoroDirectory@506e1b77
source/Main.hx:26:   END, cont(new CoroDirectory(), null)
source/Main.hx:46: returning, haxe.root.CoroDirectory@506e1b77
source/Main.hx:54: haxe.root.CoroDirectory@506e1b77, null

The version with Thread:

source/Main.hx:44: start
source/Main.hx:33: read_dir
source/Main.hx:35: BEGIN, CoroDirectory.open()
source/Main.hx:9: calling Coroutine.suspend
source/Main.hx:11: running continuation
source/Main.hx:37:   END, CoroDirectory.open(), null
source/Main.hx:46: returning, null
source/Main.hx:54: null, null
source/Main.hx:13: BEGIN, cont(new CoroDirectory(), null)
source/Main.hx:15:   END, cont(new CoroDirectory(), null)

It's the same up to and including "running continuation", but then it returns. Which is not actually surprising because that Thread.current().events.run call just delays the callback and continues execution on the current stack. I can't even tell if this code is supposed to work like that, but if it should then surely Coroutine.suspend has to do something different. I don't know what though.
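The same effect can be reproduced without coroutines or threads at all. In the sketch below, DeferDemo and its queue are hypothetical stand-ins for the event loop, not the real Thread API; the only point is that a deferred callback runs after the caller has already moved on, so the caller reads null.

```haxe
class DeferDemo {
	// Hypothetical stand-in for an event loop: jobs are queued now and run on `drain()`.
	public static final queue:Array<() -> Void> = [];
	public static final log:Array<String> = [];

	// Callback-style "open" that delivers its value either immediately or via the queue.
	static function open(deferred:Bool, cont:String->Void):Void {
		if (deferred) {
			queue.push(() -> cont("dir"));
		} else {
			cont("dir");
		}
	}

	// Mirrors a caller that keeps executing right after the "suspending" call.
	public static function run(deferred:Bool):Null<String> {
		var dir:Null<String> = null;
		open(deferred, d -> {
			dir = d;
			log.push("cont called");
		});
		log.push("after open, dir=" + dir);
		return dir;
	}

	public static function drain():Void {
		while (queue.length > 0) {
			final job = queue.shift();
			if (job != null)
				job();
		}
	}
}

class Main {
	static function main() {
		trace(DeferDemo.run(false)); // callback ran before `run` returned
		trace(DeferDemo.run(true)); // callback has not run yet
		DeferDemo.drain(); // only now does the deferred callback fire
		trace(DeferDemo.log);
	}
}
```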

Simn avatar Feb 17 '24 08:02 Simn

Been messing around with modifying some of the coroutine dumps and comparing them to the decompiled output of Kotlin. For those suspension markers, would the state machine functions need to be updated to return a value? As a quick bodge, maybe something like this.

enum SuspensionResult<T> {
    Suspended;
    Result(v:T);
    Error(e:Dynamic);
}

and a transformed coroutine function might look like this, where after the openDirectory call it returns Suspended.

public static function open(_hx_continuation:(CoroDirectory, Dynamic) -> Void) {
    var _hx_state = [1];
    var _hx_stateMachine = [];
    _hx_stateMachine[0] = function(_hx_result:Dynamic, _hx_error:Dynamic):SuspensionResult<CoroDirectory> {
        if ((_hx_error != null))
            _hx_state[0] = 3;
        while ((true)) {
            try {
                switch ((_hx_state[0])) {
                    case 1:
                        {
                            _hx_state[0] = 2;
                            asys.native.filesystem.FileSystem.openDirectory("C:\\Users\\AidanLee", 4, (dir, error) -> {
                                trace("opened");
                                if ((error == null)) {
                                    _hx_stateMachine[0](new CoroDirectory(dir), null);
                                } else {
                                    _hx_stateMachine[0](null, error);
                                }
                            });
                            return Suspended;
                        };

                        // Rest of function omitted for brevity
                }
            }
        }
    };
    return _hx_stateMachine[0];
}

Anything calling a coroutine function would check the result and, if it's Suspended, return that value immediately; otherwise it would set the result or error variables.

function(_hx_result:Dynamic, _hx_error:Dynamic):SuspensionResult<Dynamic> {
    if ((_hx_error != null))
        _hx_state[0] = 3;
    while ((true))
        try {
            switch ((_hx_state[0])) {
                case 1:
                    {
                        _hx_state[0] = 2;
                        switch CoroDirectory.open(_hx_stateMachine[0])(null, null) {
                            case Suspended:
                                return Suspended;
                            case Result(v):
                                _hx_result = v;
                            case Error(e):
                                _hx_error = e;
                        }
                    };
                case 2:
                    {
                        dir[0] = cast _hx_result;
                        _hx_state[0] = -1;
                        trace(dir[0]);
                        trace("world");
                        return Result(null);
                    };
                case 3:
                    throw _hx_error;
                default:
                    {
                        _hx_state[0] = 3;
                        throw "Invalid coroutine state";
                    }
            };
        } catch (_g:Dynamic) {
            _hx_state[0] = 3;
            return Error(_g);
        };
};

For non-suspending cases, the while (true) loop and setting the result or error allow the next state to execute immediately without growing the stack.
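A hand-written sketch of that loop, stripped of the generated naming, might look like the following. Summer, fetch and the queue are hypothetical and exist only for this example; the enum is the one proposed above. Immediate results are fed back into the loop, while a Suspended result makes the function return and wait to be re-entered.

```haxe
enum SuspensionResult<T> {
	Suspended;
	Result(v:T);
	Error(e:Dynamic);
}

class Summer {
	// Hypothetical stand-in for an event loop.
	public static final queue:Array<() -> Void> = [];

	// Hypothetical suspending callee: defers every `every`-th call, answers the rest immediately.
	static function fetch(i:Int, every:Int, resume:Int->Void):SuspensionResult<Int> {
		if (i % every == 0) {
			queue.push(() -> resume(i));
			return Suspended;
		}
		return Result(i);
	}

	// Sums 1..n by asking `fetch` for each number, then reports the total to `done`.
	public static function sum(n:Int, every:Int, done:Int->Void):Void {
		var i = 0;
		var total = 0;
		function resume(value:Int):Void {
			var result = value;
			while (true) {
				total += result;
				i++;
				if (i > n) {
					done(total);
					return;
				}
				switch fetch(i, every, resume) {
					case Suspended:
						return; // the queued job re-enters `resume` later
					case Result(v):
						result = v; // immediate result: loop again instead of recursing
					case Error(e):
						throw e;
				}
			}
		}
		resume(0);
	}

	public static function drain():Void {
		while (queue.length > 0) {
			final job = queue.shift();
			if (job != null)
				job();
		}
	}
}

class Main {
	static function main() {
		// `every` is larger than `n`, so nothing suspends and all steps run in one loop.
		Summer.sum(10000, 10001, total -> trace(total));
		// Every second step suspends, so the total only arrives once the queue is drained.
		Summer.sum(5, 2, total -> trace(total));
		Summer.drain();
	}
}
```

In the first call none of the 10000 steps adds a stack frame, because each immediate Result is consumed by the loop rather than by a nested call to resume.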

I still haven't grokked the suspend function. My initial guess was that the compiler could transform something like this

@:coroutine function open(path:String):CoroDirectory {
    return Coroutine.suspend(cont -> {
        FileSystem.openDirectory(path, 4, (dir, error) -> {
            switch error {
                case null:
                    cont(new CoroDirectory(dir), null);
                case exn:
                    cont(null, exn);
            }
        });
    });
}

into something like this

function open(path:String, _hx_continuation:(CoroDirectory, Dynamic) -> Void) {
    final _hx_result = [null];
    final _hx_error = null;

    return function(_hx_result, _hx_error) {
        FileSystem.openDirectory(path, 4, (dir, error) -> {
            switch error {
                case null:
                    _hx_continuation(_hx_result[0] = new CoroDirectory(dir), null);
                case exn:
                    _hx_continuation(null, _hx_error = exn);
            }
        });

        if (_hx_error != null) {
            return Error(_hx_error);
        }
        if (_hx_result[0] != null) {
            return Result(_hx_result[0]);
        }

        return Suspended;
    }
}

While this would appear to work for actual deferred functions, I'm not sure it does for immediate ones. If openDirectory doesn't actually do any work asynchronously, it would immediately start execution of the next state, and then at some point in the future execution would unwind to where it returns the result or error value, which doesn't seem right? I probably need to look at Kotlin's output some more to figure it out.

Aidan63 avatar Feb 18 '24 14:02 Aidan63

enum SuspensionResult<T> { Suspended; Result(v:T); Error(e:Dynamic); }

I had a very similar idea, except that I was thinking we could generalize _hx_error to carry this kind of "control state" and then hold the value in _hx_result even for the error state. Something along the lines of this in the place we're currently checking if (_hx_error != null) at the coro start:

		switch (_hx_error) {
			case Result:
				// Continue normally
			case Error:
				// set _hx_state to uncaught error state, treat _hx_result as error value
			case Suspended:
				// Call _hx_continuation (I think?) and return
		}

The main advantage is that we don't need to wrap any values, the combination of _hx_result as the value carrier and _hx_error (with a different name) as the control flag would give us all the data we need.
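A rough sketch of that flag-plus-value shape, in plain Haxe: Control, Machine and the state numbers below are made up for illustration, and the Suspended branch simply returns, which is an assumption since what it should actually do is still open.

```haxe
// Hypothetical control flag that would replace `_hx_error`; the value always travels in `result`.
enum abstract Control(Int) {
	var Result;
	var Error;
	var Suspended;
}

class Machine {
	public var state = 0;
	public final log:Array<String> = [];

	public function new() {}

	public function resume(result:Dynamic, control:Control):Void {
		switch control {
			case Result:
			// continue normally
			case Error:
				state = 3; // uncaught error state; `result` carries the error value
			case Suspended:
				return; // assumption: nothing to do until somebody resumes us
		}
		switch state {
			case 0:
				state = 1;
				log.push("started with " + result);
			case 3:
				state = -1;
				log.push("failed with " + result);
			default:
				log.push("finished");
		}
	}
}

class Main {
	static function main() {
		final m = new Machine();
		m.resume("a", Control.Result); // normal value, nothing wrapped
		m.resume(null, Control.Suspended); // ignored by this sketch
		m.resume("boom", Control.Error); // same `result` slot, different flag
		trace(m.log);
	}
}
```

Nothing gets boxed here: the value passes through untouched and the flag alone decides how it is interpreted.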

The one thing I absolutely cannot figure out is how we avoid leaking this kind of stuff to the user code, i.e. what the entry point start call would look like. It can't just call its continuation because then it could receive the Suspended state, no matter how we decide to represent that.

Simn avatar Feb 18 '24 14:02 Simn