streams
streams copied to clipboard
pipe redirection
I started feeling that we've been too optimistic about use of specialized pipeTo()
algorithm given an identify transform between the pair of rs/ws.
In the discussion about the Request
object, I've been saying that
- we should be able to
.write()
even before theRequest
gets attached to the final sink (https://github.com/yutakahirano/fetch-with-streams/issues/30#issuecomment-90882279). - we can run specialized
.pipeTo()
internally even with existence of such a buffer.
The following are reduced example of what I said:
Example: id
is writable even before id.readable.pipeTo()
happens.
id.writable.write("hello");
id.writable.write("world");
id.readable.pipeTo(ws);
Example: In the following two examples, rs.pipeTo(ws)
should happen eventually and internally after draining chunks queued in id
.
rs.pipeTo(id.writable);
id.readable.pipeTo(ws); // Eventually rs.pipeTo(ws)
rs.pipeTo(id.writable);
id.readable.pipeTo(ws); // Eventually rs.pipeTo(ws)
Note that not whole of the rs.pipeTo(ws)
algorithm should always happen. Closure of rs
should result in making only id
closed if id.readable.pipeTo(ws)
is changed to id.readable.pipeTo(ws, { preventClose: true })
.
I'd call the optimized main data transfer part of .pipeTo()
as .specialDataTransfer()
which excludes close/error signal handling.
So, let's think of what we should do for the following example:
id.writable.write("hello");
id.writable.write("world");
rs.pipeTo(id.writable);
id.readable.pipeTo(ws);
Here are two issues:
-
id
already has two chunks queued in it. Ifid.readable.pipeTo(ws)
algorithm is the one ofReadableStream.pipeTo()
as-is, we cannot drain all the data fromid
untilws
stop exerting back-pressure. - Without an ability to stop
rs.pipeTo(id.writable)
, we never be able to switch tors.specialDataTransfer(ws)
.
I guess, the solution is:
- Identity transform flushes all the data queued in it when
id.readable.pipeTo(ws)
is called regardless of back-pressure ofws
. - Provide an (possibly private) API, say
.redirectTo
, on theWritableStream
for.pipeTo()
algorithm to redirect.specialDataTransfer()
tows
.ReadableStream.pipeTo()
is required to check.redirectTo
before everydest.write()
and if it's set, it must switch to users.specialDataTransfer(dest.redirectTo)
.
Maybe this could be evolved to address #307, too.
To be clear, I'm not saying the above is elegant. Exploring if there's any simple solution.
This is really good analysis but I'm not 100% sure I understand it all, so please be patient with me :). First, some quick comments on points that caught my eye:
Without an ability to stop rs.pipeTo(id.writable), we never be able to switch to rs.specialDataTransfer(ws).
We've wanted a way to cancel a pipe since at least https://github.com/whatwg/streams/commit/726b08d4696758b28c566f84f2eec7b10d5e1bdf. Our solution is cancelable promises, but even before they land we could have a spec mechanism like CancelCurrentPipe(rs) that will eventually be exposed through pipePromise.cancel()
but until then will be usable by UA-created streams or other internal algorithms. So maybe this helps.
Provide an (possibly private) API, say .redirectTo, on the WritableStream for .pipeTo() algorithm to redirect .specialDataTransfer() to ws. ReadableStream.pipeTo() is required to check .redirectTo before every dest.write() and if it's set, it must switch to use rs.specialDataTransfer(dest.redirectTo).
This is kind of interesting and I think could eventually be massaged into something more elegant. It reminds me a bit of #146. But I agree it is complex and a simpler version would be nice.
Second a question to see if I understand the problematic scenario:
id.writable.write("hello");
id.writable.write("world");
rs.pipeTo(id.writable);
id.readable.pipeTo(ws);
Our goal here in an ideal world for the outcome would be:
- "hello" makes its way to
ws
(not respecting backpressure) - "world" makes its way to
ws
(not respecting backpressure) - Once
ws
stops giving off backpressure signals (including any triggered by "hello" or "world"), we runrs.specialDataTransfer(ws)
.
Is that right? Did I miss anything?
If so, this doesn't seem too hard to solve. I'm not sure how we would specify the exact protocol (including the issues of #307, of how to identify that a stream is identity and can be skipped---maybe that is your redirectTo idea!? I think it is now that I re-read.) But I think an algorithm of "wait for ultimate-non-identity-ready, then do special data transfer" is doable.
I guess the heart of this concern is coming up with a protocol such that, in the special case where once there is nothing buffered in the identity transform and both the writable
side has something piped to it and the readable
side is piped somewhere, we can do rs.pipeTo(ws)
(assuming rs.pipeTo
has special-data-transfer logic).
Given that with #321 pipeTo
's internals are effectively unobservable, we should have a lot of latitude in designing this protocol, I hope. An alternate approach is to say that identity transforms are special and don't use the normal readable
and writable
but instead use something more like operation stream while exposing the same public interface as readable and writable... That's a bit tricky though.
It would be good to prototype this kind of thing as it has the flavor of something that needs actual testing to see if the edge cases work.
I just want to say that the more I think about this in combination with #307, the more I think redirectTo is actually kind of nice. Before each write, check ws.redirectTo
(or its corresponding internal slot to stay unobservable). If it's set, switch to this.pipeTo(ws.redirectTo, options)
. Kind of feels hacky, but also kind of feels extremely practical.