lowWaterMark or fixed-size chunks for TransformStream?
Hi,
Reading the spec, I understand that applying a ByteLengthQueuingStrategy to a ReadableStream will buffer the given number of bytes and pass them through the pipe once the highWaterMark is reached. Am I correct?
I tried something like this:
const readable = new ReadableStream({
  type: 'bytes',
  start(controller) {
    return self.fetch(…).then(response => {
      const reader = response.body.getReader();
      return reader.read().then(function process(result) {
        if (result.done) { return; }
        controller.enqueue(result.value);
        return reader.read().then(process);
      });
    });
  }
}, new ByteLengthQueuingStrategy({highWaterMark: 1024}));
readable
  .pipeThrough(new TransformStream({
    transform(buffer, done, enqueue, close, err) {
      console.log('Buffer size:', buffer.length); // Should always be 1024
      enqueue(buffer);
      return done();
    }
  }));
But it seems the TransformStream's transform() is called whenever a chunk is enqueued on the ReadableStream's controller, whatever its size.
P.S. : Not really sure of this issue's title. P.P.S. : Thanks for the good work 👍
highWaterMark is actually the (target) maximum amount that a ReadableStream wants to buffer, not a minimum. If nothing reads from the ReadableStream, and if the underlyingSource respects backpressure (by refusing to enqueue new chunks when controller.desiredSize <= 0), the internal queue will stay at or below the HWM.
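To make the "respects backpressure" part concrete, here is a minimal sketch of an underlying source that stops enqueuing once controller.desiredSize drops to 0. The helper name makeCountingStream and its parameters are hypothetical and only serve the illustration; it uses a default (non-byte) stream so that ByteLengthQueuingStrategy measures each chunk's byteLength.

```javascript
// Hypothetical helper: builds a stream that wants to produce `totalChunks`
// chunks of `chunkSize` bytes, but only enqueues while the stream asks for more.
function makeCountingStream(totalChunks, chunkSize, highWaterMark) {
  let produced = 0;
  const log = []; // desiredSize observed right after each enqueue

  const stream = new ReadableStream({
    pull(controller) {
      // Respect backpressure: stop as soon as the queue has reached the HWM.
      while (produced < totalChunks && controller.desiredSize > 0) {
        controller.enqueue(new Uint8Array(chunkSize));
        produced++;
        log.push(controller.desiredSize);
      }
      if (produced === totalChunks) {
        controller.close();
      }
    }
  }, new ByteLengthQueuingStrategy({ highWaterMark }));

  return { stream, log, produced: () => produced };
}
```

With a highWaterMark of 1024 and 256-byte chunks, and nothing reading from the stream, this source should stop after four chunks: the queue is then exactly at the HWM and pull() is not called again until a consumer drains it.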
If you're reading from a ReadableStream, with reader.read() or with transform() in a TransformStream, the consumer always receives the chunks one at a time, exactly as they were enqueued; the queuing strategy never merges or splits them. (Unrelated to your question, but the TransformStream API has changed a lot very recently: it doesn't have done(), etc. anymore.)
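A small sketch of that behaviour, written against the transform(chunk, controller) signature from the current Streams Standard. The helper collectChunkSizes is a hypothetical name; it enqueues chunks of arbitrary sizes and records what transform() actually receives.

```javascript
// Hypothetical helper: enqueue chunks of the given byte sizes, pipe them
// through a pass-through TransformStream, and report the sizes transform() saw.
async function collectChunkSizes(sizes) {
  const readable = new ReadableStream({
    start(controller) {
      for (const n of sizes) {
        controller.enqueue(new Uint8Array(n));
      }
      controller.close();
    }
  }, new ByteLengthQueuingStrategy({ highWaterMark: 1024 }));

  const seen = [];
  const passthrough = new TransformStream({
    transform(chunk, controller) {
      seen.push(chunk.byteLength); // size of the chunk exactly as enqueued
      controller.enqueue(chunk);
    }
  });

  // Drain the output so every chunk flows through transform().
  const reader = readable.pipeThrough(passthrough).getReader();
  while (!(await reader.read()).done) { /* discard */ }
  return seen;
}
```

Calling collectChunkSizes([100, 700, 3000]) should report the same three sizes back, regardless of the 1024-byte highWaterMark.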
Thanks, I understand it now (sorry, streams are quite new for me).
So what I was really looking for is a lowWaterMark. Would it be worth considering?
For example, my data need to go through a "decryption transform" and it requires a minimum amount of bytes. I managed to do it using a transform:
var retainer = new TransformStream({
  retained: new Uint8Array(0), // Stored on the transformer object, not sure if it's a good thing
  transform(buffer, done, enqueue, close, err) {
    // Join the retained bytes and the new chunk; they live in different ArrayBuffers, so copy
    const view = new Uint8Array(this.retained.byteLength + buffer.byteLength);
    view.set(this.retained, 0);
    view.set(buffer, this.retained.byteLength);
    const size = Math.floor(view.byteLength / 1024) * 1024; // Pipe only a multiple of 1024 bytes
    this.retained = view.subarray(size); // Keep the remainder for the next call
    if (size) {
      enqueue(view.subarray(0, size));
    }
    done();
  }
});
(I do not have the code with me, it might be wrong) (Sorry, still using the old TransformStream API)
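For comparison, a rough sketch of the same retaining idea using the transform(chunk, controller) and flush(controller) methods of the current Streams Standard. The factory name fixedSizeChunks is hypothetical. Unlike the snippet above, it emits blocks of exactly blockSize bytes rather than one chunk whose length is a multiple of it, and it assumes the trailing partial block should still be emitted when the input ends, which may not suit every cipher.

```javascript
// Hypothetical factory: re-chunks an incoming Uint8Array stream into blocks
// of exactly `blockSize` bytes, keeping any remainder for the next chunk.
function fixedSizeChunks(blockSize) {
  let retained = new Uint8Array(0);

  return new TransformStream({
    transform(chunk, controller) {
      // Join the leftover bytes with the new chunk.
      const joined = new Uint8Array(retained.byteLength + chunk.byteLength);
      joined.set(retained, 0);
      joined.set(chunk, retained.byteLength);

      // Emit as many full blocks as are available.
      let offset = 0;
      while (joined.byteLength - offset >= blockSize) {
        controller.enqueue(joined.slice(offset, offset + blockSize));
        offset += blockSize;
      }
      retained = joined.slice(offset);
    },
    flush(controller) {
      // Assumption: a trailing partial block is passed on as-is.
      if (retained.byteLength > 0) {
        controller.enqueue(retained);
      }
    }
  });
}
```

It would be used as readable.pipeThrough(fixedSizeChunks(1024)) ahead of the decryption transform. Keeping the leftover bytes in a closure variable instead of on the transformer object avoids relying on how `this` is bound inside transform().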