fix(asynciterable): use more `yield`
I went through the async-iterable functions and swapped out custom `while` loops for `yield` where I could. I also took the liberty of cleaning up some of the code while at it. ~~The benefit of `yield*` over custom code is that it takes care of returning the wrapped iterator, etc.~~
As a consequence of https://github.com/microsoft/TypeScript/issues/61022, the following pattern fails for operators (e.g. `concat`) implemented using `yield*` when targeting ES2017 or lower (it passes for higher targets):
```ts
test('yield*', async () => {
  let i = 0;
  async function* asyncGenerator() {
    i++;
    yield 1;
  }
  const res = concat(asyncGenerator(), asyncGenerator()).pipe(take(1));
  const items = await toArray(res);
  expect(i).toBe(1); // Actually is 2 because the loop continues
});
```
So even though `yield*` should be usable in most places, because of this I went with loops and `yield`.
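For illustration, the loops-and-`yield` shape for a two-source concat looks roughly like this (a sketch, not the actual `concat` implementation; `concatTwo` is a made-up name):

```ts
async function* concatTwo<T>(first: AsyncIterable<T>, second: AsyncIterable<T>) {
  for (const source of [first, second]) {
    // Loop and yield instead of `yield* source`: if the consumer stops early
    // (e.g. via take(1)), this generator stays suspended at the yield, so the
    // second source is never started and `i` in the test above stays at 1.
    for await (const value of source) {
      yield value;
    }
  }
}
```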
Something is strange.
If you run `yarn test -t src` locally, there will be one failure:

```
● AsyncIterable#finalize calls with downstream error from flattening

  expect(received).rejects.toThrow()

  Received promise resolved instead of rejected
  Resolved to value: {"done": false, "value": 2}
```
Maybe there's a race condition somewhere in `FlattenConcurrentAsyncIterable` (I haven't taken the time to understand its behemoth implementation). What confuses me is that if you then change the generator implementation in `FinallyAsyncIterable` as follows, the test passes.
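Roughly, the change swaps the first form below for the second (a free-standing sketch with an assumed source/action pair, not the exact `FinallyAsyncIterable` diff):

```ts
// Form A: delegate to the source with `yield*`.
async function* finalizeWithDelegate<T>(source: AsyncIterable<T>, action: () => void | Promise<void>) {
  try {
    yield* source;
  } finally {
    await action();
  }
}

// Form B: iterate with `for await` and re-yield each item.
async function* finalizeWithLoop<T>(source: AsyncIterable<T>, action: () => void | Promise<void>) {
  try {
    for await (const item of source) {
      yield item;
    }
  } finally {
    await action();
  }
}
```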
I thought those two syntaxes were equivalent for all we care about here, but one consistently fails and the other consistently passes.
Edit: Likely the TS issue.
I fixed the broken test by replacing `flatMap` with `concatMap`. Since output order is not defined when flattening asynchronous sources concurrently, that is not something the test should check.
Edit: Unless order is in fact well defined for promises that all resolve at the same time; I don't know. When `finalize` is instead implemented with `for await` + `yield`, it works like before. So perhaps the linked TS issue is at fault. I've reverted the test back to `flatMap`.
Wow, that TS issue is concerning. Is the addition of the `returnAsyncIterators` function related to that change? It looks to be materially different from the existing behavior, e.g. it doesn't await the `it.return()` promises.
The bug just means that we can't use `yield*` and must instead manually iterate and yield individual items. For all we care about here, both are functionally equivalent.
I'm not fully certain what the best play with returning iterators is, but (thinking out loud here) if you look at, for example, `timeout()`:
```ts
try {
  while (1) {
    const { type, value } = await safeRace<TimeoutOperation<TSource>>([
      it.next().then((val) => {
        return { type: VALUE_TYPE, value: val };
      }),
      sleep(this._dueTime, signal).then(() => {
        return { type: ERROR_TYPE };
      }),
    ]);
    if (type === ERROR_TYPE) {
      throw new TimeoutError();
    }
    if (!value || value.done) {
      break;
    }
    yield value.value;
  }
} finally {
  await it?.return?.();
}
```
What we want in `finally` is not necessarily to return the iterator, but to abort it instead. The following test should pass, but currently doesn't: it hangs, and Jest eventually kills the run.
```ts
test('AsyncIterable#timeout with never', async () => {
  const xs = never().pipe(timeout(1000));
  const it = xs[Symbol.asyncIterator]();
  await noNext(it);
});
```
The following snippet hopefully demonstrates why we can't await `it.return()` calls.
```js
(async () => {
  setTimeout(() => {}, 10e3); // Keep event loop running

  async function* a() {
    await new Promise(() => {});
  }

  const it = a();
  void it.next();
  await it.return();
  console.log("Done");
})();
```
If you call `return()` on a generator that's currently running (the unresolved `it.next()` is important; if you remove it, it works as expected), the `it.return()` promise hangs indefinitely.
To be fully correct when we race iterators, we should actually abort whichever don't finish the race (if you instead return them, you get the above scenario), and only the one that yielded first should eventually return. With such an implementation, we would not need https://github.com/ReactiveX/IxJS/pull/378 at all, since `bufferCountOrTime` would instead abort the interval.
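To make that concrete, here is a minimal sketch of racing an iterator against an abortable sleep and aborting the losing side rather than returning it (the `sleepAbortable` helper and `timeoutSketch` name are hypothetical; the excerpt above uses `safeRace` and the real `sleep` instead):

```ts
// Hypothetical helper: resolves true when the timer fires, false when aborted first.
function sleepAbortable(ms: number, signal: AbortSignal): Promise<boolean> {
  return new Promise<boolean>((resolve) => {
    const timer = setTimeout(() => resolve(true), ms);
    signal.addEventListener('abort', () => {
      clearTimeout(timer);
      resolve(false);
    });
  });
}

async function* timeoutSketch<T>(source: AsyncIterable<T>, dueTime: number) {
  const it = source[Symbol.asyncIterator]();
  try {
    while (true) {
      const controller = new AbortController();
      const winner = await Promise.race([
        it.next().then((result) => ({ type: 'next' as const, result })),
        sleepAbortable(dueTime, controller.signal).then(() => ({ type: 'timeout' as const })),
      ]);
      // Abort the sleep whether it lost or already won: an aborted sleep simply
      // resolves, so there is no dangling timer and nothing to await forever.
      controller.abort();
      if (winner.type === 'timeout') {
        throw new Error('Timeout has occurred');
      }
      if (winner.result.done) {
        return;
      }
      yield winner.result.value;
    }
  } finally {
    // Fire and forget: awaiting return() can hang if the source is suspended
    // in an await (see the snippet above).
    void it.return?.();
  }
}
```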
The following test currently passes on master, even though it technically should not (with the current return implementation).
```ts
test('canceled', async () => {
  let canceled = false;
  async function* generate() {
    try {
      for (let i = 0; ; i++) {
        await delay(100);
        yield i;
      }
    } finally {
      canceled = true;
    }
  }
  const it = batch()(generate())[Symbol.asyncIterator]();
  await delay(150);
  expect(await it.next()).toEqual({ done: false, value: [0] });
  expect(await it.return!()).toEqual({ done: true });
  expect(canceled).toBe(true);
});
```
The test only succeeds because when you call `return()`, it awaits the remaining delay and eventually returns from the suspended position. If you crank up the delay and run something like the following instead, the test no longer passes.
```ts
test('canceled', async () => {
  let canceled = false;
  async function* generate() {
    try {
      for (let i = 0; ; i++) {
        await delay(10000);
        yield i;
      }
    } finally {
      canceled = true;
    }
  }
  const it = batch()(generate())[Symbol.asyncIterator]();
  expect(await it.return!()).toEqual({ done: true });
  expect(canceled).toBe(true);
});
```
This test would pass if and only if we instead aborted the delay when the buffered iterator is returned.
For that reason, I have removed tests like these that rely on `await it.return()` to continue execution until the next `yield` is reached.
In conclusion, I think there is no scenario in which we can blindly `await it!.return?.();` and expect it to work. We would always need to keep track of whether any generators are currently suspended or executing, and either return or abort accordingly. For now, however, `void it!.return?.();` has to do, I would say.
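For illustration only, "keep track of whether a generator is suspended or executing" could look something like this (a hypothetical sketch; nothing like this exists in the codebase):

```ts
// Hypothetical sketch: remember whether a next() call is still in flight, so
// cleanup can choose between awaiting return() (safe while the generator is
// suspended at a yield) and firing-and-forgetting it (while it is busy awaiting).
function trackPending<T>(it: AsyncIterator<T>) {
  let pending = false;
  return {
    async next(): Promise<IteratorResult<T>> {
      pending = true;
      try {
        return await it.next();
      } finally {
        pending = false;
      }
    },
    async cleanup(): Promise<void> {
      if (pending) {
        // Mid-await: awaiting return() here could hang indefinitely.
        void it.return?.();
      } else {
        await it.return?.();
      }
    },
  };
}
```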
From the linked TS issue, it seems the fix was targeted for the TS v5.8.0 milestone. Maybe we should try updating the TS version to see if this works now?
I am not happy with the Closure Compiler bug causing the UMD builds to fail... I'm inclined to migrate away from Closure Compiler for optimization/minification.
> What we want in `finally` is not necessarily to return the iterator, but to abort it instead. ... If you call `return()` on a generator that's currently running (the unresolved `it.next()` is important; if you remove it, it works as expected), the `it.return()` promise hangs indefinitely. ... To be fully correct when we race iterators, we should actually abort whichever don't finish the race (if you instead return them, you get the above scenario), and only the one that yielded first should eventually return.
If I understand correctly, you want an AsyncIterator that tracks each Promise returned by `next()`, and aborts them prematurely if the user calls `throw()` or `return()`? That sounds like a fundamental change to the AsyncIterator protocol in JS (which, to be clear, I would support), but that ship has sailed.
We could use an existing (or implement our own) cancelable Promise type. That would essentially bring it in line with Task, which would more closely align with the original .NET implementation.
Unfortunately this would not compose, as there are many occasions for users to implicitly drop out of the Ix ecosystem and revert to using native Promises instead. Even a simple `Ix.from(async function* () {})` would give someone the opportunity to do what you've done here. I suppose we could still wrap their Promises in something that still respects the `AbortSignal` (and turns a blind eye to whether the underlying Promise ever settles), but that sounds inefficient and gross.
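For what it's worth, such a wrapper might look roughly like this (a sketch; `respectSignal` is a made-up name, not a proposed API). It settles as soon as the wrapped promise settles or the signal aborts, and stops caring about the underlying promise afterwards:

```ts
function respectSignal<T>(promise: Promise<T>, signal: AbortSignal): Promise<T> {
  return new Promise<T>((resolve, reject) => {
    const onAbort = () => reject(new Error('The operation has been aborted'));
    if (signal.aborted) {
      onAbort();
      return;
    }
    signal.addEventListener('abort', onAbort, { once: true });
    promise.then(
      (value) => {
        signal.removeEventListener('abort', onAbort);
        resolve(value);
      },
      (err) => {
        signal.removeEventListener('abort', onAbort);
        reject(err);
      }
    );
  });
}
```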
> From the linked TS issue, it seems the fix was targeted for the TS v5.8.0 milestone. Maybe we should try updating the TS version to see if this works now?
Doesn't seem to be fixed yet, unfortunately: playground