streams
Piping to writable streams with HWM 0?
Right now, piping to a writable stream with { highWaterMark: 0 } stalls indefinitely:
const rs = new ReadableStream({
start(c) {
c.enqueue("a");
c.enqueue("b");
c.enqueue("c");
c.close();
}
});
const ws = new WritableStream({
write(chunk) {
console.log("wrote:", chunk);
}
}, { highWaterMark: 0 });
rs.pipeTo(ws); // never resolves, and no messages are logged
This makes it impossible to pipe through a TransformStream without increasing the total queue size of a pipe chain by at least one chunk:
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
}, { highWaterMark: 0 }, { highWaterMark: 0 });
rs.pipeThrough(upperCaseTransform).pipeTo(ws); // stalls indefinitely
const upperCaseTransform = new TransformStream({
transform(chunk, controller) {
controller.enqueue(chunk.toUpperCase());
}
}, { highWaterMark: 1 }, { highWaterMark: 0 }); // same as default strategies
rs.pipeThrough(upperCaseTransform); // works, but already pulls the first chunk from `rs`
This is unfortunate, since there are many use cases for synchronous TransformStreams that shouldn't need buffering (i.e. every call to transform() immediately results in at least one enqueue()):
- A generic mapTransform(fn), similar to array.map(fn):
  function mapTransform(fn) {
    return new TransformStream({
      transform(chunk, controller) {
        controller.enqueue(fn(chunk));
      }
    });
  }
  rs.pipeThrough(mapTransform(x => x.toUpperCase()));
- TextEncoderStream and TextDecoderStream from Encoding.
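To make the eager pull described above observable, here is a minimal sketch that counts pulls on a source piped through an identity TransformStream with the default strategies. The helper name countEagerPulls and the 50 ms wait are assumptions made for illustration only; nothing ever reads from the readable end of the pipe.

```javascript
// Counts how often the source is pulled while nobody reads the pipe's output.
async function countEagerPulls() {
  let pulls = 0;
  const source = new ReadableStream({
    pull(controller) {
      pulls++;
      controller.enqueue(`chunk ${pulls}`);
    }
  }, { highWaterMark: 0 }); // the source itself buffers nothing ahead of demand

  // Default strategies: writable HWM = 1, readable HWM = 0.
  source.pipeThrough(new TransformStream());

  // Give the pipe a moment to run (arbitrary delay, an assumption of this sketch).
  await new Promise(resolve => setTimeout(resolve, 50));
  return pulls;
}

countEagerPulls().then(pulls => console.log("pulls without any reader:", pulls));
```

If the pipe behaves as described in this issue, the count should be at least one: the writable end's queue accepts a chunk before any consumer has asked for it.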
Prior discussions on this topic noted that this is not possible. writer.desiredSize is always <= 0, so writer.ready is always pending:
- #777:
We can't reduce the HWM of the writableStrategy to 0 because it would have permanent backpressure preventing the pipe from working.
- #1083:
Yes. As you observed, a writable stream with a HWM of 0 will always have backpressure. So adding an identity TransformStream to a pipe can't be a complete no-op: it always increases the total queue size by 1.
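The permanent backpressure mentioned in these quotes can be checked directly on a writer. The sketch below is only a probe: the function name probeZeroHwmWriter and the 100 ms timeout are assumptions, and a timeout can of course only suggest, not prove, that a promise never settles.

```javascript
// Probes a writer for a WritableStream created with highWaterMark: 0.
async function probeZeroHwmWriter() {
  const ws = new WritableStream({
    write(chunk) {
      console.log("wrote:", chunk);
    }
  }, { highWaterMark: 0 });

  const writer = ws.getWriter();
  const desiredSize = writer.desiredSize; // HWM minus queue size

  // Race writer.ready against a timer (100 ms is an arbitrary choice).
  const outcome = await Promise.race([
    writer.ready.then(() => "ready"),
    new Promise(resolve => setTimeout(resolve, 100, "still pending"))
  ]);
  return { desiredSize, outcome };
}

probeZeroHwmWriter().then(result => console.log(result));
```

With the current spec behavior, desiredSize is 0 and the timer wins the race, which is exactly why a pipe that waits on writer.ready stalls.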
But that got me thinking. A ReadableStream's source can be pull()ed as a result of reader.read(), even if controller.desiredSize <= 0. Maybe a WritableStream's sink should then also be able to release backpressure even if writer.desiredSize <= 0? 🤔
We could add a method on WritableStreamDefaultController (controller.pull()? controller.releaseBackpressure()? controller.notifyReady()?) that would have the result of immediately resolving the current writer.ready promise. Internally, we would do something like WritableStreamUpdateBackpressure(stream, false). My hope is that we can then use this inside TransformStreamSetBackpressure(), so that pulling from the readable end of a transform stream would also resolve ready on the writable end.
...Or am I missing something very obvious? 😛
Although adding such a mechanism might be useful, I want to make sure we've figured out the root issue here. In particular I worry that maybe we got the semantics of HWM wrong for WritableStream, if there's an expressive gap here.
But, I'm having a hard time articulating the expressive gap. Let's say we had a WritableStream with HWM = 0 that called controller.releaseBackpressure(). (When would it call it, exactly?) What is that WritableStream trying to express?
My best guess was something like "I currently have nothing in the queue, and I intend to consume whatever you give me ASAP, but please don't give me more than 1 chunk". But... that seems like the right semantics for HWM = 1. So I admit I'm confused.
Although adding such a mechanism might be useful, I want to make sure we've figured out the root issue here. In particular I worry that maybe we got the semantics of HWM wrong for WritableStream, if there's an expressive gap here.
But, I'm having a hard time articulating the expressive gap. Let's say we had a WritableStream with HWM = 0 that called controller.releaseBackpressure(). (When would it call it, exactly?) What is that WritableStream trying to express?
I agree, it's tricky. I'm also still trying to wrap my head around it, it's not very concrete yet. 😛
I'm mainly coming at this from the TransformStream use case. Data written to the writable end will be queued and, when the readable end releases its backpressure, will be transformed and enqueued on the readable end. Similarly, read requests on the readable end should cause the writable end to also request some data if it doesn't have anything queued yet. But does that "write request" mean anything outside of the transform stream's use case?
This may also be necessary if/when we flesh out writable byte streams (#495), and (by extension) transform streams with readable/writable byte stream end(s). If we want "reverse BYOB" or "please use this buffer" semantics, then it would be nice if a BYOB request from the readable end could be forwarded to the writable end. But then the writable end is almost required to have HWM = 0, so it can wait for a read request to provide a buffer (instead of allocating one by itself). 🤔
...Or maybe "reverse BYOB" is too weird, and a "regular" BYOB writer is fine. I don't know yet. 😅
My best guess was something like "I currently have nothing in the queue, and I intend to consume whatever you give me ASAP, but please don't give me more than 1 chunk". But... that seems like the right semantics for HWM = 1. So I admit I'm confused.
With HWM = 1, the stream is asking to always put at least one chunk in its queue, and to try and hold back if it's not done processing that latest chunk yet. Here, most of the time writer.ready will be resolved. You write a bunch of chunks, writer.ready temporarily becomes pending while the sink works through the backlog and eventually writer.ready resolves again.
With HWM = 0, the stream is asking not to put anything in its queue, except for the case where it wants a chunk. Here, most of the time writer.ready will be pending. When the sink eventually wants a chunk, writer.ready temporarily becomes resolved and waits for a chunk to be written. Once that happens, writer.ready becomes pending again.
They're similar, but with HWM = 1 the "default" state is for writer.ready to be resolved and the "transient" state is writer.ready being pending, whereas with HWM = 0 those states are reversed.
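The HWM = 1 half of this comparison already works today and can be observed through writer.desiredSize. The following is a small sketch; the function name and the 20 ms simulated sink delay are assumptions. The HWM = 0 half describes desired behavior, so it is not shown.

```javascript
// Samples writer.desiredSize before, during and after a slow sink write (HWM = 1).
async function observeDesiredSizeWithHwmOne() {
  const ws = new WritableStream({
    write() {
      // Simulated slow sink: each write takes 20 ms (arbitrary).
      return new Promise(resolve => setTimeout(resolve, 20));
    }
  }, { highWaterMark: 1 });

  const writer = ws.getWriter();
  const before = writer.desiredSize;  // queue is empty
  const written = writer.write("a");
  const during = writer.desiredSize;  // the chunk still counts towards the queue
  await written;
  const after = writer.desiredSize;   // the sink has finished with the chunk
  return { before, during, after };
}

observeDesiredSizeWithHwmOne().then(sizes => console.log(sizes));
```

The "default" state has room for one chunk, and backpressure is the transient state that lasts only while the sink is busy.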
The goal of avoiding the queue bloat from TransformStream is a good one, but the details of how to do it are tricky.
I think it may be sufficient to have a readyWhenNotWriting boolean as part of the strategy which changes the condition for writer.ready from queueSize < highWaterMark to queueSize < highWaterMark || underlyingSinkWriteNotInProgress.
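As a rough model of that condition (not spec text), the check could be written as a plain function. The function name isReady and the shape of its argument are assumptions; readyWhenNotWriting is the strategy flag proposed above and does not exist in any implementation.

```javascript
// Models the proposed ready condition:
//   queueSize < highWaterMark || underlyingSinkWriteNotInProgress
// The second clause only applies when the hypothetical flag is set.
function isReady({ queueSize, highWaterMark, writeInProgress, readyWhenNotWriting = false }) {
  return queueSize < highWaterMark || (readyWhenNotWriting && !writeInProgress);
}

// HWM = 0, idle sink: today's condition vs. the proposed one.
console.log(isReady({ queueSize: 0, highWaterMark: 0, writeInProgress: false }));
console.log(isReady({ queueSize: 0, highWaterMark: 0, writeInProgress: false, readyWhenNotWriting: true }));
```

Under this model an idle HWM = 0 stream reports ready only when the flag is set, and goes back to "not ready" as soon as a write is in progress.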
With HWM = 1, ...
With HWM = 0, ...
Yeah, these make sense. But could you phrase the transform stream case, which appears to be something in between, in this way? I.e., what is the writable end of such a no-queueing transform stream asking for? How does that differ from what the HWM = 1 case is asking for?
My best guess was something like "I currently have nothing in the queue, and I intend to consume whatever you give me ASAP, but please don't give me more than 1 chunk". But... that seems like the right semantics for HWM = 1. So I admit I'm confused.
"I have no queue, so I couldn't consume a chunk before, but I'm ready to consume one now (because I have a taker for it)".
(I know it technically has a queue, but the intent of HWM = 0 seems to be for it to remain empty ideally, so for all intents and purposes it's acting like it has no queue).
Similarly, read requests on the readable end should cause the writable end to also request some data if it doesn't have anything queued yet.
How about controller.requestData() ? controller.pullProducer() ?
This would be useful for realtime streams where chunks are expensive and prebuffering undesirable (pulling too soon may yield a less fresh realtime frame).
Sorry, still confused...
"I have no queue, so I couldn't consume a chunk before, but I'm ready to consume one now (because I have a taker for it)".
compared to
With HWM = 0, the stream is asking not to put anything in its queue, except for the case where it wants a chunk. Here, most of the time writer.ready will be pending. When the sink eventually wants a chunk, writer.ready temporarily becomes resolved and waits for a chunk to be written. Once that happens, writer.ready becomes pending again.
makes it sound like HWM = 0 is perfect for this use case. When the sink wants a chunk, it will signal that through writer.ready.
Yes it is perfect. I was merely trying to articulate the expressive gap (What is that WritableStream trying to express?)
We're on the same page I think.
Well, it feels like there's still a missing capability here, as noted by the OP and even your comment in #1157 indicating that some way to get less buffering would be helpful. I'm just confused as to what that missing capability is still, i.e. I cannot understand what the proposed controller.requestData() would give that isn't already given by using HWM = 0 or HWM = 1.
the proposed controller.requestData()
I was just bikeshedding on controller.releaseBackpressure() based on @MattiasBuelens using the phrase "request some data if it doesn't have anything queued yet". Sorry for causing confusion.
But you're right, there is a (side) issue in https://github.com/whatwg/streams/issues/1157#issuecomment-918547406 with effectively implementing a FrameDropper TransformStream. Today, it requires {highWaterMark: 1}, {highWaterMark: 2} in order to perform the if (controller.desiredSize < 2) to drop a frame. It's not clear how one would implement it with {highWaterMark: 0}, {highWaterMark: 0} which would be desirable.
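For readers who have not followed the linked comment, here is a guess at what such a frame dropper could look like, built only from the two strategies and the desiredSize check quoted above. The names createFrameDropper and demoFrameDropper are hypothetical, and the code in the linked comment may well differ.

```javascript
// Hypothetical frame dropper: forwards a frame only when nothing is already
// waiting on the readable side, and silently drops it otherwise.
function createFrameDropper() {
  return new TransformStream({
    transform(frame, controller) {
      if (controller.desiredSize < 2) {
        return; // a frame is already queued downstream: drop this one
      }
      controller.enqueue(frame);
    }
  }, { highWaterMark: 1 }, { highWaterMark: 2 });
}

// Writes three frames before anything is read, then reads what survived.
async function demoFrameDropper() {
  const { writable, readable } = createFrameDropper();
  const writer = writable.getWriter();
  const reader = readable.getReader();
  await writer.write("frame 1");
  await writer.write("frame 2");
  await writer.write("frame 3");
  const first = await reader.read();
  await writer.close();
  const second = await reader.read();
  return { first, second };
}

demoFrameDropper().then(result => console.log(result));
```

In this sketch the readable side's HWM of 2 is what lets transform() keep being called while one frame is queued, so later frames can be dropped instead of being held back. With both strategies at 0 the writable side would never accept a frame at all, which is the gap this issue is about.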
that isn't already given by using ... HWM = 1
I think the OP does a good job of explaining the problem with HWM = 1: it sprinkles chunks into the pipe early, just because HWM = 1 says to do it. I see no inherent reason a TransformStream has to require this to operate, when it could requestData() on demand (when it gets a pull() on its readable). It doesn't inherently require a chunk to have already been queued up in order to work.
For sources with time-sensitive chunks — which is any source that may drop chunks as a solution to back-pressure (a camera or WebTransport datagram receiver) — operating on already-queued chunks means potentially operating on old chunks. Example.
So maybe my confusion is this. @MattiasBuelens said
With HWM = 0, the stream is asking not to put anything in its queue, except for the case where it wants a chunk. Here, most of the time writer.ready will be pending. When the sink eventually wants a chunk, writer.ready temporarily becomes resolved and waits for a chunk to be written. Once that happens, writer.ready becomes pending again.
I assume this is referring to the current spec? However, I can't figure out in what scenario with HWM = 0 writer.ready will ever resolve.
I assume this is referring to the current spec? However, I can't figure out in what scenario with HWM = 0 writer.ready will ever resolve.
No, this is describing the desired semantics, what I would like HWM = 0 to do. "When the sink eventually wants a chunk" means "when controller.releaseBackpressure() is called".
In the current spec, when HWM = 0, writer.ready is always pending. I would like to change that, so that the underlying sink has some control over that.
Oh! That explains the confusion.
What do you think of @ricea's suggestion in https://github.com/whatwg/streams/issues/1158#issuecomment-897368767 for that purpose? Or even just changing the ready condition from queueSize < highWaterMark to queueSize <= highWaterMark, with some reasoning about how writable streams by default want at least one chunk, and HWM = 0 just says at most one chunk (no matter its size).
I think that's less expressive than a dedicated method, but it might be a better general fix if right now HWM = 0 is just a footgun.
I think it may be sufficient to have a readyWhenNotWriting boolean as part of the strategy
I don't know if I'd want readyWhenNotWriting as part of the queuing strategy, I feel like it belongs to the underlying sink.
- If it were part of the strategy, we'd have to change the ByteLengthQueuingStrategy and CountQueuingStrategy classes too so you could set that property through their constructors.
- It'd be kind of weird to have a queuing strategy property that only applies to writable streams.
But if you have good arguments for adding it to the queuing strategy, I'm all ears. 🙂
Or even just changing the ready condition from queueSize < highWaterMark to queueSize <= highWaterMark
That would mean a pipe operation would never fill exactly up to the HWM, you'd always go over the HWM, right? 🤔
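A toy model makes the overshoot concrete. It assumes every chunk has size 1 and that the sink never drains while the producer is writing; the function and variable names are made up for this sketch.

```javascript
// Counts how many unit-size chunks a producer would write before the given
// ready condition turns false, assuming the sink drains nothing meanwhile.
function chunksAcceptedBeforeBackpressure(highWaterMark, isReady) {
  let queueSize = 0;
  while (isReady(queueSize, highWaterMark)) {
    queueSize += 1;
  }
  return queueSize;
}

const currentCondition = (queueSize, hwm) => queueSize < hwm;
const relaxedCondition = (queueSize, hwm) => queueSize <= hwm;

for (const hwm of [0, 3]) {
  console.log(
    `HWM ${hwm}:`,
    "current fills to", chunksAcceptedBeforeBackpressure(hwm, currentCondition),
    "| relaxed fills to", chunksAcceptedBeforeBackpressure(hwm, relaxedCondition)
  );
}
```

In this model the relaxed comparison always ends one chunk above the HWM, which unblocks HWM = 0 but also changes the fill level for every other HWM.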
I'm still leaning towards a controller method like controller.releaseBackpressure():
- When called, it sets an internal boolean [[releaseBackpressure]] flag to true and calls WritableStreamUpdateBackpressure() if needed.
- If [[releaseBackpressure]] is true, WritableStreamDefaultControllerGetBackpressure must return false (without checking desiredSize).
- When WritableStreamDefaultControllerWrite is called, [[releaseBackpressure]] is reset to false.
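The three steps above can be modeled with a small stand-alone class. This is only a sketch of the proposed bookkeeping, not spec text or a real API: BackpressureModel, finishWrite and the field names are invented here, and releaseBackpressure() exists only as a proposal in this thread.

```javascript
// Toy model of the proposed [[releaseBackpressure]] flag on a writable controller.
class BackpressureModel {
  constructor(highWaterMark) {
    this.highWaterMark = highWaterMark;
    this.queueTotalSize = 0;
    this.releaseFlag = false; // stands in for [[releaseBackpressure]]
  }

  get desiredSize() {
    return this.highWaterMark - this.queueTotalSize;
  }

  // Step 1: the sink asks for a chunk regardless of the queue state.
  releaseBackpressure() {
    this.releaseFlag = true;
  }

  // Step 2: the flag overrides the usual desiredSize check.
  getBackpressure() {
    if (this.releaseFlag) return false;
    return this.desiredSize <= 0;
  }

  // Step 3: writing a chunk resets the flag.
  write(chunkSize = 1) {
    this.releaseFlag = false;
    this.queueTotalSize += chunkSize;
  }

  // The sink is done with a chunk (helper invented for this model).
  finishWrite(chunkSize = 1) {
    this.queueTotalSize -= chunkSize;
  }
}

const model = new BackpressureModel(0);
console.log("initial backpressure:", model.getBackpressure());
model.releaseBackpressure();
console.log("after releaseBackpressure():", model.getBackpressure());
model.write();
console.log("after one write:", model.getBackpressure());
```

In the model, an HWM = 0 stream stays in backpressure except for the window between a releaseBackpressure() call and the next write, which matches the desired semantics described earlier in the thread.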
That would mean a pipe operation would never fill exactly up to the HWM, you'd always go over the HWM, right? 🤔
True. I guess another alternative is special-casing 0 HWM, since in that case you don't want to exactly fill up to the HWM (because doing so stalls the stream). But that's kind of icky...
Yeah, that's also what I was thinking. If we consider HWM = 0 to be broken, we could special case it. I think we would only need to change a single abstract op:
WritableStreamDefaultControllerGetBackpressure(controller) performs the following steps:
- Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
- If controller.[[strategyHWM]] is 0,
- Return true if desiredSize < 0, or false otherwise. (This is effectively the same as checking if controller.[[queueTotalSize]] > 0.)
- Otherwise,
- Return true if desiredSize ≤ 0, or false otherwise.
Alternatively, if we want zero-size chunks to also cause backpressure, we could change it to "if controller.[[stream]].[[writeRequests]] is not empty" or something.
Still, not sure how I feel about this. Might be difficult to explain this behavior to developers. But then again, so would explaining the need for controller.releaseBackpressure()... 🤷♂️
WritableStreamDefaultControllerGetBackpressure(controller) performs the following steps:
Let desiredSize be ! WritableStreamDefaultControllerGetDesiredSize(controller).
If controller.[[strategyHWM]] is 0,
- Return true if desiredSize < 0, or false otherwise. (This is effectively the same as checking if controller.[[queueTotalSize]] > 0.)
Otherwise,
- Return true if desiredSize ≤ 0, or false otherwise.
Is this different from just making HWM 0 equivalent to HWM 1?
Is this different from just making HWM 0 equivalent to HWM 1?
You're right, that change would make them equivalent. That's not what we want. Woops! 😅
Okay, then it's back to the explicit controller.releaseBackpressure(). 😛
Is this different from just making HWM 0 equivalent to HWM 1?
I believe it's different in the case where the queue total size is between 0 and 1. But the fundamental issue where it causes a chunk to end up in the queue remains; see below.
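The special-cased check proposed earlier in the thread is easy to model as a plain function, which makes the comparison with HWM = 1 concrete. The function name and the two-argument form are assumptions of this sketch; the real abstract operation takes a controller.

```javascript
// Models the special-cased WritableStreamDefaultControllerGetBackpressure:
// for HWM = 0 only a non-empty queue causes backpressure.
function getBackpressureSpecialCased(strategyHWM, queueTotalSize) {
  const desiredSize = strategyHWM - queueTotalSize;
  if (strategyHWM === 0) {
    return desiredSize < 0;
  }
  return desiredSize <= 0;
}

// Compare HWM = 0 (special-cased) with HWM = 1 for a few queue sizes.
for (const queueTotalSize of [0, 0.5, 1, 2]) {
  console.log(
    `queue ${queueTotalSize}:`,
    "HWM 0 ->", getBackpressureSpecialCased(0, queueTotalSize),
    "| HWM 1 ->", getBackpressureSpecialCased(1, queueTotalSize)
  );
}
```

For whole-number queue sizes the two columns agree, and they differ only for a total size strictly between 0 and 1 (for example with a custom size() that returns fractions). Either way a chunk has to sit in the queue before backpressure is applied.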
Okay, then it's back to the explicit controller.releaseBackpressure().
I was hoping there'd be some way to get the benefit here, without a new API, and with some way of "fixing" HWM = 0 so that it's less of a footgun and more useful.
But going back to the description of the desired behavior,
With HWM = 0, the stream is asking not to put anything in its queue, except for the case where it wants a chunk. Here, most of the time writer.ready will be pending. When the sink eventually wants a chunk, writer.ready temporarily becomes resolved and waits for a chunk to be written. Once that happens, writer.ready becomes pending again.
I guess we might need something different after all. If we try to use the HWM, we're intricately tied to the queue; the only way to switch between "not ready" and "wants a chunk" mode is by manipulating the contents of the queue, presumably by dropping it from containing a chunk to not containing a chunk. But that means the queue must contain a chunk during the "not ready" state, which is what we want to avoid!
So I am convinced that we cannot do anything just by tweaking the HWM/queue comparison mechanism.
That leaves two options:
- Explicit method; or
- readyWhenNotWriting
How do these compare in expressiveness? At first I thought the explicit method was more expressive, since you could combine it with any arbitrary HWM. But I wonder if that's actually true... does it make sense to use it with non-0 HWMs? How do each compare for realistic use cases?
It almost feels like readyWhenNotWriting is just a totally alternate kind of queuing strategy, separate from a size-based one?
does it make sense to use it with non-0 HWMs?
I don't think so, because releasing back pressure (which is built on queues) works fine then (there's water on the wheel).
This doesn't feel like a different strategy to me, so much as a way for WritableStream to signal for data (sorta) like one can with ReadableStream.
I think I see two options for the API:
- Implicit. Semantics are "writer.ready is true when the underlying sink is not writing".
- Explicit. Semantics are "writer.ready is true when controller.bikeshed() has been called until the next chunk is written".
I am leaning towards the explicit option, because the implicit version may require sink authors to artificially extend the time before resolving the write() promise just to stop another chunk from being written.
The implicit version would only do anything when HWM=0. The explicit version could in principle work with any HWM, but there is no obvious use case for it (if we are already behind in writing, why would we ask for more chunks?).