streams
Add a proof of concept of optimized pipe
Related to #359, #325, #97, #321, #461
This is a proof-of-concept code snippet for my answer to the optimized piping vision. It is basically a brain dump plus a little validation of its soundness.
Very, very interesting. Could you explain the vision a bit more? I think I see the outlines, but getting a high-level view would be nice.
I also think it would be ideal if we allowed JS strategies somehow. I guess they make things fairly observable though, as size() gets called all the time... that's tricky.
I'm going to work today on a requirements-based spec for pipeTo. Let's make sure it synchronizes with your vision.
OK.
I forgot to note: this is intended only to demonstrate that our API enables this kind of optimization. It is not necessarily intended to be part of the specification.
PipeRequest
First, an instance of the PipeRequest class corresponds to each pipeTo() invocation. It holds the instances and parameters given to the method:
- destination writable stream instance
- source readable stream instance
- parameters such as preventClose given to pipeTo()
It also has:
- the pipe slot, which points to the Pipe instance representing the ongoing piping that is going to satisfy the PipeRequest (wholly or partially), if any.
- the done flag, which is initially false.
Every time a new pipeTo() is invoked, a PipeRequest instance describing the pipeTo() invocation is created and:
- the PipeRequest is registered with the global pipe manager.
- the PipeRequest retrieves the current (skipped) piping candidates of the destination writable stream from .pipeCandidates and passes them to the source readable stream by calling .notifyPipeCandidates(). This is called pipe candidates propagation.
- the PipeRequest subscribes to the destination writable stream by calling .waitPipeCandidatesChange(), so that it is notified of updates to the destination's piping candidates. Every time new candidates are received, they are propagated to the source readable stream.
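The registration and propagation steps above can be sketched roughly as follows. This is a minimal model, not the code in this PR: the mock stream classes, the pipeManager object, the callback-style waitPipeCandidatesChange(), and the choice to prepend the request itself to the propagated list are all assumptions made for illustration.

```javascript
// Hypothetical sketch. Only pipeCandidates, notifyPipeCandidates() and
// waitPipeCandidatesChange() come from the description; the rest is assumed.
const pipeManager = { requests: [] };

class PipeRequest {
  constructor(name, source, dest, options = {}) {
    this.name = name;
    this.source = source;
    this.dest = dest;
    this.options = options; // e.g. { preventClose: true }
    this.pipe = undefined;  // Pipe currently satisfying this request, if any
    this.done = false;
    pipeManager.requests.push(this);                        // register
    this.propagate();                                       // initial propagation
    dest.waitPipeCandidatesChange(() => this.propagate());  // subscribe
  }
  propagate() {
    // Assumption: the request prepends itself, so nearer destinations come first.
    this.source.notifyPipeCandidates([this, ...this.dest.pipeCandidates]);
  }
}

// Mock endpoints (assumed shapes, just enough to run the sketch).
class MockWritable {
  constructor() { this.pipeCandidates = []; this.listeners = []; }
  waitPipeCandidatesChange(callback) { this.listeners.push(callback); }
  setPipeCandidates(candidates) {
    this.pipeCandidates = candidates;
    this.listeners.forEach((callback) => callback());
  }
}
class MockReadable {
  constructor(onCandidates = () => {}) {
    this.candidates = [];
    this.onCandidates = onCandidates;
  }
  notifyPipeCandidates(candidates) {
    this.candidates = candidates;
    this.onCandidates(candidates);
  }
}

// rs -> (idWritable, idReadable) -> ws; the identity pair forwards candidates
// received on its readable side to its writable side.
const rs = new MockReadable();
const ws = new MockWritable();
const idWritable = new MockWritable();
const idReadable = new MockReadable((c) => idWritable.setPipeCandidates(c));

const a = new PipeRequest("a", rs, idWritable);
const b = new PipeRequest("b", idReadable, ws);
console.log(rs.candidates.map((r) => r.name).join(", ")); // prints "a, b"
```

Here the second request reaches rs only after one extra propagation step, which is why it ends up behind the first in the list.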
The global pipe manager holds:
- a list of all readable stream instances being piped.
- lists representing the piping candidates for each of the piped readable streams.
  - Each is a list of references to PipeRequest instances. It is sorted so that the ones that went through the fewest "pipe candidates propagation" steps come first, i.e. the destinations closest to the readable stream come first.
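One way to picture this bookkeeping is sketched below, assuming each candidate entry records how many propagation steps it took to reach the readable stream. The entry shape ({ request, hops }) and the function name recordCandidate are assumptions, not part of the proposal.

```javascript
// Hypothetical sketch: per-readable candidate lists kept in a Map.
const candidatesByReadable = new Map();

function recordCandidate(readable, request, hops) {
  const list = candidatesByReadable.get(readable) ?? [];
  list.push({ request, hops });
  // Fewest propagation hops first, i.e. destinations closest to the readable.
  list.sort((x, y) => x.hops - y.hops);
  candidatesByReadable.set(readable, list);
  return list;
}

const readableKey = { name: "rs" };
recordCandidate(readableKey, "c", 2);
recordCandidate(readableKey, "a", 0);
recordCandidate(readableKey, "b", 1);
console.log(
  candidatesByReadable.get(readableKey).map((e) => e.request).join(", ")
); // prints "a, b, c"
```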
The global pipe manager chooses the best subset of triplets of:
- a readable stream
- a pipe candidate (i.e. the destination writable stream)
- the transfer method to use. E.g. between a TCP socket and a file descriptor, either the default pipeTo() algorithm operating over the public ReadableStream._Reader/WritableStream._Writer interfaces or offloading by sendfile() could be used. The global pipe manager lists the available transfer methods by checking the underlying sources/sinks of the readable/writable stream pair.
The subset must cover all the active PipeRequests. E.g. skipped piping to b in a list representing skipped piping candidates [a, b, c] can cover PipeRequests a and b. c must be covered by another pipe.
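The [a, b, c] example can be written down as a small helper. This is only an illustration of the coverage rule; splitCoverage is a hypothetical name, and plain strings stand in for PipeRequest instances.

```javascript
// Hypothetical helper: choosing `chosen` as the destination of a skipped pipe
// covers every request up to and including it in the candidate list.
function splitCoverage(candidates, chosen) {
  const index = candidates.indexOf(chosen);
  if (index === -1) throw new Error("chosen destination is not a candidate");
  return {
    covered: candidates.slice(0, index + 1),
    remaining: candidates.slice(index + 1), // must be covered by other pipes
  };
}

const { covered, remaining } = splitCoverage(["a", "b", "c"], "b");
console.log(covered.join(", "));   // prints "a, b"
console.log(remaining.join(", ")); // prints "c"
```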
Reorganizing pipes
Ongoing pipes can be stopped and reorganized into a longer or faster pipe. The duration (number of bytes to transfer) of a long pipe is limited to the minimum of the requested number of bytes to transfer across all PipeRequests covered by that long pipe.
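As a rough illustration of that limit, assuming each covered request carries an optional bytes field (a hypothetical name) and that a request without one is unbounded:

```javascript
// Hypothetical: the long pipe may transfer at most the smallest requested
// byte count among the requests it covers.
function longPipeByteLimit(coveredRequests) {
  return Math.min(...coveredRequests.map((r) => r.bytes ?? Infinity));
}

console.log(longPipeByteLimit([{ bytes: 4096 }, {}, { bytes: 1024 }])); // prints 1024
```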
Candidate propagation
The propagation of pipe candidates may happen asynchronously. E.g. IdentityTransformStream does that. This is useful when an IdentityTransformStream already has some chunks enqueued in it. It may exert backpressure on the source readable stream of an existing pipe whose destination is the writable side of the IdentityTransformStream, so that the pipe temporarily stops write()-ing new chunks. Once all the chunks are drained, it can announce the piping candidates received at the readable side to the writable side, so that the faster one can be chosen. I think this backpressure should not be the normal backpressure signal. Instead, it should be made by announcing an empty pipeCandidates to completely stop the piping (from the readable to the writable side of the IdentityTransformStream) and waiting for the ongoing piping to stop. We need some mechanism to realize this. Without strict backpressure like this, it's possible that the ongoing pipe write()s new chunks before seeing the updated pipe candidates that include the long piping, and then switches to it before the queued chunks are processed.
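The drain-then-announce behaviour described here might look like the following toy model. It is synchronous and deliberately simplified; the class name, the pending field, and the read()/write() methods are assumptions, not the API in this PR.

```javascript
// Hypothetical model of an identity transform that withholds candidates
// from its writable side until its queued chunks are drained.
class IdentityTransformModel {
  constructor() {
    this.queue = [];          // chunks written but not yet read
    this.pending = [];        // candidates received on the readable side
    this.pipeCandidates = []; // what the writable side currently announces
  }
  write(chunk) {
    this.queue.push(chunk);
    this.announce();
  }
  read() {
    const chunk = this.queue.shift();
    this.announce();
    return chunk;
  }
  // Called on the readable side by a downstream pipe request.
  notifyPipeCandidates(candidates) {
    this.pending = candidates;
    this.announce();
  }
  announce() {
    // While chunks are queued, announce an empty list so upstream pipes stop.
    this.pipeCandidates = this.queue.length === 0 ? this.pending : [];
  }
}

const id = new IdentityTransformModel();
id.write("x");
id.write("y");
id.notifyPipeCandidates(["ws"]);
console.log(id.pipeCandidates.length); // prints 0
id.read();
id.read();
console.log(id.pipeCandidates.join(", ")); // prints "ws"
```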
Extended the API a bit to address the flushing issue I discussed in the last comment.
Wow, perfect, thank you for the explanation! I understand this is not necessarily something for the spec, but I think it will be valuable to have written up somewhere that we can point people to as an example of how to use the spec's flexibility around piping to accomplish great things. It might be worth prototyping as well, if we have the time and energy; I'm not sure if that was your plan in this PR or if you were just using the .js code as a way to organize the thoughts.
So let me try to restate the actors in the system:
- pipe requests represent a pipeTo call, i.e. (readable, writable, options) tuples
- pipes represent actual data transfers between readable and writable streams
- they include information on the transfer method, so they are roughly (readable, writable, transfer method) tuples.
- the pipe manager is responsible for finding a series of pipes that satisfy all ongoing pipe requests
- it probably also is responsible for ensuring that no matter what pipes are in play, the options for each pipe request are honored
With this in mind, the notifyPipeCandidates/waitPipeCandidatesChange mechanisms are all about coordinating to potentially update the set of pipes in action when the set of pipe requests in action changes. This is what the reorganizing pipes section is about, I think?
I'm not sure I fully understand the candidate propagation section, or why it implies a new backpressure mechanism. My vision of identity transform streams (which is not fully thought out, so might be wrong) is that in a system like this, they get completely excluded from the list of pipes, i.e. rs.pipeThrough(id).pipeTo(ws) will create two pipe requests but only one pipe (from rs to ws). Then normal backpressure propagation would take place between rs and ws.
I guess you are concerned about cases like rs.pipeThrough(id); setTimeout(() => id.readable.pipeTo(ws), 10000), when you talk about asynchronicity? Or is it something else?
I'm not sure if that was your plan in this PR or if you were just using the .js code as a way to organize the thoughts.
Yeah, initially I attempted to just implement the API in ReadableStream, but it looked like it was going to complicate the reference implementation too much, so I wrote it in a separate file. I want to evolve the code snippet into something that works, for verifying our ideas. Yes.
This is what the reorganizing pipes section is about, I think?
Right
I guess you are concerned about cases like rs.pipeThrough(id); setTimeout(() => id.readable.pipeTo(ws), 10000), when you talk about asynchronicity? Or is it something else?
Ah, yeah. Asynchronous invocation of pipeTo()s and pending chunks.
IdentityTransformStream's readable side and writable side themselves work as ReadableStream and WritableStream, and they can be separately passed around. Even pipeThrough() may be given some writable / readable pair which was built without considering the transform streams concept at all. I'd like to confirm that this understanding of mine is correct, first. Or do you have some idea in mind that we brand check that a given writable / readable pair passed to pipeThrough is a "transform stream"?
So, the writable side of an identity transform may get written to manually with write() even when it's exerting backpressure. Then, some chunks are queued inside the transform stream.
Once pipeTo() is invoked on both the readable side and the writable side of the transform stream with pending chunks, we can consider optimizing this pair of pipeTo()-s. But we need to have the transform stream flush the chunks before establishing such a skipped transfer.
Hmm, we can also resolve this by enforcing a requirement on all pipeTo() implementations to stop writing when the normal backpressure signal is exerted, just as the current pipeTo() reference implementation does. Yeah, it may work.
IdentityTransformStream's readable side and writable side themselves work as ReadableStream and WritableStream, and they can be separately passed around. Even pipeThrough() may be given some writable / readable pair which was built without considering the transform streams concept at all. I'd like to confirm that this understanding of mine is correct, first. Or do you have some idea in mind that we brand check that a given writable / readable pair passed to pipeThrough is a "transform stream"?
I haven't thought this through fully. But my idea was that yes, we would add an unobservable brand check and fast path if we know it's a transform stream created with the TransformStream constructor (or with TransformStream.identity() or whatever).
Once pipeTo() is invoked on both the readable side and the writable side of the transform stream with pending chunks, we can consider optimizing this pair of pipeTo()-s. But we need to have the transform stream flush the chunks before establishing such a skipped transfer.
Got it. I understand the problem now.
Hmm, we can also resolve this by enforcing a requirement on all pipeTo() implementations to stop writing when the normal backpressure signal is exerted, just as the current pipeTo() reference implementation does. Yeah, it may work.
The draft in https://github.com/whatwg/streams/pull/512 says "While WritableStreamDefaultWriterGetDesiredSize(writer) is ≤ 0 or is null, the user agent must not read from reader." I think this is what we want, although I guess it is talking about reading, not writing. Maybe it should say both.
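The quoted condition boils down to a small predicate. The sketch below assumes only that the writer exposes a desiredSize property, as WritableStreamDefaultWriter does; the function name mayReadFrom is hypothetical, and plain objects stand in for real writers.

```javascript
// Sketch of the quoted rule: do not read while desiredSize is <= 0 or null.
function mayReadFrom(writer) {
  const desiredSize = writer.desiredSize;
  return desiredSize !== null && desiredSize > 0;
}

console.log(mayReadFrom({ desiredSize: 3 }));    // prints true
console.log(mayReadFrom({ desiredSize: 0 }));    // prints false
console.log(mayReadFrom({ desiredSize: null })); // prints false
```

Applying the same check before each write as well would cover the "maybe it should say both" case.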
we would add an unobservable brand check and fast path
I see. Anyway, we need to perform some special interaction with the streams to realize the optimization. Even the propagation idea requires the streams to expose a special signal or method.
One good thing about leaving the decision to the streams, and having the global manager fetch some predefined signal from them as in the propagation idea above, is that each transform stream can control when it announces longer piping. E.g. one could consume some initial header to do something special, and then pass data through to the next streams.
Maybe it should say both.
Yeah! I'm not yet sure about the necessity of different backpressure signals. Let's just try.