stream: unref
Current WIP `unref` implementation that was discussed in:
- #46980
Cc @benjamingr, @ronag WDYT?
This would allow the following code:
```js
const csvParsedStream = fs
  .createReadStream('file.csv')
  .compose(csvParse({ columns: false }));

const [columns] = await unref(csvParsedStream)
  .take(1)
  .toArray();

const parsed = await csvParsedStream
  .map((row) => parseRowByColumns(row, columns))
  .toArray();
```
Review requested:
- [ ] @nodejs/streams
I'm lost. What does this do and what problem does it solve? I don't follow the relation to https://github.com/nodejs/node/issues/46980.
The purpose of this is to allow using operators without closing the original stream. This started from the need to read only one line from a CSV file using `take` and then use `map` to iterate over the records; @benjamingr then pointed out that this is the second time something like this is needed (the `destroyOnReturn` readable iterator option was the first), so let's create something generic.
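For context, a minimal sketch (my illustration, not code from this PR or the linked issue) of the behavior being worked around - on current Node, `take()` takes ownership of the stream and destroys it once the derived stream finishes:

```js
const { Readable } = require('node:stream');

(async () => {
  const stream = Readable.from([1, 2, 3, 4, 5]);

  // take(1) yields a single chunk, but it also takes ownership of `stream`:
  // once the derived stream finishes, the source is destroyed, so the
  // remaining chunks [2, 3, 4, 5] can no longer be consumed from `stream`.
  const [first] = await stream.take(1).toArray();
  console.log(first); // 1

  // With the proposal above, `unref(stream).take(1)` would leave `stream`
  // open, so a later `stream.map(...)` or `stream.toArray()` still sees the rest.
})();
```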
I'm very split on this. Not sure what to think atm.
> I'm very split on this. Not sure what to think atm.

With the implementation or the feature?
I see why it happens here, thanks!
```js
const { Readable, PassThrough, pipeline } = require('node:stream');
const { it } = require('node:test');
const { from } = Readable;

it('test memory', async () => {
  const naturals = () => from(async function* () {
    let i = 1;
    while (true) {
      yield i++;
      await new Promise((resolve) => setTimeout(resolve, 1));
    }
  }());

  // objectMode so the numeric chunks from naturals() can pass through.
  const originalStream = pipeline(naturals(), new PassThrough({ objectMode: true }), () => {});
  // `unref` is the API proposed in this PR; the unref'ed stream is never consumed.
  const unrefStream = unref(originalStream);

  setInterval(() => {
    const formatMemoryUsage = (data) => `${Math.round(data / 1024 / 1024 * 100) / 100} MB`;
    const memoryData = process.memoryUsage();
    const memoryUsage = {
      rss: `${formatMemoryUsage(memoryData.rss)} -> Resident Set Size - total memory allocated for the process execution`,
      heapTotal: `${formatMemoryUsage(memoryData.heapTotal)} -> total size of the allocated heap`,
      heapUsed: `${formatMemoryUsage(memoryData.heapUsed)} -> actual memory used during the execution`,
      external: `${formatMemoryUsage(memoryData.external)} -> V8 external memory`,
    };
    console.log(memoryUsage);
  }, 200);

  // Only the original stream is consumed.
  for await (const _ of originalStream) {
  }
});
```
> The solution for this is to implement what mcollina/cloneable-readable does: the stream is consumed at the speed of its slowest consumer.

Maybe I'm missing something, but I don't think it answers the need of this PR: I can't find a way to make this work using cloneable-readable, because you must pipe all clones. Take the case "Should continue consuming the original stream data from where the unref stopped" - the unref one does this:
```js
const { Readable } = require('node:stream');
const { deepStrictEqual } = require('node:assert');
const { from } = Readable;

it('Should continue consuming the original stream data from where the unref stopped', async () => {
  const originalStream = from([1, 2, 3, 4, 5]);
  const firstItem = await unref(originalStream).take(1).toArray();
  deepStrictEqual(firstItem, [1]);
  const restOfData = await originalStream.toArray();
  deepStrictEqual(restOfData, [2, 3, 4, 5]);
});
```
What is the equivalent in cloneable-readable?
That was the original PR
Unref shouldn't be problematic if we make it a proxy - no chunks saved.
what PR?
> what PR?

@mcollina #47023
I don't think there is any way to implement this feature other than that.
I'm really confused - so what should we do with the original PR and this one?
@mcollina, @ronag @benjamingr
@mcollina the ask is for a more generic `stream.iterator`, which is "give me a copy of this stream but don't take ownership of it, so I can consume parts of it in a `for await` or with iterator helpers and it won't close it". Doing `.take` without closing is a special case of that, and so is `stream.iterator`.
Here are a few additional requirements as laid out in the various issues:
- no out-of-the-box memory leaks (no full stream accumulation if not consumed)
- take could be called after the stream starts flowing
- multiple destinations should be possible, e.g. we can "take" a few bytes while the stream is piped to another downstream.
- no spec changes in .take()
These are a conflicting set of requirements. Given that the only objection to #47023 is about an in-flux spec, the only solution I see is to add a `nextN` accessor with the behavior we want.
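For illustration only, a sketch of how such an accessor might be used with the CSV example from the description (`nextN` does not exist; the name and shape here are just this thread's suggestion):

```js
// Hypothetical API - `nextN` is only a suggestion in this thread, not an existing method.
const [columns] = await csvParsedStream.nextN(1); // read one chunk, keep the stream open

const parsed = await csvParsedStream
  .map((row) => parseRowByColumns(row, columns))
  .toArray();
```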
The problem is that then you'd have to add a `dropN` as well, rather than have a solution for "I want to work with this stream and explicitly opt out of the destroying behavior of a method".
> - no out-of-the-box memory leaks (no full stream accumulation if not consumed)
> - take could be called after the stream starts flowing
> - multiple destinations should be possible, e.g. we can "take" a few bytes while the stream is piped to another downstream.

That's doable, you basically can do:
```js
function unref(stream) {
  const unrefed = Object.create(stream); // so we have all the methods
  unrefed.destroy = () => {}; // but calling .destroy does nothing
  // in a future iteration we can also keep track of them and notify of leaks: that is,
  // if .destroy is called on the unrefed stream but never on the source stream.
  return unrefed;
}
```
(edit: and probably call the callback, and intentionally not _destroy)
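A minimal sketch of one way to read that edit (an assumption on my part, not code from this PR): still invoke the callback so `destroy(err, cb)` callers get called back, but never run `_destroy()`, so the underlying resource stays open:

```js
function unref(stream) {
  const unrefed = Object.create(stream);
  unrefed.destroy = (err, callback) => {
    // Intentionally skip _destroy(): the source stream stays usable.
    if (typeof callback === 'function') {
      process.nextTick(callback, err ?? null);
    }
    return unrefed;
  };
  return unrefed;
}
```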
I'm a bit scared of the possible side effects of that, but it might work.
@mcollina can you elaborate on the potential side effects so we know we're aware of all dangers when considering the feature?
(other than, obviously, explicitly opting out of being able to destroy a stream)
> @mcollina can you elaborate on the potential side effects so we know we're aware of all dangers when considering the feature?

`EventEmitter` stores things attached to the object:
```js
const EE = require('events');

const a = new EE();
a.foo = 'bar';

const b = Object.create(a);
b.foo = 'baz';

a.on('event', function () {
  console.log(this.foo);
});

a.emit('event'); // bar
b.emit('event'); // baz
```
Oh, you meant the fact that it's doing prototypical inheritance? That was just an example - I am content with any solution really, for example a proxy that forwards all calls except `destroy` to the original stream and overrides just `destroy`.
I do see the potential footgun with the `Object.create` implementation with regards to setters though (them not impacting the original). Would a proxy resolve that in your opinion?
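A rough sketch of that proxy variant (assumptions mine, not this PR's implementation): forward everything to the original stream so that property writes land on the target, and make only `destroy()` a no-op.

```js
function unrefProxy(stream) {
  return new Proxy(stream, {
    get(target, prop) {
      if (prop === 'destroy') {
        return () => target; // opt out of destruction
      }
      const value = Reflect.get(target, prop, target);
      // Bind methods to the real stream so internal `this` usage keeps working.
      return typeof value === 'function' ? value.bind(target) : value;
    },
    set(target, prop, value) {
      // Unlike Object.create(), assignments go to the original stream.
      return Reflect.set(target, prop, value);
    },
  });
}
```

As raised later in the thread, a proxy still cannot intercept the stream's own internal `this.destroy()` calls, which is part of why it ends up being insufficient on its own.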
I think the proxy solution would suffer from the same problem as the prototypical inheritance.
I don't understand why using the async iterator is problematic (minus the spec issue)
> I think the proxy solution would suffer from the same problem as the prototypical inheritance.

Why? Setting a property would set it on the target (the stream) directly.
> I don't understand why using the async iterator is problematic (minus the spec issue)

You mean `readable.iterator({ destroyOnReturn: false }).take(3)` (when that eventually lands)? The issue is that it works on an async iterator, so in order to get a stream back you would have to convert it to a readable again, which is very roundabout and footgunny on its own.
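Spelling out the "convert it to a readable again" path (a hypothetical sketch; `firstChunk` and `source` are placeholder names):

```js
const { Readable } = require('node:stream');

async function firstChunk(source) {
  // Opt out of ownership, then re-wrap the iterator as a Readable just to get
  // .take()/.toArray() back.
  const nonOwning = Readable.from(source.iterator({ destroyOnReturn: false }));
  const [first] = await nonOwning.take(1).toArray();
  // `source` stays open, but `nonOwning` is a separate stream with its own
  // buffering and lifecycle - chunks it already pulled are simply dropped.
  return first;
}
```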
> Why? Setting a property would set it on the target (the stream) directly.

I think there would be some slight issues with `instanceof` inside event handlers.
> You mean `readable.iterator({ destroyOnReturn: false }).take(3)` (when that eventually lands)? The issue is that it works on an async iterator, so in order to get a stream back you would have to convert it to a readable again, which is very roundabout and footgunny on its own.

That seems like the one with the fewest footguns and potential issues down the road. It would also be the slowest one.
Anyhow, any implementation that is based on `for await` or an active `on('readable')`/`read()` cycle would prevent the memory leak from happening.
Actually, doing a proxy wouldn't work, precisely because we need polymorphism for this to work (since the destroy call wouldn't go to the proxy). We need to combine both, I guess.
I need to think more about this
Tried something - ignore the last commit.
Unfortunately, calling `unref()` on a stream could result in an out-of-memory error if the following conditions are met:
- the source stream is reading enough data that it would not fit in the current process
- the unref'ed stream is never consumed
- the source stream is fully consumed

The result would be that all data is accumulated inside the unref'ed stream. The solution for this is to implement what mcollina/cloneable-readable does: the stream is consumed at the speed of its slowest consumer.
If anybody is willing to do the work, I'm happy for cloneable-readable to be included in Node.js core.
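For reference, the cloneable-readable pattern looks roughly like this (a sketch based on that module's README; the file names are placeholders). Clones have to be created before data starts flowing, and the original and every clone must all be consumed, with the source read at the pace of the slowest consumer:

```js
const cloneable = require('cloneable-readable');
const fs = require('node:fs');

// Wrap the source and create the clones *before* data starts flowing...
const stream = cloneable(fs.createReadStream('./file.csv'));
const copy = stream.clone();

// ...then consume the original and every clone; the source is read at the
// speed of the slowest of them.
copy.pipe(fs.createWriteStream('./out1'));
stream.pipe(fs.createWriteStream('./out2'));
```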
Actually, I checked, and you get the same memory leak if you pipeline to another stream but never consume it and only consume the original stream; here's an example:
```js
'use strict';
require('../common');
const { Readable, pipeline, PassThrough } = require('stream');
const { it } = require('node:test');
const { strictEqual } = require('assert');
const { from } = Readable;

it('make sure not leaking memory', async () => {
  function getMemoryAllocatedInMB() {
    return Math.round(process.memoryUsage().rss / 1024 / 1024 * 100) / 100;
  }

  const bigData = () => from(async function* () {
    const obj = Array.from({ length: 100000 }, () => Array.from({ length: 15 }, (_, i) => i));
    while (true) {
      yield obj.map((item) => item.slice(0));
      await new Promise((resolve) => setTimeout(resolve, 1));
    }
  }());

  const originalStream = pipeline(bigData(), new PassThrough({ objectMode: true }), () => {});
  // Piped into a second PassThrough that is never consumed.
  pipeline(originalStream, new PassThrough({ objectMode: true }), () => {});

  // Making sure some data passed so we won't catch something that is related to the infra
  const iterator = originalStream.iterator({ destroyOnReturn: true });
  for (let j = 0; j < 10; j++) {
    await iterator.next();
  }
  const currentMemory = getMemoryAllocatedInMB();

  for (let j = 0; j < 10; j++) {
    await iterator.next();
  }
  const newMemory = getMemoryAllocatedInMB();

  originalStream.destroy(null);
  strictEqual(newMemory - currentMemory < 100, true,
              `After consuming 10 items the memory increased by ${Math.floor(newMemory - currentMemory)}MB`);
});
```
In that case, how is unref any different from pipeline?
Making it a proxy (not doing any buffering) is near impossible in the current design...
Ping @mcollina
> Actually, I checked, and you get the same memory leak if you pipeline to another stream but never consume it and only consume the original stream.

Your example is mixing `on('data')` (used by `.pipe()` and `pipeline`) and `on('readable')` events. `on('readable')` takes precedence, meaning that the backpressure generated by the `PassThrough` (via the `false` value returned by `.write()`) will be disregarded.
> In that case, how is unref any different from pipeline?

Currently, if you consume your streams using only async iterators and the new APIs, you are shielded from all those problems. The whole goal of supporting `unref()` (in this form) is to make forking the stream easier. I'm -1 on adding new hazards to streams; all these new APIs should be safe to use.
I think this is currently blocked on us not yet figuring out how to actually do it correctly. Both the proxy solution and the prototype solution don't work particularly well.