
Add a realtime mode to ReadableStream.tee

Open youennf opened this issue 4 years ago • 5 comments

For realtime applications dealing with ReadableStreams of VideoFrames, it is very useful to be able to limit buffering between teed branches. This also ensures that both branches will receive the most recent video frame. As discussed during today's WebRTC/WHATWG streams call, we could envision something like a tee({structuredClone: true, noBuffer: true}).

youennf avatar Nov 16 '21 14:11 youennf

In https://github.com/whatwg/streams/issues/1157#issuecomment-893728969, we discussed and prototyped a "synchronized" variant of tee(). That variant would only pull from the original stream if both branches are pulling, to make sure that both branches see the same chunks at the same time (by slowing down the faster reader).

If I understand correctly, this "realtime" variant is slightly different. It would pull from the original stream if at least one of the two branches is pulling, and only enqueue the received chunk to branches that are actively pulling. That way, we avoid overfilling the queue of the slower branch by "skipping" chunks.
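To make the difference between the two variants concrete, here is a rough toy model of just the pull decision. It is not the real ReadableStreamTee algorithm, and TeePolicyModel and its method names are hypothetical, made up purely for illustration.

```javascript
// Toy model of the two pull policies; not the spec's ReadableStreamTee.
// `TeePolicyModel` and its method names are hypothetical.
class TeePolicyModel {
  constructor(mode) {
    this.mode = mode;              // "synchronized" or "realtime"
    this.pulling = [false, false]; // whether each branch has a pending pull
    this.received = [[], []];      // chunks delivered to each branch
  }

  // A branch signals that it wants a chunk.
  pull(branch) {
    this.pulling[branch] = true;
  }

  // Should tee() pull from the original stream right now?
  shouldPullSource() {
    const [a, b] = this.pulling;
    return this.mode === "synchronized" ? a && b : a || b;
  }

  // The original stream produced a chunk: enqueue it only to pulling branches.
  deliver(chunk) {
    for (const branch of [0, 1]) {
      if (this.pulling[branch]) {
        this.received[branch].push(chunk);
        this.pulling[branch] = false;
      }
    }
  }
}

const sync = new TeePolicyModel("synchronized");
sync.pull(0);
const syncPulls = sync.shouldPullSource(); // false: the other branch is not pulling yet

const realtime = new TeePolicyModel("realtime");
realtime.pull(0);
const realtimePulls = realtime.shouldPullSource(); // true: one pulling branch is enough
realtime.deliver("frame1"); // only the first branch gets it; the second one skips it
```

In the synchronized model the faster branch waits; in the realtime model the slower branch simply never sees "frame1".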

What should happen if one branch is only slightly slower than the other? That is:

  1. Branch 1 starts pulling. tee() starts pulling from the original stream.
  2. We receive a chunk from the original stream. Only branch 1 is pulling, so we only enqueue it to that branch.
  3. Branch 2 starts pulling, but only slightly after we did the previous step.

I see two options:

  • Treat this as a new pull. Thus, tee() starts pulling the next chunk from the original stream, which will definitely go to branch 2. (If branch 1 manages to catch up in the meantime and also starts pulling, we'll also enqueue it to branch 1.)
  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime. However, if it doesn't get overwritten, then this previous chunk may be very old by the time branch 2 pulls it.

Or perhaps some combination of these two, with some sort of "max age" to decide between enqueuing the previous chunk or pulling a new chunk? 🤷
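A minimal sketch of what the second option plus a "max age" cutoff could look like, assuming a single-slot buffer per branch. LatestChunkSlot and maxAgeMs are hypothetical names, and timestamps are passed in explicitly so the example stays deterministic.

```javascript
// Toy model of "store the previous chunk" with a max-age cutoff.
// `LatestChunkSlot` and `maxAgeMs` are hypothetical names for illustration.
class LatestChunkSlot {
  constructor(maxAgeMs) {
    this.maxAgeMs = maxAgeMs;
    this.entry = null; // { chunk, storedAt } or null
  }

  // Called when the faster branch receives a chunk the slower branch missed.
  // A newer chunk overwrites the older one.
  store(chunk, now) {
    this.entry = { chunk, storedAt: now };
  }

  // Called when the slower branch starts pulling. Returns the stored chunk
  // if it is fresh enough, otherwise null (the caller would pull a new chunk).
  take(now) {
    const entry = this.entry;
    this.entry = null;
    if (entry !== null && now - entry.storedAt <= this.maxAgeMs) {
      return entry.chunk;
    }
    return null;
  }
}

const slot = new LatestChunkSlot(40);
slot.store("frame1", 0);
slot.store("frame2", 33);      // overwrites frame1
const fresh = slot.take(50);   // "frame2": stored 17 ms ago
slot.store("frame3", 66);
const stale = slot.take(500);  // null: frame3 is 434 ms old, too old to enqueue
```

Whether a stale entry should trigger a fresh pull or just wait for the next chunk is exactly the open question here; the sketch only shows the age check.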

MattiasBuelens avatar Nov 16 '21 15:11 MattiasBuelens

  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime.

My understanding was to go with this approach.

youennf avatar Nov 18 '21 08:11 youennf

  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime.

My understanding was to go with this approach.

This would imply that if structuredClone is true, we always need to clone the chunk. Which is a problem if explicit closure is needed.

ricea avatar Nov 18 '21 09:11 ricea

  • Treat this as a new pull ... (If branch 1 manages to catch up in the meantime and also starts pulling, we'll also enqueue it to branch 1.)

Is "in the meantime" = anytime before a chunk newer than "new pull" is delivered to branch 2? (The pull promise is unreliable here imho)

If the underlying source's internal queue is not empty, this would seem to devolve into delivering different chunks to each branch, pulling at a rate faster than either of the branches would have individually. I don't think we ever want to steal chunks from the fast branch, even if upstream has backed up.

  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime.

This would appear to keep the pull rate = max(pull rate A, pull rate B), and expose mostly the same (cloned) frames, which seems desirable or at least easier to reason about.
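As a rough sanity check of the pull rate claim, here is a toy tick-based simulation, assuming each branch pulls at a fixed period and the tee keeps one undelivered chunk per branch. The simulate function and its parameters are hypothetical; real pull timing is of course not this regular.

```javascript
// Toy tick-based simulation of the "store the previous chunk" option.
// `simulate` and its parameters are hypothetical, for illustration only.
function simulate(ticks, periodA, periodB) {
  const periods = [periodA, periodB];
  const stored = [null, null]; // latest chunk each branch has not seen yet
  const received = [[], []];
  let sourcePulls = 0;

  for (let tick = 0; tick < ticks; tick++) {
    for (const branch of [0, 1]) {
      if (tick % periods[branch] !== 0) continue; // branch not pulling this tick
      const other = 1 - branch;
      if (stored[branch] !== null) {
        // A chunk the other branch already pulled is waiting: reuse it.
        received[branch].push(stored[branch]);
        stored[branch] = null;
      } else {
        // Nothing waiting: pull a fresh chunk and keep a copy for the other branch.
        const chunk = ++sourcePulls;
        received[branch].push(chunk);
        stored[other] = chunk; // overwrites any older, undelivered chunk
      }
    }
  }
  return { sourcePulls, received };
}

// Branch A pulls every tick, branch B every third tick.
const result = simulate(12, 1, 3);
// result.sourcePulls is 12, the same as the number of pulls made by branch A,
// and branch B receives chunks 1, 4, 7 and 10.
```

In this model the original stream is never pulled more often than the faster branch pulls, and the slower branch sees a subset of the same chunks.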

This would imply that if structuredClone is true, we always need to clone the chunk. Which is a problem if explicit closure is needed.

Yes, though we need to solve that somehow anyway for when a stream is errored.

jan-ivar avatar Nov 22 '21 22:11 jan-ivar

  • Treat this as a new pull ... (If branch 1 manages to catch up in the meantime and also starts pulling, we'll also enqueue it to branch 1.)

Is "in the meantime" = anytime before a chunk newer than "new pull" is delivered to branch 2? (The pull promise is unreliable here imho)

Yes, that's what I meant.

If the underlying source's internal queue is not empty, this would seem to devolve into delivering different chunks to each branch, pulling at a rate faster than either of the branches would have individually. I don't think we ever want to steal chunks from the fast branch, even if upstream has backed up.

Agreed. We also definitely don't want to pull faster than the fastest branch, because that could get very bad if you do multiple tee()s in a "chain".

I probably should have thought a bit more about this option first, because now it sounds like an obviously bad idea. 😛

  • Store the previous chunk, and enqueue it when branch 2 (eventually) starts pulling. This chunk can be overwritten when branch 1 pulls in more chunks in the meantime.

This would appear to keep the pull rate = max(pull rate A, pull rate B), and expose mostly the same (cloned) frames, which seems desirable or at least easier to reason about.

Yes, this seems much better. 👍

This would imply that if structuredClone is true, we always need to clone the chunk. Which is a problem if explicit closure is needed.

Yes, though we need to solve that somehow anyway for when a stream is errored.

Yes, whatever solution we end up choosing for #1185 will also need to be integrated in tee(). We need to close the cloned chunk when it gets replaced by a newer one, or when the branch that would have received the clone gets cancelled.
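A minimal sketch of that bookkeeping, assuming chunks expose a close() method the way VideoFrame does. PendingClone and makeFrame are hypothetical helpers, and nothing here reflects whichever explicit-closure design ends up being chosen.

```javascript
// Hypothetical sketch: chunks are assumed to have a close() method, like VideoFrame.
class PendingClone {
  constructor() {
    this.chunk = null; // clone waiting for the slower branch, if any
  }

  // A newer clone arrives: close the one that was never delivered.
  replace(newChunk) {
    if (this.chunk !== null) this.chunk.close();
    this.chunk = newChunk;
  }

  // The branch pulls: hand over the clone; the reader now owns it.
  take() {
    const chunk = this.chunk;
    this.chunk = null;
    return chunk;
  }

  // The branch is cancelled: close whatever is still pending.
  cancel() {
    if (this.chunk !== null) this.chunk.close();
    this.chunk = null;
  }
}

// Stand-in for a closeable chunk, used only to observe close() calls.
function makeFrame(label) {
  return { label, closed: false, close() { this.closed = true; } };
}

const pending = new PendingClone();
const f1 = makeFrame("f1");
const f2 = makeFrame("f2");
const f3 = makeFrame("f3");
pending.replace(f1);
pending.replace(f2);              // f1 is closed, it was never delivered
const delivered = pending.take(); // f2 is handed over and stays open
pending.replace(f3);
pending.cancel();                 // f3 is closed because the branch went away
```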

We do have to be careful with how we design explicit closure. If we go with [Transferable, Closeable] as in https://github.com/whatwg/streams/issues/1187#issuecomment-972733678, it should be fine: we can use the same "close a chunk" algorithm inside ReadableStreamTee. If we go with something like UnderlyingSink.closeChunk(chunk), then we have to copy that algorithm over to the two branches. (Otherwise, we might run into problems if the original stream clears its algorithms before both branches are done.)

MattiasBuelens avatar Nov 22 '21 23:11 MattiasBuelens