No means for securely stopping async generators
Let's assume a very typical reactive programming use case: say, I need to combine a few streams into a single one, yielding a value each time any of the original streams yields a new value.
For example, I use this for UI code, converting DOM events into an async iterable with a function subject (I don't include its source here, as it is not significant for the problem - it may be checked in this gist). The function returns an object with the AsyncIterable interface and an additional method send for passing events into the stream.
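For readers who don't want to open the gist, here is a rough sketch of what such a subject could look like (an assumption for illustration only; the real implementation is in the gist, and this version supports just a single pending consumer, which is enough for the examples below):
function subject() {
  const pending = []   // values sent before anyone asked for them
  let consumer = null  // resolver of a pending next(), if any
  return {
    send(value) {
      if (consumer) { consumer({done: false, value}); consumer = null }
      else pending.push(value)
    },
    [Symbol.asyncIterator]() { return this },
    next() {
      if (pending.length)
        return Promise.resolve({done: false, value: pending.shift()})
      return new Promise(resolve => { consumer = resolve })
    },
    return() {
      if (consumer) { consumer({done: true}); consumer = null }
      return Promise.resolve({done: true})
    }
  }
}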
And there is a function combine that simply merges all the async iterables given as arguments into a single one.
async function* combine(...args) {
  const threads = args.map(i => i[Symbol.asyncIterator]())
  const sparks = new Set(threads.map(i => ({thread: i, step: i.next()})))
  try {
    while (sparks.size) {
      // race all pending steps, tagging each result with its originating spark
      const v = await Promise.race([...sparks]
        .map(i => i.step.then(({done, value}) => ({done, value, spark: i}))))
      sparks.delete(v.spark)
      if (!v.done) {
        sparks.add({...v.spark, step: v.spark.thread.next()})
        yield v.value
      }
    }
  } finally {
    // close all the source iterators on exit
    await Promise.all([...threads].map(i => i.return()))
  }
}
And now I iterate over two such sources combined, exiting the loop on some condition.
const subj1 = subject()
const subj2 = subject()

async function test() {
  let cnt = 0
  for await (const i of combine(subj1, subj2)) {
    console.log("value:", i)
    if (cnt++)
      break
  }
}
//.....
subj1.send(1)
subj2.send(2)
Everything works fine - I added a console.log to the exit cleanup in subject to verify that all loops handle the subjects' exit. But now, if I wrap one of the sources in any simple async generator (or just a yield*), such as:
async function* copy(input) {
  for await (const i of input)
    yield i
}
async function test() {
  let cnt = 0
  for await (const i of combine(subj1, copy(subj2))) {
    console.log("value:", i)
    if (cnt++)
      break
  }
}
Now test never exits (the continuation of next is never invoked); it waits on the await in combine's finally block. The reason is simple and obvious: the copy generator is blocked in next, and async generators use the same internal request queue for return.
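The queuing behavior can be seen in isolation with a tiny standalone demonstration (mine, for illustration): the return request is parked behind the pending next request and never settles either.
async function* blocked() {
  await new Promise(() => {}) // never resolves
  yield 1
}

const g = blocked()
g.next().then(() => console.log("next settled"))     // never logs
g.return().then(() => console.log("return settled")) // never logs either:
// the return request is queued behind the stuck next request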
I searched the issues in this repository and found these tickets: #5 and #55. No clear reasons are given for the single-queue decision, except probably:
writer.next(bufferA);
writer.next(bufferB);
await writer.return(); // Only wait for the file to close
This would perhaps be better done this way:
await Promise.all([writer.next(bufferA), writer.next(bufferB)]);
await writer.return(); // Only wait for the file to close
And how would we convert an async generator into an observable? For example, the solution from #20 won't work:
function toObservable(asyncIterable) {
  return new Observable(observer => {
    let stop = false;
    async function drain() {
      for await (let x of asyncIterable) {
        if (stop) break;
        observer.next(x);
      }
    }
    drain().then(x => observer.complete(x), err => observer.error(err));
    return _ => { stop = true; };
  });
}
The cleanup function won't stop the source generator (even if we add a return call on the object returned by drain): if no more values are sent by the original iterable, it stays locked in next forever.
Current status
There is nothing this proposal can do to fix this. It will be fixed automatically once some form of async cancelation operation is added as part of another proposal. This issue is left open as a warning about the problem.
Workarounds
Using Transducers (lifted generators)
Transducers are functions taking a stream as input and producing another stream.
Generators can read their input passed as an argument, and that is where we can send a specific stop signal if needed (a sketch follows below).
See more detailed descriptions here:
- Decouple Business Logic using Async Generators
- Async Generators as an alternative to State Management
There is a fork function which merges streams and stops correctly, but it merges lifted streams - transducers.
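A minimal sketch of the idea (STOP and takeUntilStop are illustrative names, not the actual library API): an in-band stop value lets the generator terminate from inside its own for-await loop, so its finally blocks run without going through return.
const STOP = Symbol("stop")

// a transducer: takes a stream, returns a stream; because it reads its
// input as an argument, the input can deliver an in-band stop signal
async function* takeUntilStop(input) {
  for await (const value of input) {
    if (value === STOP) return // ends the generator from the inside
    yield value
  }
}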
Transpiler
The @effectful/es transpiler adds lightweight cancelation support to async functions.
The promises returned by async functions transpiled by it are amended with M.cancelSymbol, a function which tries to cancel execution of that async function. It runs finally blocks and propagates the cancelation to the current await expression. It does nothing more (e.g. no propagation to child computations).
I'm not even sure what combineLatest would mean in a pull-based model, would you expect it to attempt to pull from all observables when it is being pulled from?
Let's assume a very typical reactive programming use case, say, I need to combine a few streams into a single one, yielding value each time any of original streams yields new value.
That is, that is not the directionality of async iterators - if next is called on the "combined" iterator, whom does it ask for the value? Does it consume values eagerly from the parents?
@benjamingr
I'm not even sure what combineLatest would mean in a pull-based model, would you expect it to attempt to pull from all observables when it is being pulled from?
Regardless of whether it is combineLatest or some other combination of more than one source stream, sooner or later we'll need something of the kind. And yes, I will need to pull from all of them when the caller pulls.
That is, that is not the directionality of async iterators - if next is called on the "combined" iterator, whom does it ask for the value? Does it consume values eagerly from the parents?
Not eagerly. When next is first called, it awaits a Promise.race of all the sources' next promises. Once the race resolves, its value becomes the result of the caller's next. The promises that didn't win the race are stored, and on the caller's following next they participate in the race again, along with the next next of the source that previously signaled.
And so, we cannot exit the loop from the caller, and the only problem here is the queue for next/throw/return execution. Say, we have something like this:
const src = {...}

async function* h1() {
  for await (const i of src) {
    yield some(i)
  }
}

async function* caller() {
  for await (const i of merge(h1(), somethingElse)) {
    if (someCondition)
      break
  }
}
and the simplified transpilation of for-await:
const src = {...}

async function* h1() {
  const loop = src[Symbol.asyncIterator]()
  try {
    for (let item; !(item = await loop.next()).done;) {
      yield some(item.value)
    }
  } finally {
    await loop.return()
  }
}

async function* caller() {
  const loop = merge(h1(), somethingElse)[Symbol.asyncIterator]()
  try {
    for (let item; !(item = await loop.next()).done;) {
      if (someCondition)
        break
    }
  } finally {
    await loop.return()
  }
}
Say the control encounters that break because somethingElse.next resolved in the Promise.race, while h1()'s next still sits pending (maybe it will never resolve).
If we didn't have the queue, the break in caller would immediately call return on merge. merge would call return on all of its arguments. That would invoke the return continuation in h1 (there is no need to cancel the currently pending next promise, only to mark its continuation as canceled). And the h1 finally block would call return on src's iterator. src is not a generator, and it may know how to handle this properly - say, it resolves the pending promise to some dummy value, and since the continuation is marked as canceled, everyone is happy.
With the queue, however, the return call on h1 may stay in the queue forever, leaking a few objects.
I have the same issue! I want to write an iterator adapter that makes an async iterator cancelable.
async function* iterateStream(stream) {
  let ended = false, error, wake = () => {};
  stream.on("readable", wake)
    .on("end", () => { ended = true; wake(); })
    .on("error", (err) => { error = err; wake(); });
  for (;;) {
    // drain everything currently buffered in the stream
    for (let chunk = stream.read(); chunk; chunk = stream.read()) {
      yield chunk;
    }
    if (error) { throw error; }
    if (ended) { return; }
    // sleep until the next "readable"/"end"/"error" event
    await new Promise(resolve => wake = resolve);
  }
}
async function* withTimeout(timeoutMs, iter) {
  let timer;
  try {
    const timeout = new Promise((_, reject) => {
      timer = setTimeout(reject, timeoutMs, new Error("Timeout!"));
    });
    for (;;) {
      const result = await Promise.race([timeout, iter.next()]);
      if (result.done) { break; }
      yield result.value;
    }
  } finally {
    clearTimeout(timer);
    if (iter.return) { await iter.return(); }
  }
}
const net = require("net");

async function main() {
  const socket = net.connect(80, "httpbin.org");
  try {
    for await (const val of withTimeout(1000, iterateStream(socket))) {
      console.log("[main] got value %s", val);
    }
  } finally {
    socket.end();
  }
}
This also gets stuck in await iter.return() and can only exit the loop when the socket closes. And because the socket is closed outside the loop, this results in a deadlock, which is only resolved when the socket times out...
It seems to me that
- An important use-case for async iteration involves iterators that can block for a significant amount of time, maybe even infinitely. Think reading from a socket, waiting for user input, etc.
- Async generators cannot be used for implementing async iterators in such cases, because if the generator is stuck in a long await there's no way anymore to break out of the loop, raise an exception, etc. Control flow has to wait until that await finishes...
If I understand all this correctly, this seems very unfortunate, since async generators otherwise would make implementing async iterators very easy!
There are a couple of things going on here.
First, I notice that in all the provided examples we are awaiting a never-resolving promise inside of a finally block. Consider what happens in the case of "regular" async functions:
async function f() {
  try {
    throw new Error('oops');
  } finally {
    console.log('finally f');
    // Never-resolving
    await new Promise(() => {});
  }
}

async function g() {
  try {
    await f();
  } finally {
    // This is never executed
    console.log('finally g');
  }
}

g();
The lesson here is: be careful about putting an await in a finally block, regardless of whether we're dealing with an async function or async generator function.
The other thing I see is that we are assuming that return (with finally) is sufficient for implementing async cancellation. Although it would be nice if that were the case, we need to think of return as cleanup, not cancellation.
As we see in these examples, the cancellation signal needs to propagate through async generator functions (and async functions), into their async subroutines. Because async/await linearizes async computations, we can't use return as a secondary, parallel message channel. We need another channel to propagate that cancellation signal.
The DOM has invented AbortController for this purpose. Although AbortController is a little too DOM-centric to standardize at the language level, I'm hopeful that we can move forward with a standard "cancel token" protocol that AbortController could implement.
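To illustrate the idea (sketch only: iterateWithSignal is not a proposed API, and a standard cancel-token protocol might look different), a consumer-side wrapper using a DOM-style AbortController could race the signal against the source:
async function* iterateWithSignal(iterable, signal) {
  const iterator = iterable[Symbol.asyncIterator]()
  const aborted = new Promise((_, reject) => {
    const abort = () => reject(new Error("canceled"))
    if (signal.aborted) abort()
    else signal.addEventListener("abort", abort, { once: true })
  })
  try {
    while (true) {
      // the race settles on abort even while iterator.next() stays pending
      const { done, value } = await Promise.race([iterator.next(), aborted])
      if (done) return value
      yield value
    }
  } finally {
    // note: if the source is a generator stuck in next, this return() is
    // still queued behind it - the signal only unblocks the consumer's side
    if (iterator.return) await iterator.return()
  }
}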
Reminds me of https://github.com/tc39/ecmascript-asyncawait/issues/51 when I pondered if finally in await should even be allowed.
@zenparsing
First, I notice that in all the provided examples we are awaiting a never-resolving promise inside of a finally block.
No - at least in my example, the finally block is never executed, and that is precisely the problem.
Consider what happens in the case of "regular" async functions: The lesson here is: be careful about putting an await in a finally block, regardless of whether we're dealing with an async function or async generator function.
I see absolutely no problem in this example: someone could just as well await a never-resolving promise in an ordinary block without finally, and control would never reach anything after it either.
The other thing I see is that we are assuming that return (with finally) is sufficient for implementing async cancellation.
No - as I highlighted in the message, Promise cancelation is not required, if that is what you mean by async cancellation.
Although it would be nice if that were the case, we need to think of return as cleanup, not a cancellation.
Let's keep it practical, without resorting to interpretations of philosophical terms. I might just as well call it "cleaning up the generator object".
As we see in these examples, the cancellation signal needs to propagate through async generator functions (and async functions), into their async subroutines. Because async/await linearizes async computations, we can't use return as a secondary, parallel message channel. We need another channel to propagate that cancellation signal.
I don't get this. What is not linear? Without the queue, everything is still kept linear within the generator's scope, and outside that scope everything is async as usual. Again, I see no problem. What is this other channel? Does "another channel" mean no async generators, and implementing the async iterator protocol by hand?
The DOM has invented AbortController for this purpose. Although AbortController is a little too DOM-centric to standardize at the language level, I'm hopeful that we can move forward with a standard "cancel token" protocol that AbortController could implement.
This won't work - there is no other channel, unless all async iterators interpret the token themselves in their own code. But that just means: no async generators. There is no way to handle any token while control is resting in an await.
Also, @zenparsing, in the first message I actually copied your implementation of converting an async generator to an observable - toObservable. It doesn't work, for the same reasons. Do you now confirm you were wrong, and that it should not be possible to convert async generators into observables?
@benjamingr
Reminds me of tc39/ecmascript-asyncawait#51 when I pondered if finally in await should even be allowed.
In my message that await in finally is a possible transpilation result, not actual usage of await inside finally. Though I don't see a problem even if it were actual usage. The finally block there is the solution, not the problem. It fails only because of the queue, and there are no obvious reasons for the queue.
I think the queue as it currently exists should remain, but I still think it would be worth reviving some form of syntactic cancellation mechanism to make this sort of thing easier (what is one to do with for-await-of loops, for example, if there's no syntactic mechanism?).
I don't really see how the cancellation proposal could solve this issue in its current form, unless it was decided that .next should be sent the cancellation token (in which case, to use async generators to write anything, we essentially require function.sent for the part before the first yield).
Even sending the cancellation token forward with .next, it still means for-await-of is unusable if you want to be able to cancel a request; e.g. you'd have to write it using iterators directly:
function map(asyncIterable, iteratee) {
  return {
    async* [Symbol.asyncIterator]() {
      const iterator = asyncIterable[Symbol.asyncIterator]()
      try {
        while (true) {
          // function.sent (proposal syntax): the value passed into .next
          const cancelToken = function.sent;
          const { done, value } = await iterator.next(cancelToken);
          if (done) { return value; }
          yield iteratee(value);
        }
      } finally {
        await iterator.return()
      }
    }
  }
}
This seems quite bad compared to the naive implementation of map that just loops through the items and yields iteratee(value) in a for-await-of loop. But frankly I don't see any other way it could work with the current cancellation proposal.
@Jamesernator
I think the queue as it currently exists should remain,
but why? I mean, I have of course read the motivation - e.g. running something in parallel with Promise.all, or dangling promises - but as @zenparsing wrote, async generators linearize their control flow. There is no way (and there shouldn't be) to run different parts of a single async generator simultaneously.
Of course, something implementing the async iterator protocol without being a generator could do this. But such a thing can handle the ordering itself, in whatever way it likes: it may be a queue, it may be a priority queue, it may drop some requests, etc.
It is also easy to write a wrapper adding a queue to any other async iterator, e.g. queue(something), if someone really needs it (see the sketch below). But as it is now, it is an unnecessary complication for nothing. And you propose to complicate it even more with some cancelation protocol, while cancelation isn't even required here. We do, of course, need to cancel the continuation after the await where the generator is suspended, but this effect is visible only inside that single generator. It is not related to cancelation of some async operation.
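A minimal sketch of such a wrapper (queue is the hypothetical name used above; this implementation is my assumption): it serializes next/return calls on an arbitrary async iterator, reproducing the generator's built-in queuing only where it is actually wanted.
function queue(asyncIterable) {
  const iterator = asyncIterable[Symbol.asyncIterator]()
  let last = Promise.resolve() // chain serializing all requests
  const enqueue = op => {
    const step = last.then(op, op)       // run after the previous one settles
    last = step.then(() => {}, () => {}) // keep the chain alive on errors
    return step
  }
  return {
    [Symbol.asyncIterator]() { return this },
    next: v => enqueue(() => iterator.next(v)),
    return: v => enqueue(() =>
      iterator.return ? iterator.return(v) : {done: true, value: v})
  }
}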
There is no way (and shouldn't be) to run different parts of a single async generator simultaneously.
That's not true for arbitrary async iterators though
I would love to see an implementation of combineLatest with the semantics proposed by @awto though - I looked at all my implementations from when I gave a talk and they suffer from this problem.
I haven't had proper time to think about it - but I definitely think it's worth solving, and the spec is worth amending if it can't be solved as-is.
@benjamingr
There is no way (and shouldn't be) to run different parts of a single async generator simultaneously.
That's not true for arbitrary async iterators though
indeed, but there is no queue for arbitrary async iterators anyway; the iterator itself can decide how to handle the requests
Yes, but it's precisely because other types of async iterators exist that generators need to queue. If a consumer requests two values eagerly and then invokes .return, an async iterator only knows that no more values will be requested - not that the existing nexts should be cancelled.
e.g. one might implement a preloading version of take that consumes values eagerly instead of as requested:
function eagerTakeN(asyncIterable, n) {
  return {
    async* [Symbol.asyncIterator]() {
      const iterator = asyncIterable[Symbol.asyncIterator]();
      // Preload values as fast as possible into a buffer of the given size
      const buffer = [];
      for (let i = 0; i < n; i++) {
        buffer.push(iterator.next());
      }
      // Allow the iterator to stop queuing now and clean up when *it*
      // wants to
      const closed = iterator.return();
      const results = await Promise.all(buffer);
      for (const { done, value } of results) {
        if (done) {
          return value;
        }
        yield value;
      }
      await closed;
    }
  };
}
Note that if you know the iterables are generators, it makes no difference to consume multiple values concurrently; but if it's an arbitrary iterable, it might be better to consume them earlier. .return says that no more values will be requested, but the requests already made should still be fulfilled. Because of this, the most sensible option for an async generator is simply to queue, as a consumer like eagerTakeN shouldn't need to know what type of iterator it has.
If you want to cancel the .next promises, then there should be a mechanism to say "I no longer care about those specifically". In the case of an async generator, though, I would definitely expect Promise cancellation applied to a promise returned from .next to close the generator.
@Jamesernator The same may be solved using an explicit queue wrapper. I would rather have merging work than have the implicit queue. Another advantage of an explicit queue is that we could use smarter, application-specific scheduling algorithms.
The thing is, cancellation allows you to have both, which is why I want to see a better cancellation proposal than just a built-in CancelToken type or the like.
What you're proposing effectively requires that await be a potential interruption point, which I think would be surprising: it gives a producer no assurance that work it performs between yields will ever complete.
e.g. consider this generator, which needs to free a lock immediately after producing a value:
async function* readFromDatabase() {
  const cursor = db.getCursor('someTable');
  while (true) {
    // This await could be interrupted
    const dbLock = await db.acquireLock();
    const value = cursor.nextValue();
    dbLock.unlock();
    yield value;
  }
}
But from what I can tell, under your proposed change, if it were cancelled via .return while stopped on db.acquireLock(), then db.acquireLock() would proceed as normal, but the generator would be terminated and the lock would never be freed, since the generator was torn down in the middle of an operation.
@Jamesernator
fair point - I don't really care much about the cancelation itself, I just wanted something simpler. Still, a solution to this problem is possible: it could be some db context through which we signal in finally that the lock is no longer required (sketched below). Or just implement the read logic in a separate non-generator function. Or, again, an explicit wrapper - queue(async function* readFromDatabase() {....}). It is indeed complex and unintuitive, but possible. Whereas merging generators isn't possible now at all.
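A sketch of the db-context variant (the entire db API here is hypothetical): the lock is tracked by a context object that finally can always release, even if the generator is torn down between acquiring and unlocking.
async function* readFromDatabase(db) {
  const cursor = db.getCursor("someTable")
  const locks = db.lockContext() // hypothetical: tracks the in-flight lock
  try {
    while (true) {
      await locks.acquire()
      const value = cursor.nextValue()
      locks.release()
      yield value
    }
  } finally {
    // releases the lock if it is held, or cancels a pending acquire, even
    // if the generator was interrupted between acquire() and release()
    locks.dispose()
  }
}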
Actually, the cancelation proposal will likely solve the problem sooner than anything can be changed in this spec, so I'm closing the issue. Thanks, everyone, especially @Jamesernator.
In case someone needs this urgently, you can use my transpiler - @effectful/es (sorry for the advertisement). In its abstract interface mode, you can extend the interface with any functionality you want, including cancelation, no queue, or anything else.
@awto what cancellation proposal? :D
@benjamingr the cancellation proposal - I suppose it is the thing that will solve this problem somehow in the future; there are not many details now, and it apparently won't happen soon. For my task, I'm working around it with my transpiler for now.
There are libraries emerging with (unsafe) iterator combinators, so I think it is worth keeping this issue open after all.
Maybe talk to the people from https://github.com/jamiemccrindle/axax @jamiemccrindle
Does anyone have a simple test case that demonstrates this issue?
@jamiemccrindle I would at least add a warning to your page. axax is still usable if the sources are managed, but as a user I would like to be aware of the possible problems, e.g.:
const s1 = new Subject()
const s2 = new Subject()

async function* something() {
  try {
    for await (const i of s2.iterator)
      yield i
  } finally {
    console.log("never called")
  }
}

async function test() {
  for await (const i of merge(s1.iterator, something())) {
    break
  }
}

test()
s1.onNext(1)
Here is a demonstration of the leak with merge: if s2 is never touched, the something instance will leak. It can be avoided if the program manages the references itself, but that isn't required with, say, Observables.
Thanks for the example @awto. I'll add a note to axax.
Would you say that this is comparable to a Promise that never resolves? Something like:
async function example() {
  await new Promise((resolve, reject) => {});
}

example().then(() => console.log('never gets here'));
I don't think so. There is still a way to finish the chain, e.g. by calling s2.onCompleted(), but there is no way to stop it from the other end, because the generator sits in next and there is the queue requirement, so return has to await next before it can exit. But the queue cannot simply be removed, because that would introduce other resource management problems (covered in this thread).
Thanks! I've added a note on the axax page.
Would you say that this is comparable to a Promise that never resolves.
It is. A consumer is waiting on the resolution of a promise for "next" but that promise is never resolved. In this case, it is because the "producer" has not called "s2.onNext". Be mindful of this when attempting to port Observable-style programming patterns to async iteration.
Actually, I think I misunderstood something about the example. I'll defer to @awto, who has a pretty good handle on this.