streams icon indicating copy to clipboard operation
streams copied to clipboard

lowWaterMark or fixed-size chunks for TransformStream?

Open neovov opened this issue 9 years ago • 2 comments
trafficstars

Hi,

Reading the spec, I understand that applying a ByteLengthQueuingStrategy to a ReadableStream will buffer the given amount of bytes and pass this through the pipe when reached the highWaterMark. Am I correct?

I tried with something like that:

const readable = new ReadableStream({
  type: 'bytes',
  start(controller) {
    return self.fetch(…).then(response => {
      const reader = response.body.getReader();
      return reader.read().then(function process(result) {
        if (result.done) { return; }
        controller.enqueue(result.value);
        return reader.read().then(process);
      });
    });
  }
}, new ByteLengthQueuingStrategy({highWaterMark: 1024}));

readable
  .pipeThrough(new TransformStream({
    transform(buffer, done, enqueue, close, err) {
      console.log('Buffer size:', buffer.length); // Should always be 1024
      enqueue(buffer);
      return done();
    }
  }));

But it seams the TransformStream is called whenever there is data enqueued to the ReadableStreamController.

P.S. : Not really sure of this issue's title. P.P.S. : Thanks for the good work 👍

neovov avatar Oct 21 '16 11:10 neovov

highWaterMark is actually the (target) maximum amount that a ReadableStream wants to buffer, not a minimum. If nothing reads from the ReadableStream, and if the underlyingSource respects backpressure (by refusing to enqueue new chunks when controller.desiredSize <= 0), the internal queue will stay at or below the HWM.

If you're reading from a ReadableStream, with reader.then() or with transform() in a TransformStream, it always receives the chunks one at a time, exactly as they were enqueued. (Unrelated to your question, but the TransformStream API has changed a lot very recently, it doesn't have done(), etc anymore.)

isonmad avatar Oct 23 '16 21:10 isonmad

Thanks, I understand it now (sorry, streams are quite new for me).

So, I was looking for a lowWaterMark, would it worth considering? For example, my data need to go through a "decryption transform" and it requires a minimum amount of bytes. I managed to do it using a transform:

var retainer = new TransformStream({
  retained: new Uint8Array(0), // Store this in the underlying source, not sure if it's a good thing
  transform(buffer, done, enqueue, close, err) {
    const view = new Uint8Array(buffer.buffer, this.retained.byteOffset, this.retained.byteLength + buffer.byteLength);
    const size = Math.floor(view.byteLength / 1024) * 1024; // Pipe only a multiple of 1024 bytes
    this.retained = new Uint8Array(view.buffer, view.byteOffset + size, view.byteLength - size);
    if (size) {
      enqueue(new Uint8Array(view.buffer, view.byteOffset, size));
    }
    done();
  }
});

(I do not have the code with me, it might be wrong) (Sorry, still using the old TransformStream API)

neovov avatar Oct 24 '16 07:10 neovov