stream: adjust src hwm when pipelining
@nodejs/startup @addaleax
CI: https://ci.nodejs.org/job/node-test-pull-request/40820/
@nodejs/streams
What happens if the source is already piped to another destination via source.pipe(destination)? Does this allow the same source to be piped to multiple destinations?
Combining pipe and read is not a good idea... so no
What happens if the source is already piped to another destination via source.pipe(destination)? Does this allow the same source to be piped to multiple destinations?
It will work but the consumption will be driven by pipeline as the 'readable' takes precedence.
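(For context: in current Node.js, attaching a 'readable' listener switches a stream out of flowing mode, so a read()-based consumer sets the pace even when the source is also piped. A minimal sketch of that precedence, with a hypothetical Writable, not code from this PR:)

'use strict';
const stream = require('stream');

const source = stream.Readable.from(['a', 'b', 'c']);

// Destination attached the classic way.
source.pipe(new stream.Writable({
  write(chunk, encoding, callback) {
    console.log('pipe got:', chunk.toString());
    callback();
  }
}));

// Adding a 'readable' listener stops flowing mode; from here on,
// read() calls drive consumption, and each chunk read is still
// forwarded to the piped destination via 'data'.
source.on('readable', () => {
  while (source.read() !== null);
});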
@ronag could you add a test for this case?
It will work but the consumption will be driven by pipeline as the 'readable' takes precedence.
If my understanding is correct, then consumption will be driven by the fastest destination, not the slowest, potentially messing up backpressure handling, right?
No, not really. The same happens if you mix async iterators and .pipe(). AsyncIterators use .read() and they drive the consumption of the source data. Unfortunately this is the only way .read() can work reliably. See https://github.com/nodejs/node/pull/18994 and https://github.com/nodejs/node/issues/18058.
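(That existing mix can be seen with something like the following hypothetical example, not code from this PR; the async iterator consumes through read() under the hood and therefore sets the pace:)

'use strict';
const stream = require('stream');

const source = stream.Readable.from(['a', 'b', 'c']);

source.pipe(new stream.Writable({
  write(chunk, encoding, callback) {
    console.log('pipe got:', chunk.toString());
    callback();
  }
}));

// for await...of pulls from the source via read(), so the iterator,
// not the piped destination, drives how fast the source is consumed.
(async () => {
  for await (const chunk of source) {
    console.log('iterator got:', chunk.toString());
  }
})();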
What happens if the source is already piped to another destination via source.pipe(destination)? Does this allow the same source to be piped to multiple destinations?
It will work but the consumption will be driven by pipeline as the 'readable' takes precedence.
@ronag could you add a test for this case?
I don't really see what a test here would contribute? It uses an existing API.
'use strict';
const stream = require('stream');

const chunk = Buffer.alloc(1024);

function noop() {}

// A source that synchronously pushes data forever.
const source = new stream.Readable({
  read() {
    this.push(chunk);
  }
});

let bytesWritten = 0;

const destination = new stream.Writable({
  write(chunk, encoding, callback) {
    bytesWritten += chunk.length;
    callback();
  }
});

stream.pipeline(source, destination, noop);

setInterval(function() {
  console.log(bytesWritten);
}, 1000);
This crashes on my machine due to OOM.
And this is the point I was trying to make in https://github.com/nodejs/node/pull/40751#issuecomment-966241814
'use strict';
const stream = require('stream');

const chunk = Buffer.alloc(1024);

function noop() {}

const source = new stream.Readable({
  read() {
    this.push(chunk);
  }
});

// A destination that needs one second per chunk.
const slowDestination = new stream.Writable({
  write(chunk, encoding, callback) {
    setTimeout(callback, 1000);
  }
});

// A destination that finishes on the next event loop iteration.
const fastDestination = new stream.Writable({
  write(chunk, encoding, callback) {
    setImmediate(callback);
  }
});

// The same source feeds both a pipeline() and a pipe() destination;
// the fast one sets the pace, so the slow one's buffer keeps growing.
stream.pipeline(source, fastDestination, noop);
source.pipe(slowDestination);

setInterval(function() {
  console.log(slowDestination.writableLength);
}, 1000);
but I understand that mixing readable.pipe() and stream.pipeline() is not a good idea and it is an issue even now if async iterators are used.
yea... it's not optimal. The problem is that we have 4 APIs that are not strictly compatible:
- pause/resume/'data'/'end'
- pipe
- read/'readable'
- async gen
The question here is which API should pipeline use? I've previously recommended pipe since I believe it has the best compatibility. However, using read has performance and simplicity advantages.
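(For reference, the four consumption styles side by side; a minimal sketch with a trivial Readable.from source, not code from this PR:)

'use strict';
const stream = require('stream');

function makeSource() {
  return stream.Readable.from(['a', 'b', 'c']);
}

// 1. pause/resume/'data'/'end'
makeSource()
  .on('data', (chunk) => console.log('data:', chunk))
  .on('end', () => console.log('end'));

// 2. pipe
makeSource().pipe(process.stdout);

// 3. read/'readable'
const source = makeSource();
source.on('readable', () => {
  let chunk;
  while ((chunk = source.read()) !== null) console.log('read:', chunk);
});

// 4. async iteration
(async () => {
  for await (const chunk of makeSource()) console.log('async:', chunk);
})();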
I don't know but I think at least https://github.com/nodejs/node/pull/40751#issuecomment-966357987 should work, even if stream.pipeline() uses readable.read().
@lpinca the code you pointed to in https://github.com/nodejs/node/pull/40751#issuecomment-966357987 shows a more fundamental OOM inside our implementation that should likely be fixed (or not).
FWIW the examples work (almost) as expected with the current implementation of stream.pipeline().
Anyway, I have no real objections here.
IMO either we make pipeline use read or make async gen use pipe
This crashes on my machine due to OOM.
@mcollina @lpinca I looked into this and the problem is that since everything is sync and it never ends, it never gives back control to the event loop. Not sure how or if we should fix this. The only thing I can think of is that we have some kind of threshold that forces Writable.write to yield.
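(One way to break the synchronous loop in the first example is to defer the push so control returns to the event loop between reads; a sketch of the yielding idea, not what this PR implements:)

'use strict';
const stream = require('stream');

const chunk = Buffer.alloc(1024);

// Same endless source as in the example above, but the push is
// deferred, so timers and backpressure bookkeeping get a chance
// to run between reads and memory stays bounded by the hwm.
const source = new stream.Readable({
  read() {
    setImmediate(() => this.push(chunk));
  }
});

let bytesWritten = 0;

stream.pipeline(
  source,
  new stream.Writable({
    write(chunk, encoding, callback) {
      bytesWritten += chunk.length;
      callback();
    }
  }),
  () => {}
);

setInterval(() => console.log(bytesWritten), 1000);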
Fwiw, I marked this semver-major because it's a big breaking change to stop using .pipe() here – my understanding is that this breaks piping to multiple destinations completely, and even if not, this is missing pipe/unpipe events, it's breaking manual src.emit('data') calls, etc. (not saying that people should rely on this – but reality is that they do).
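(As a concrete illustration of that surface: destinations can observe the 'pipe' and 'unpipe' events that src.pipe(dest) and src.unpipe(dest) emit, and a pipeline built on read() never calls pipe(), so code like this would stop firing. An illustrative sketch, not from this PR:)

'use strict';
const stream = require('stream');

const src = stream.Readable.from(['a']);
const dest = new stream.Writable({
  write(chunk, encoding, callback) {
    callback();
  }
});

// Emitted on the destination by src.pipe(dest) / src.unpipe(dest).
dest.on('pipe', () => console.log('pipe event'));
dest.on('unpipe', () => console.log('unpipe event'));

src.pipe(dest);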
either we make pipeline use read or make async gen use pipe
The latter sounds a lot safer to me.
IMO if we merge this we are moving towards some form of deprecation of pipe.
If we don’t merge this we’re in this inconsistent state where sometimes we use pipe and sometimes not and the situation is quite unpredictable for our users.
I think we have to choose which api we recommend and stick with it in core at least.
readable is simpler; pipe has better compat and is maybe more feature-rich in terms of multiple consumers?
@mcollina wdyt?
CI: https://ci.nodejs.org/job/node-test-pull-request/41004/
CITGM: https://ci.nodejs.org/view/Node.js-citgm/job/citgm-smoker/2793/
CI: https://ci.nodejs.org/job/node-test-pull-request/41040/
CI: https://ci.nodejs.org/job/node-test-pull-request/41159/
This needs a rebase.
This needs a rebase, if we want to still merge it.