
stream: adjust src hwm when pipelining

Open ronag opened this issue 3 years ago • 26 comments

ronag avatar Nov 07 '21 09:11 ronag

@nodejs/startup @addaleax

ronag avatar Nov 07 '21 09:11 ronag

CI: https://ci.nodejs.org/job/node-test-pull-request/40820/

nodejs-github-bot avatar Nov 10 '21 15:11 nodejs-github-bot

@nodejs/streams

ronag avatar Nov 10 '21 15:11 ronag

What happens if the source is already piped to another destination via source.pipe(destination)? Does this allow the same source to be piped to multiple destinations?

lpinca avatar Nov 10 '21 20:11 lpinca

Combining pipe and read is not a good idea... so no

ronag avatar Nov 10 '21 20:11 ronag

What happens if the source is already piped to another destination via source.pipe(destination)? Does this allow the same source to be piped to multiple destinations?

It will work but the consumption will be driven by pipeline as the 'readable' takes precedence.

@ronag could you add a test for this case?

mcollina avatar Nov 11 '21 09:11 mcollina

It will work but the consumption will be driven by pipeline as the 'readable' takes precedence.

If my understanding is correct, then consumption will be driven by the fastest destination, not the slowest, potentially messing up backpressure handling, right?

lpinca avatar Nov 11 '21 11:11 lpinca

If my understanding is correct, then consumption will be driven by the fastest destination, not the slowest, potentially messing up backpressure handling, right?

No, not really. The same happens if you mix async iterators and .pipe(). AsyncIterators use .read() and they drive the consumption of the source data.

Unfortunately this is the only way .read() can work reliably. See https://github.com/nodejs/node/pull/18994 and https://github.com/nodejs/node/issues/18058.

mcollina avatar Nov 11 '21 11:11 mcollina

What happens if the source is already piped to another destination via source.pipe(destination)? Does this allow the same source to be piped to multiple destinations?

It will work but the consumption will be driven by pipeline as the 'readable' takes precedence.

@ronag could you add a test for this case?

I don't really see what a test here contributes? It uses an existing api.

ronag avatar Nov 11 '21 13:11 ronag

'use strict';

const stream = require('stream');

const chunk = Buffer.alloc(1024);

function noop() {}

const source = new stream.Readable({
  read() {
    this.push(chunk);
  }
});

let bytesWritten = 0;

const destination = new stream.Writable({
  write(chunk, encoding, callback) {
    bytesWritten += chunk.length;
    callback();
  }
});

stream.pipeline(source, destination, noop);

setInterval(function () {
  console.log(bytesWritten);
}, 1000);

This crashes on my machine due to OOM.

lpinca avatar Nov 11 '21 14:11 lpinca

And this is the point I was trying to make in https://github.com/nodejs/node/pull/40751#issuecomment-966241814

'use strict';

const stream = require('stream');

const chunk = Buffer.alloc(1024);

function noop() {}

const source = new stream.Readable({
  read() {
    this.push(chunk);
  }
});

const slowDestination = new stream.Writable({
  write(chunk, encoding, callback) {
    setTimeout(callback, 1000);
  }
});

const fastDestination = new stream.Writable({
  write(chunk, encoding, callback) {
    setImmediate(callback);
  }
});

stream.pipeline(source, fastDestination, noop);
source.pipe(slowDestination);

setInterval(function () {
  console.log(slowDestination.writableLength);
}, 1000);

but I understand that mixing readable.pipe() and stream.pipeline() is not a good idea and it is an issue even now if async iterators are used.

lpinca avatar Nov 11 '21 14:11 lpinca
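
For comparison, a minimal sketch of the same fast/slow pair where both destinations are attached with readable.pipe(). In that setup pipe() pauses the source whenever any destination signals backpressure, so the slow destination's buffer stays bounded. TOTAL_CHUNKS, the 4 KiB highWaterMark, and the 1 ms delay are assumptions chosen so the sketch terminates quickly; they are not taken from the PR.

```javascript
'use strict';

const { Readable, Writable } = require('stream');

const chunk = Buffer.alloc(1024);
const TOTAL_CHUNKS = 64; // hypothetical bound so the sketch ends

let produced = 0;
const source = new Readable({
  read() {
    // Push synchronously, then end the stream after TOTAL_CHUNKS chunks.
    this.push(produced++ < TOTAL_CHUNKS ? chunk : null);
  }
});

const slowDestination = new Writable({
  highWaterMark: 4 * 1024, // assumption: small buffer so backpressure kicks in
  write(data, encoding, callback) {
    setTimeout(callback, 1);
  }
});

const fastDestination = new Writable({
  write(data, encoding, callback) {
    setImmediate(callback);
  }
});

// Both consumers use pipe(), so the source waits for the slowest one.
source.pipe(fastDestination);
source.pipe(slowDestination);

// Sample the slow destination's buffer after pipe() has handled each chunk.
let maxSlowBuffered = 0;
source.on('data', () => {
  maxSlowBuffered = Math.max(maxSlowBuffered, slowDestination.writableLength);
});

const done = new Promise((resolve) => {
  slowDestination.on('finish', () => resolve(maxSlowBuffered));
});

done.then((max) => {
  console.log(`peak slowDestination.writableLength: ${max}`);
});
```

Replacing `source.pipe(fastDestination)` with `stream.pipeline(source, fastDestination, noop)` gives the mixed setup from the comment above, which is where the behavior depends on which API pipeline uses internally.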

yea... it's not optimal. The problem is that we have 4 APIs that are not strictly compatible:

  • pause/resume/'data'/'end'
  • pipe
  • read/'readable'
  • async gen

The question here is which API should pipeline use? I've previously recommended pipe since I believe that has best compatibility. However, using read has performance and simplicity advantages.

ronag avatar Nov 11 '21 14:11 ronag
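
The four consumption styles listed above can be sketched side by side. This is only an illustration of the public APIs, not of how pipeline is implemented; makeSource and the via* helper names are hypothetical.

```javascript
'use strict';

const { Readable, Writable } = require('stream');

// Hypothetical helper: a fresh finite source for each demonstration.
function makeSource() {
  return Readable.from(['a', 'b', 'c']);
}

// 1. pause/resume/'data'/'end': flowing mode, chunks arrive as events.
function viaDataEvents(source) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    source.on('data', (chunk) => chunks.push(chunk));
    source.on('end', () => resolve(chunks));
    source.on('error', reject);
  });
}

// 2. pipe: the source writes into a Writable and honors its backpressure.
function viaPipe(source) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    const sink = new Writable({
      objectMode: true,
      write(chunk, encoding, callback) {
        chunks.push(chunk);
        callback();
      }
    });
    sink.on('finish', () => resolve(chunks));
    sink.on('error', reject);
    source.on('error', reject);
    source.pipe(sink);
  });
}

// 3. read/'readable': paused mode, the consumer pulls with read().
function viaRead(source) {
  return new Promise((resolve, reject) => {
    const chunks = [];
    source.on('readable', () => {
      let chunk;
      while ((chunk = source.read()) !== null) {
        chunks.push(chunk);
      }
    });
    source.on('end', () => resolve(chunks));
    source.on('error', reject);
  });
}

// 4. async iteration: for await pulls chunks one at a time.
async function viaAsyncIterator(source) {
  const chunks = [];
  for await (const chunk of source) {
    chunks.push(chunk);
  }
  return chunks;
}

viaAsyncIterator(makeSource()).then((chunks) => console.log(chunks));
```

Each helper is given its own source here because, as discussed above, combining these styles on a single Readable is where the incompatibilities show up.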

I don't know but I think at least https://github.com/nodejs/node/pull/40751#issuecomment-966357987 should work, even if stream.pipeline() uses readable.read().

lpinca avatar Nov 11 '21 15:11 lpinca

@lpinca the code you have pointed to in https://github.com/nodejs/node/pull/40751#issuecomment-966357987 shows a more fundamental OOM inside our implementations that should likely be fixed (or not).

mcollina avatar Nov 11 '21 15:11 mcollina

@lpinca the code you have pointed to in https://github.com/nodejs/node/pull/40751#issuecomment-966357987 shows a more fundamental OOM inside our implementations that should likely be fixed (or not).

FWIW the examples work (almost) as expected with the current implementation of stream.pipeline().

lpinca avatar Nov 11 '21 15:11 lpinca

Anyway, I have no real objections here.

lpinca avatar Nov 11 '21 15:11 lpinca

IMO

either we make pipeline use read or make async gen use pipe

ronag avatar Nov 11 '21 15:11 ronag

This crashes on my machine due to OOM.

@mcollina @lpinca I looked into this and the problem is that, since everything is sync and it never ends, it never gives back control to the event loop. Not sure how or if we should fix this. The only thing I can think of is some kind of threshold that forces Writable.write to yield.

ronag avatar Nov 18 '21 17:11 ronag
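
A minimal sketch of the diagnosis above, not the fix considered for this PR: assuming the starvation comes from read() and write() both completing synchronously, a source that defers each push with setImmediate hands control back to the event loop between chunks. TOTAL_CHUNKS is a hypothetical bound added so the sketch terminates; the original reproduction never ends.

```javascript
'use strict';

const { Readable, Writable, pipeline } = require('stream');

const chunk = Buffer.alloc(1024);
const TOTAL_CHUNKS = 100; // hypothetical bound so the sketch ends

let produced = 0;
const source = new Readable({
  read() {
    // Defer the push so each chunk is produced on a later event loop turn
    // instead of synchronously inside read().
    setImmediate(() => {
      this.push(produced++ < TOTAL_CHUNKS ? chunk : null);
    });
  }
});

let bytesWritten = 0;
const destination = new Writable({
  write(data, encoding, callback) {
    bytesWritten += data.length;
    callback();
  }
});

// Resolve with the byte count once the pipeline has finished.
const done = new Promise((resolve, reject) => {
  pipeline(source, destination, (err) => {
    if (err) {
      reject(err);
    } else {
      resolve(bytesWritten);
    }
  });
});

done.then((total) => console.log(`wrote ${total} bytes`));
```

This moves the yielding into the source, which user code can do today; the threshold idea mentioned above would instead put it inside Writable.write.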

Fwiw, I marked this semver-major because it’s a big breaking change to stop using .pipe() here – my understanding is that this breaks piping to multiple destinations completely, and even if not, this is missing pipe/unpipe events, it’s breaking manual src.emit('data') calls, etc. (not saying that people should rely on this – but reality is that they do).

either we make pipeline use read or make async gen use pipe

The latter sounds a lot safer to me.

addaleax avatar Nov 18 '21 17:11 addaleax

IMO if we merge this we are moving towards some form of deprecation of pipe.

If we don’t merge this we’re in this inconsistent state where sometimes we use pipe and sometimes not and the situation is quite unpredictable for our users.

I think we have to choose which api we recommend and stick with it in core at least.

readable is simpler; pipe has better compat and is maybe more feature-rich in terms of multiple consumers?

@mcollina wdyt?

ronag avatar Nov 18 '21 18:11 ronag

CI: https://ci.nodejs.org/job/node-test-pull-request/41004/

nodejs-github-bot avatar Nov 19 '21 18:11 nodejs-github-bot

CITGM: https://ci.nodejs.org/view/Node.js-citgm/job/citgm-smoker/2793/

ronag avatar Nov 19 '21 18:11 ronag

CI: https://ci.nodejs.org/job/node-test-pull-request/41040/

nodejs-github-bot avatar Nov 22 '21 09:11 nodejs-github-bot

CI: https://ci.nodejs.org/job/node-test-pull-request/41159/

nodejs-github-bot avatar Nov 27 '21 17:11 nodejs-github-bot

This needs a rebase.

aduh95 avatar Apr 03 '22 16:04 aduh95

This needs a rebase, if we want to still merge it.

aduh95 avatar Sep 08 '22 15:09 aduh95