pipe redirection

Open tyoshino opened this issue 9 years ago • 3 comments

I've started to feel that we've been too optimistic about using a specialized pipeTo() algorithm when an identity transform sits between the rs/ws pair.

In the discussion about the Request object, I've been saying that

  • we should be able to .write() even before the Request gets attached to the final sink (https://github.com/yutakahirano/fetch-with-streams/issues/30#issuecomment-90882279).
  • we can run specialized .pipeTo() internally even with existence of such a buffer.

The following are reduced examples of what I said:

Example: id is writable even before id.readable.pipeTo() happens.

id.writable.write("hello");
id.writable.write("world");
id.readable.pipeTo(ws);
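
For readers trying this against the Streams API as it exists today, here is a minimal sketch of the same idea. It assumes a runtime with global TransformStream and WritableStream (for example a recent browser or Node.js 18+), and it uses a writer obtained from getWriter(), because current streams no longer expose write() directly on the writable. The names received and piped are illustrative only.

```javascript
// A TransformStream constructed without a transformer passes chunks through
// unchanged, so it stands in for the identity transform `id` from the thread.
const id = new TransformStream();

const received = [];
const ws = new WritableStream({
  write(chunk) {
    received.push(chunk);
  },
});

// Write before anything is piped: the chunks wait inside id.
const writer = id.writable.getWriter();
writer.write("hello");
writer.write("world");
writer.close();

// Piping starts only now and drains the queued chunks in order.
const piped = id.readable.pipeTo(ws).then(() => received);
```

The write() promises stay pending until the readable side is consumed, which is why the sketch does not await them before calling pipeTo().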

Example: In the following two examples, rs.pipeTo(ws) should happen eventually and internally after draining chunks queued in id.

rs.pipeTo(id.writable);
id.readable.pipeTo(ws);  // Eventually rs.pipeTo(ws)
rs.pipeTo(id.writable);
id.readable.pipeTo(ws);  // Eventually rs.pipeTo(ws)

Note that the whole rs.pipeTo(ws) algorithm should not always run. If id.readable.pipeTo(ws) is changed to id.readable.pipeTo(ws, { preventClose: true }), closing rs should close only id, not ws.
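
The observable behavior that any internal shortcut would have to preserve can be sketched with the current Streams API. This assumes a runtime with global ReadableStream, WritableStream, and TransformStream; the log array and the "<closed>" marker are illustrative names, not part of any proposal.

```javascript
const log = [];
const ws = new WritableStream({
  write(chunk) {
    log.push(chunk);
  },
  close() {
    log.push("<closed>");
  },
});

const id = new TransformStream();
const rs = new ReadableStream({
  start(controller) {
    controller.enqueue("a");
    controller.close();
  },
});

// With default options, rs closing also closes id.writable.
const upstream = rs.pipeTo(id.writable);
// preventClose keeps that close from propagating from id.readable into ws.
const downstream = id.readable.pipeTo(ws, { preventClose: true });

const result = Promise.all([upstream, downstream]).then(async () => {
  // ws is unlocked and still writable after both pipes have finished.
  const writer = ws.getWriter();
  await writer.write("b");
  await writer.close();
  return log;
});
```

If an optimized path replaced the two pipes with a single rs.pipeTo(ws), it would have to stop short of closing ws here, which is the reason for separating the data transfer from close/error handling.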

I'll call the optimized main data transfer part of .pipeTo(), which excludes close/error signal handling, .specialDataTransfer().

So, let's think of what we should do for the following example:

id.writable.write("hello");
id.writable.write("world");
rs.pipeTo(id.writable);
id.readable.pipeTo(ws);

Here are two issues:

  • id already has two chunks queued in it. If the id.readable.pipeTo(ws) algorithm is ReadableStream.pipeTo() as-is, we cannot drain all the data from id until ws stops exerting backpressure.
  • Without the ability to stop rs.pipeTo(id.writable), we will never be able to switch to rs.specialDataTransfer(ws).

I guess, the solution is:

  • The identity transform flushes all the data queued in it when id.readable.pipeTo(ws) is called, regardless of backpressure from ws.
  • Provide a (possibly private) API, say .redirectTo, on the WritableStream so that the .pipeTo() algorithm can redirect .specialDataTransfer() to ws. ReadableStream.pipeTo() is required to check .redirectTo before every dest.write() and, if it's set, it must switch to rs.specialDataTransfer(dest.redirectTo).
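
The redirect check in the second bullet can be illustrated with a synchronous toy model. Everything here is hypothetical: ToySink is not the real WritableStream, arrays stand in for readable streams, and the onBeforeWrite hook exists only to simulate id.readable.pipeTo(ws) being called in the middle of the pipe.

```javascript
// Hypothetical toy sink; `redirectTo` models the proposed (possibly private) slot.
class ToySink {
  constructor(name) {
    this.name = name;
    this.chunks = [];
    this.redirectTo = null;
  }
  write(chunk) {
    this.chunks.push(chunk);
  }
}

// Stand-in for the optimized transfer that bypasses the intermediate stream.
function specialDataTransfer(source, dest) {
  while (source.length > 0) {
    dest.write(source.shift());
  }
}

// Ordinary pipe loop, extended with the redirect check before every write.
function pipeLoop(source, dest, onBeforeWrite = () => {}) {
  while (source.length > 0) {
    onBeforeWrite(source[0]);
    if (dest.redirectTo !== null) {
      specialDataTransfer(source, dest.redirectTo);
      return;
    }
    dest.write(source.shift());
  }
}

const idWritable = new ToySink("id.writable");
const ws = new ToySink("ws");
const rs = ["a", "b", "c", "d"];

// Simulate id.readable.pipeTo(ws) happening just before "c" is written.
pipeLoop(rs, idWritable, (next) => {
  if (next === "c") idWritable.redirectTo = ws;
});
```

In this run "a" and "b" land in idWritable and "c" and "d" go straight to ws. The sketch leaves out the first bullet: a real design would also have to flush the chunks already queued in id to ws before the redirected chunks, to preserve ordering.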

Maybe this could be evolved to address #307, too.

tyoshino, Apr 09 '15 06:04

To be clear, I'm not saying the above is elegant. I'm exploring whether there's a simpler solution.

tyoshino, Apr 09 '15 16:04

This is really good analysis but I'm not 100% sure I understand it all, so please be patient with me :). First, some quick comments on points that caught my eye:

Without the ability to stop rs.pipeTo(id.writable), we will never be able to switch to rs.specialDataTransfer(ws).

We've wanted a way to cancel a pipe since at least https://github.com/whatwg/streams/commit/726b08d4696758b28c566f84f2eec7b10d5e1bdf. Our solution is cancelable promises, but even before they land we could have a spec mechanism like CancelCurrentPipe(rs) that will eventually be exposed through pipePromise.cancel() but until then will be usable by UA-created streams or other internal algorithms. So maybe this helps.
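
For comparison, the Streams API as it stands today lets a caller stop a pipe through the signal option of pipeTo(), which takes an AbortSignal. The sketch below is not the CancelCurrentPipe(rs) mechanism discussed here; it only shows, under that substitution, how a stopped pipe leaves rs usable so it can be piped elsewhere. It assumes a runtime with global ReadableStream, WritableStream, and AbortController, and the names sinkA, sinkB, source, and stopper are illustrative.

```javascript
const first = [];
const second = [];
const sinkA = new WritableStream({ write(chunk) { first.push(chunk); } });
const sinkB = new WritableStream({ write(chunk) { second.push(chunk); } });

// Keep the controller so chunks can be enqueued from outside the stream.
let source;
const rs = new ReadableStream({
  start(controller) {
    source = controller;
  },
});

const stopper = new AbortController();
// preventCancel and preventAbort keep the abort from tearing down rs or sinkA.
const firstPipe = rs.pipeTo(sinkA, {
  signal: stopper.signal,
  preventCancel: true,
  preventAbort: true,
});
source.enqueue("one");

const result = (async () => {
  // Give the first pipe a chance to move "one" along.
  await new Promise((resolve) => setTimeout(resolve, 0));
  stopper.abort();
  await firstPipe.catch(() => {}); // the stopped pipe rejects
  // rs is unlocked again, so it can be piped to a different destination.
  source.enqueue("two");
  source.close();
  await rs.pipeTo(sinkB);
  return { first, second };
})();
```

Stopping the pipe does not drop chunks: whatever the first pipe has not delivered remains in rs for the second pipe.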

Provide a (possibly private) API, say .redirectTo, on the WritableStream so that the .pipeTo() algorithm can redirect .specialDataTransfer() to ws. ReadableStream.pipeTo() is required to check .redirectTo before every dest.write() and, if it's set, it must switch to rs.specialDataTransfer(dest.redirectTo).

This is kind of interesting and I think could eventually be massaged into something more elegant. It reminds me a bit of #146. But I agree it is complex and a simpler version would be nice.


Second, a question to check that I understand the problematic scenario:

id.writable.write("hello");
id.writable.write("world");
rs.pipeTo(id.writable);
id.readable.pipeTo(ws);

In an ideal world, the outcome here would be:

  • "hello" makes its way to ws (not respecting backpressure)
  • "world" makes its way to ws (not respecting backpressure)
  • Once ws stops giving off backpressure signals (including any triggered by "hello" or "world"), we run rs.specialDataTransfer(ws).

Is that right? Did I miss anything?
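
The three steps above can be sketched as a synchronous toy model. All of it is hypothetical: ToySink models backpressure with a fixed capacity, arrays stand in for id's internal queue and for rs, and connect() and resume() are illustrative names rather than proposed API.

```javascript
// Hypothetical toy sink: `ready` is false while too many chunks are pending.
class ToySink {
  constructor(capacity) {
    this.capacity = capacity;
    this.pending = [];
    this.written = [];
  }
  get ready() {
    return this.pending.length < this.capacity;
  }
  write(chunk) {
    this.pending.push(chunk);
  }
  // The underlying sink catches up with everything pending.
  drain() {
    this.written.push(...this.pending.splice(0));
  }
}

function connect(idQueue, rs, ws) {
  // Steps 1 and 2: flush what id has buffered, ignoring ws.ready.
  while (idQueue.length > 0) {
    ws.write(idQueue.shift());
  }
  // Step 3: the direct transfer starts only once ws has no backpressure.
  return function resume() {
    if (!ws.ready) return false;
    while (rs.length > 0 && ws.ready) {
      ws.write(rs.shift());
    }
    return true;
  };
}

const ws = new ToySink(1);
const resume = connect(["hello", "world"], ["from rs"], ws);

const startedEarly = resume(); // ws still has backpressure from the flush
ws.drain();
const startedLater = resume(); // now the direct transfer can run
ws.drain();
```

Here the first resume() call returns false because the flushed chunks put ws over capacity, and the second returns true once ws has drained, leaving ws.written in the order hello, world, from rs.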

If so, this doesn't seem too hard to solve. I'm not sure how we would specify the exact protocol, including the issues from #307 about how to identify that a stream is an identity transform and can be skipped (maybe that is your redirectTo idea? Re-reading, I think it is). But I think an algorithm of "wait for ultimate-non-identity-ready, then do special data transfer" is doable.

I guess the heart of this concern is coming up with a protocol such that, in the special case where nothing is buffered in the identity transform, something is piped to its writable side, and its readable side is piped somewhere, we can do rs.pipeTo(ws) (assuming rs.pipeTo has special-data-transfer logic).

Given that with #321 pipeTo's internals are effectively unobservable, we should have a lot of latitude in designing this protocol, I hope. An alternate approach is to say that identity transforms are special and don't use the normal readable and writable, but instead use something more like an operation stream while exposing the same public interface as readable and writable... That's a bit tricky though.

It would be good to prototype this kind of thing as it has the flavor of something that needs actual testing to see if the edge cases work.

domenic, Apr 10 '15 00:04

I just want to say that the more I think about this in combination with #307, the more I think redirectTo is actually kind of nice. Before each write, check ws.redirectTo (or its corresponding internal slot to stay unobservable). If it's set, switch to this.pipeTo(ws.redirectTo, options). Kind of feels hacky, but also kind of feels extremely practical.

domenic, Apr 10 '15 00:04