
ReadableStream.from(asyncIterable)

MattiasBuelens opened this pull request • 36 comments

I want to give #1018 a try. 😁

This adds a static ReadableStream.from() method, which takes an async iterable and returns a ReadableStream pulling chunks from that async iterable. Sync iterables (including arrays and generators) are also supported, since GetIterator() already has all the necessary handling to adapt a sync iterator into an async iterator. So that's nice. 😄
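
For illustration, here's roughly how it would be used (just a sketch, assuming the method as proposed here):

```js
async function* generate() {
  yield 'a';
  yield 'b';
}

// From an async iterable:
const stream1 = ReadableStream.from(generate());

// From a sync iterable, adapted through the async-from-sync machinery:
const stream2 = ReadableStream.from(['a', 'b']);

const reader = stream2.getReader();
await reader.read(); // { value: 'a', done: false }
await reader.read(); // { value: 'b', done: false }
await reader.read(); // { value: undefined, done: true }
```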

I think it should be fairly easy to add ReadableStream.of(...chunks) as well, which would simply call through to ReadableStream.from(chunks). I'll look into it.
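
Roughly (a sketch, not spec text):

```js
// .of() would simply hand its rest-args array to .from():
ReadableStream.of = (...chunks) => ReadableStream.from(chunks);

ReadableStream.of('a', 'b', 'c'); // behaves like ReadableStream.from(['a', 'b', 'c'])
```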

This is a very early draft, so there are no tests yet. I first want to get most of the semantics done, then we can worry about all the nasty error handling and edge cases.

  • [x] At least two implementers are interested (and none opposed):
    • Chrome (@ricea, https://github.com/whatwg/streams/pull/1083#issuecomment-820205616)
    • Firefox (@mgaudet, https://github.com/whatwg/streams/pull/1083#issuecomment-1145991409)
  • [x] Tests are written and can be reviewed and commented upon at:
    • web-platform-tests/wpt#27009
  • [ ] Implementation bugs are filed:
    • Chrome: …
    • Firefox: …
    • Safari: …

(See WHATWG Working Mode: Changes for more details.)



MattiasBuelens avatar Nov 10 '20 21:11 MattiasBuelens

I've moved the implementation to a new abstract op ReadableStreamFromIterable. Unfortunately, this marks most of the review comments as outdated. Sorry about that. 😞

~I also added the suggested ReadableStream.of(...chunks) method, since it's almost trivial now.~

I think I'll start working on some tests next. 🙂

MattiasBuelens avatar Nov 19 '20 22:11 MattiasBuelens

I can't think of any more interesting tests to add, so I'm putting this up for review. 🙂

Quick recap of the discussions so far:

  • An optional queuingStrategy argument is currently left as a future extension. However, Domenic is concerned that implementers might not be motivated to add a convenience method if they already know that they might have to revisit it in the future.
  • The high water mark of the stream is currently set to 0 (instead of the default value of 1). The question remains whether we should keep it at 0 (to align with TransformStream.readable) or revert it to 1 (to restore the default).
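
To make the HWM = 0 point concrete, here is a rough polyfill-style sketch of the lazy-pull wiring (the actual spec text uses GetIterator() and abstract ops rather than author-level code; `fromAsyncIterable` is just a made-up name):

```js
function fromAsyncIterable(asyncIterable) {
  const iterator = asyncIterable[Symbol.asyncIterator]();
  return new ReadableStream(
    {
      async pull(controller) {
        // With highWaterMark: 0, pull() only runs once a read is pending,
        // so the iterator is never driven ahead of the consumer.
        const { value, done } = await iterator.next();
        if (done) {
          controller.close();
        } else {
          controller.enqueue(value);
        }
      },
      async cancel(reason) {
        await iterator.return?.(reason);
      },
    },
    new CountQueuingStrategy({ highWaterMark: 0 })
  );
}
```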

MattiasBuelens avatar Mar 19 '21 22:03 MattiasBuelens

Chrome is interested.

ricea avatar Apr 15 '21 07:04 ricea

I realised that for synchronous iterators we could get optimum performance if we just enqueued everything in startAlgorithm. It's probably not worth the extra complexity, and I guess an iterator could return an infinite stream, which would be bad. What do you think?

ricea avatar Apr 19 '21 06:04 ricea

I think we should only do that if we are absolutely sure that the user-visible behavior is exactly the same. So we shouldn't do this with arbitrary synchronous iterators, since they can be infinite (as you correctly noted) or they might be modifying some external state inside next() (such as variables or synchronous I/O).

That doesn't mean we can't allow some optimizations. For example, if the input is an Array with its original [Symbol.iterator] method, then the implementation can indeed use a "fast path" for such a case. The same logic could apply to other built-in iterables like Set or Map or Uint8Array (although I don't know if these are worth optimizing).

MattiasBuelens avatar Apr 19 '21 13:04 MattiasBuelens

Thanks for the feedback. I have a suspicion that it's detectable, but I don't know how.

I'm ready to land this if you are.

ricea avatar Apr 19 '21 13:04 ricea

Good to go. 🙂

MattiasBuelens avatar Apr 19 '21 14:04 MattiasBuelens

Ah, wait, we need interest from another implementer. @youennf, @evilpie, any thoughts?

ricea avatar Apr 19 '21 14:04 ricea

Uh oh... It looks like optimizing for arrays might not even work. 😕

You can push() new elements to an array while you're iterating it:

let array = ['a', 'b'];
const it = array[Symbol.iterator]();

it.next(); // { value: 'a', done: false }
it.next(); // { value: 'b', done: false }

array.push('c');

it.next(); // { value: 'c', done: false }
it.next(); // { value: undefined, done: true }

So by that logic, this should also work:

let array = ['a', 'b'];
const rs = ReadableStream.from(array);
const reader = rs.getReader();

await reader.read(); // { value: 'a', done: false }
await reader.read(); // { value: 'b', done: false }

array.push('c');

await reader.read(); // { value: 'c', done: false }
await reader.read(); // { value: undefined, done: true }

With the current implementation, this does indeed work. ReadableStream.from() will call it.next() only when a new chunk is needed (since HWM = 0), so it will still see the newly added element.

However, if we add a special case for arrays that immediately iterates over the array and enqueues all its elements, then the above snippet would no longer work.

How do we feel about this? Should we allow this, and make this optimization impossible? (If so, I'll add a test for the above.) Or not?

MattiasBuelens avatar Jul 08 '21 21:07 MattiasBuelens

That's really interesting.

I don't like either option, but I'm leaning towards special-casing arrays.

My reasoning is that I want people to use the ReadableStream.from([chunk]).pipeTo(writable) pattern, but if it performs poorly then they either won't use it or they'll have a bad time.

ricea avatar Jul 09 '21 02:07 ricea

I'm doubtful that special-casing arrays would give serious performance benefits, and so I'd prefer we keep things generic (with uniform observable behavior) unless we have some performance data showing that it'd be problematic. I think we're talking about avoiding a couple of microtasks, and some { value, done } wrapping/unwrapping, which doesn't seem like it'd be significant.

domenic avatar Jul 09 '21 17:07 domenic

It depends on your definition of "serious performance benefits".

I went away and implemented it, and in a microbenchmark

ReadableStream.from([chunk]).pipeTo(writable);

takes 83μs with the array optimisation and 139μs without it.

Obviously with a larger array the performance difference would be much more striking, but I mostly care about the performance of the single-chunk case.

Unfortunately the presence of thenables in the array complicates things. I didn't go to the trouble of matching the behaviour that the ES standard gives to them. It would require falling back to an async algorithm the first time we see a thenable. The performance impact would not be significant in the normal case, but the implementation would be messy. I don't actually like the behaviour of following the thenable, so I'd prefer to ignore the problem, but I don't think we can.
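
For context, the complication is that the spec's async-from-sync conversion awaits each value produced by the sync iterator, so a thenable element is unwrapped before it reaches the stream. For example (assuming the draft's behaviour as written):

```js
const reader = ReadableStream.from(['a', Promise.resolve('b')]).getReader();
await reader.read(); // { value: 'a', done: false }
await reader.read(); // { value: 'b', done: false } (the promise is unwrapped, not enqueued as-is)
```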

My draft CL is at https://chromium-review.googlesource.com/c/chromium/src/+/3023854.

By the way @MattiasBuelens this is some amazing specification work. I didn't find a single issue while implementing.

ricea avatar Jul 13 '21 20:07 ricea

I think the optimization is even trickier :(. Checking using V8's IsArray() doesn't guarantee that the optimization is nonobservable, because someone could have messed with Array.prototype to insert a getter for 0, or override the getter for [Symbol.iterator], or something like that. I suspect V8 has ways of detecting such problems so that they can deopt arrays generally, but I doubt they're part of the public API, and they're probably intertwined with the VM pipeline instead of something that could be easily exposed.

Having actual data is great. I'm unsure how to interpret the results... I mean, nanoseconds are very small. But a 1.67x overhead feels large. I'd be interested in @syg's perspective (or that of his colleagues) as to whether this sort of thing feels like something we could optimize if needed in the future, or whether it's fundamentally going to be about 1.67x more work to do the whole async-to-sync-iterator dance vs. just iterating an array.

At a conceptual level, one angle is to wonder what ReadableStream.from() is supposed to mean. Some of the discussions in #1018 indicated it would be a general-purpose "feed me anything" API. Notably, the original idea had some behavior for blobs, and we discussed special-casing ReadableStream.from(uint8Array) or ReadableStream.from(string), even if just to throw, to avoid people getting a bunch of single-byte/code point chunks. But the current spec draft here leans hard into "treat the argument always as an async iterable". That has some elegance, but perhaps bad consequences for the string and Uint8Array cases.

The relevance of the above paragraph is: if we decide to special-case more types, then special-casing arrays seems natural.

Also, I'm very excited that you created a draft CL!!

domenic avatar Jul 15 '21 21:07 domenic

Special-casing arrays vs. other iterables seems wrong to me. Is there no room for just optimizing array iterables more? I know SpiderMonkey has a specific optimization for iterating over arrays. (Please note that I don't want to make any performance claims here; this might well still be slower than Chrome anyway.)

evilpie avatar Jul 15 '21 21:07 evilpie

Is there no room for just optimizing array iterables more? I know SpiderMonkey has a specific optimization for iterating over arrays.

It looks like ForOfIterator is only meant to imitate the behavior of a for..of loop? For ReadableStream.from(), we need the behavior of a for await..of loop instead, so you'd still have to wrap it with CreateAsyncFromSyncIterator.


I don't think it's a valid optimization to exhaustively iterate an array. For such optimization to be unobservable, you'd have to prove that the array can never be mutated after constructing the stream, and that all of its elements are non-thenables (so they won't trigger a delay when using it.next()). This seems like too complex of a condition to check at runtime.

If I understand the code from the CL correctly, you already have special handling for synchronous iterators in ScriptAsyncIterator. Perhaps it can also optimize for the common case where it.next().value is a non-thenable, and thus AsyncFromSyncIteratorContinuation doesn't need to attach an UnwrapFunction? Maybe that can save a microtask? Or is the overhead not coming from using too many microtasks?

MattiasBuelens avatar Jul 15 '21 22:07 MattiasBuelens

To be clear, I was only trying to evaluate the performance of treating Array as a special case, I was not trying to make the behaviour match the async iterator case.

Having thought about it a bit more, making Array a special case is not really justifiable since as @domenic noted it gets weird if someone has tampered with the object or the prototype.

So I've experimented with making sync iterators a special case instead. As with the Array case, I enqueue everything synchronously when the stream is created. And as with the Array case, this means it will fill up your memory and crash if you put an infinite iterator in there.

The performance is actually weirdly good. I'm getting a median of 82μs, which is within the margin of error of the Array special case. I can't explain why it's so fast, but I'm not complaining.

Here's what I'm proposing:

  1. Check if asyncIterable has an @@iterator method. If it does, then call GetIterator(asyncIterable, sync) and synchronously enqueue all the values in the startAlgorithm.
  2. Otherwise, call GetIterator(asyncIterable, async) and lazily pull items as before.
  3. Tell people that if they want to use an infinite iterator with from() they have to make it async.
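
As a rough sketch of that split (`fromAsync()` here is a made-up stand-in for the existing lazy-pull path; this is not spec text):

```js
function from(iterable) {
  if (typeof iterable[Symbol.iterator] === 'function') {
    // Sync path: drain everything into the queue up front, in start().
    return new ReadableStream({
      start(controller) {
        for (const chunk of iterable) {
          controller.enqueue(chunk);
        }
        controller.close();
      },
    });
  }
  // Async path: pull lazily from the async iterator, as in the current draft.
  return fromAsync(iterable);
}
```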

ricea avatar Jul 15 '21 23:07 ricea

For reference, here's the benchmark I'm using: https://gist.github.com/ricea/ca766d90ba985dfb7e1cd54bba241565

If you want to run it, don't open the console until the benchmark is over or it will mess up the results. Also, you need to divide by 10000 in your head to get the per-iteration time.

ricea avatar Jul 15 '21 23:07 ricea

3. Tell people that if they want to use an infinite iterator with from() they have to make it async.

If all they have is an infinite synchronous iterable, they might find it difficult to turn it into an asynchronous iterable. Effectively, they'd have to resort to things like this:

ReadableStream.from(async function* () {
  return yield* syncIterable;
}());

I don't know if we want to encourage the use of this sort of pattern. But then again, I also don't know how often people will actually want to use an infinite synchronous iterable. 🤷

MattiasBuelens avatar Jul 15 '21 23:07 MattiasBuelens

Hmm, I haven't thought about the tradeoffs in depth yet, but disallowing infinite sync iterables would preclude this from working:

ReadableStream.from(Number.range(0, Infinity));

using https://github.com/tc39/proposal-Number.range

We could have separate from() and fromSync() methods? It's a bit more justifiable since the behavior is so different.

domenic avatar Jul 16 '21 00:07 domenic

It would be nice to know where we are losing time. Is it because we have to go through more microtasks? Would it be worth trying to optimize the ReadableStream machinery itself for cases where startAlgorithm and pullAlgorithm run synchronously? That is:

  • In SetUpReadableStreamDefaultController, don't wrap startResult in a promise, but instead set [[started]] to true immediately.
  • In ReadableStreamDefaultControllerCallPullIfNeeded, allow [[pullAlgorithm]] to return a non-promise (or just ignore its result altogether) and set [[pulling]] to false immediately after calling it.

When you exhaustively iterate the array first, ReadableStreamDefaultReaderRead will always synchronously call chunkSteps() (or closeSteps()) since it can read directly from the queue. I think with the above tweaks, we could achieve the same result by synchronously going through pullAlgorithm, which then calls Enqueue() (or Close()) and in turn calls chunkSteps() (or closeSteps()).

Or is it not about the number of microtasks, and are we just running more code within each microtask? 🤔

MattiasBuelens avatar Jul 16 '21 09:07 MattiasBuelens

I added support for backpressure for sync iterators (calling next() inside pullAlgorithm) to see how much difference it would make. I also redid my measurements with a million iterations.

| Behaviour | Results (μs) |
| --- | --- |
| No special cases | 127 |
| Special case sync iterables, respecting backpressure | 117 |
| Special case sync iterables, ignoring backpressure | 103 |
| Special case arrays, ignoring backpressure | 92 |

I think there's a couple of factors at play:

  1. The ES standard sync-to-async conversion is just really complicated and expensive
  2. Doing things asynchronously is slower than doing them synchronously, not just because of task dispatch, but because you need to keep more state on the heap

Although in my new results "Special case sync iterables, ignoring backpressure" is no longer as fast as "Special case arrays, ignoring backpressure", it could definitely be made as fast by exposing the iteration algorithm from the JS engine, whereas the ES standard sync-to-async behaviour is not going to be that fast with any amount of heroics.

ricea avatar Jul 17 '21 02:07 ricea

Sorry for my radio silence on this. I'm leaning towards keeping the ES-standard sync-to-async behaviour. Even though we know the performance is suboptimal, it's probably acceptable in almost all cases. It has the benefit of being consistent, and simple from a standard text point of view (though sadly not for implementation).

ricea avatar Aug 26 '21 09:08 ricea

I support that decision. If this ever comes up as crucially important in some real-world case, we could have ReadableStream.fromSync() or ReadableStream.fromArray(), perhaps.

domenic avatar Aug 26 '21 14:08 domenic

@evilpie what do you think of the current design? Can we write up Firefox as in support?

domenic avatar Oct 12 '21 21:10 domenic

We haven't fully evaluated this proposal, but we aren't opposed. Personally, this looks good to me without any special casing.

evilpie avatar Oct 13 '21 15:10 evilpie

I plan to implement this for Node.js and (eventually) Cloudflare Workers, so consider both of those runtimes as in support.

jasnell avatar Oct 13 '21 16:10 jasnell

It's been a while, but shall I add the proposed test from my previous comment to the WPT PR? That is: make it a normative requirement that ReadableStream.from(array) iterates lazily, such that array.push() still works?

EDIT: Whoops, I forgot that we already agreed upon this earlier. 😅 I'll add the test. 🙂
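
Something along these lines (a sketch; not necessarily the exact test that will land in web-platform-tests/wpt#27009):

```js
promise_test(async () => {
  const array = ['a', 'b'];
  const reader = ReadableStream.from(array).getReader();
  assert_object_equals(await reader.read(), { value: 'a', done: false });
  assert_object_equals(await reader.read(), { value: 'b', done: false });
  array.push('c'); // mutation mid-iteration must still be observed
  assert_object_equals(await reader.read(), { value: 'c', done: false });
  assert_object_equals(await reader.read(), { value: undefined, done: true });
}, 'ReadableStream.from(array) iterates the array lazily');
```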

MattiasBuelens avatar Oct 13 '21 22:10 MattiasBuelens

We haven't fully evaluated this proposal, but we aren't opposed. Personally, this looks good to me without any special casing.

Thanks. Would you like us to wait for a full evaluation before merging it into the spec? Or should we go ahead?

domenic avatar Oct 13 '21 23:10 domenic

@evilpie @annevk ping on https://github.com/whatwg/streams/pull/1083#issuecomment-942780668 :

We haven't fully evaluated this proposal, but we aren't opposed. Personally, this looks good to me without any special casing.

Thanks. Would you like us to wait for a full evaluation before merging it into the spec? Or should we go ahead?

Also always interested to hear from WebKit, e.g. @youennf.

domenic avatar Jun 01 '22 17:06 domenic

Just to weigh in here: I think we (Mozilla) are generally supportive of the proposal; it's a great user feature.

I did a scan of the specification today, and while others have pointed out lots of details, at a high level it seems to nicely accomplish the goals :)

mgaudet avatar Jun 03 '22 13:06 mgaudet