node icon indicating copy to clipboard operation
node copied to clipboard

stream: unref

Open rluvaton opened this issue 1 year ago • 31 comments

Current WIP unref implementation that was talked about in:

  • #46980

Cc @benjamingr, @ronag WDYT?


This would allow the following code:

const csvParsedStream = fs
	.createReadStream('file.csv')
	.compose(csvParse({ columns: false }));

const [columns] = await unref(csvParsedStream)
	.take(1)
	.toArray();

const parsed = await csvParsedStream.map((row) => parseRowByColumns(row, columns)).toArray();

rluvaton avatar May 14 '23 17:05 rluvaton

Review requested:

  • [ ] @nodejs/streams

nodejs-github-bot avatar May 14 '23 17:05 nodejs-github-bot

I'm lost. What does this do and what problem does it solve? I don't follow the relation to https://github.com/nodejs/node/issues/46980.

ronag avatar May 14 '23 18:05 ronag

The purpose for this is to allow using operators without closing the original stream, this started as the need to read only 1 line from a CSV file using take and then use map to iterate over the records, then @benjamingr said that this is the second time something like this is needed (destroyOnReturn in readable iterator option was the first one) so let's create something generic

rluvaton avatar May 14 '23 18:05 rluvaton

I'm very split on this. Not sure what to think atm.

ronag avatar May 14 '23 18:05 ronag

I'm very split on this. Not sure what to think atm.

With the implementation or the feature?

rluvaton avatar May 15 '23 06:05 rluvaton

I see why it happen here, thanks!

it('test memory', async () => {
  const naturals = () => from(async function*() {
    let i = 1;
    while (true) {
      yield i++;
      await new Promise((resolve) => setTimeout(resolve, 1));
    }
  }());
  const originalStream = pipeline(naturals(), new PassThrough(), () => {});
  const unrefStream = unref(originalStream);

  setInterval(() => {
    const formatMemoryUsage = (data) => `${Math.round(data / 1024 / 1024 * 100) / 100} MB`;

    const memoryData = process.memoryUsage();

    const memoryUsage = {
      rss: `${formatMemoryUsage(memoryData.rss)} -> Resident Set Size - total memory allocated for the process execution`,
      heapTotal: `${formatMemoryUsage(memoryData.heapTotal)} -> total size of the allocated heap`,
      heapUsed: `${formatMemoryUsage(memoryData.heapUsed)} -> actual memory used during the execution`,
      external: `${formatMemoryUsage(memoryData.external)} -> V8 external memory`,
    };

    console.log(memoryUsage);
  }, 200);
  for await(const _ of originalStream) {
  }
});

rluvaton avatar May 15 '23 09:05 rluvaton

The solution for this is to implement what mcollina/cloneable-readable does: the stream is consumed at the speed of its slowest consumer.

Maybe I'm missing something but I don't think it answer the need of this PR:

I can't find a way to make this work using the cloneable-readable because you must pipe all clones: Should continue consuming the original stream data from where the unref stopped

the unref one does this:

it('Should continue consuming the original stream data from where the unref stopped', async () => {
  const originalStream = from([1, 2, 3, 4, 5]);

  const firstItem = await unref(originalStream).take(1).toArray();
  deepStrictEqual(firstItem, [1]);

  const restOfData = await originalStream.toArray();
  deepStrictEqual(restOfData, [2, 3, 4, 5]);
});

what is the equivalent in cloneable-readable

rluvaton avatar May 19 '23 09:05 rluvaton

That was the original PR

Unref shouldn't be problematic in case we make it a proxy - no chunk saved

rluvaton avatar May 19 '23 15:05 rluvaton

what PR?

mcollina avatar May 20 '23 08:05 mcollina

what PR?

@mcollina #47023

rluvaton avatar May 20 '23 17:05 rluvaton

I don't think there is any other way to implement this feature rather than that.

mcollina avatar May 20 '23 22:05 mcollina

I'm really confused, so what to do with the original pr and this one?

@mcollina, @ronag @benjamingr

rluvaton avatar May 22 '23 04:05 rluvaton

@mcollina the ask is for a more generic stream.iterator which is "give me a copy of this stream but don't take ownership of it, so I can consume parts of it in a for await or with iterator helpers and it wouldn't close it".

Doing .take without closing is a special case of doing it and so is doing stream.iterator.

benjamingr avatar May 22 '23 08:05 benjamingr

Here are a few additional requirements as laid out in the various issues:

  1. no out-of-the-box memory leaks (no full stream accumulation if not consumed)
  2. take could be called after the stream start flowing
  3. multiple destinations should be possible, e.g. we can "take" a few bytes while the stream is piped to another downstream.
  4. no spec changes in .take()

These are a conflicting set of requirements. Given that the only objection to #47023 is about the an in-flux spec, the only solution I see is to add a nextN accessor with the behavior we want.

mcollina avatar May 22 '23 08:05 mcollina

The problem is that then you'd have to add a dropN as well rather than have a solution for "I want to work with this stream and explicitly opt out of the destroying behavior of a method".

no out-of-the-box memory leaks (no full stream accumulation if not consumed) take could be called after the stream start flowing multiple destinations should be possible, e.g. we can "take" a few bytes while the stream is piped to another downstream.

That's doable, you basically can do:

function unref(stream) {
  const unrefed = Object.create(stream); // so we have all the methdos
  unrefed.destroy = () => {}; // but calling .destroy does nothing
  // in a future iteration we can also keep track of them and notify of leaks: that is if .destroy is called
  // on the unrefed stream but never on the source stream. 
  return unrefed;
}

(edit: and probably call the callback, and intentionally not _destroy)

benjamingr avatar May 22 '23 17:05 benjamingr

I'm a bit scared of the possible side effects of that, but it might work.

mcollina avatar May 22 '23 19:05 mcollina

@mcollina can you elaborate on the potential side effects so we know we're aware of all dangers when considering the feature?

(other than obviously well, explicitly opting out of being able to destroy a stream)

benjamingr avatar May 22 '23 19:05 benjamingr

@mcollina can you elaborate on the potential side effects so we know we're aware of all dangers when considering the feature?

EventEmitter store things attached to the object:

const EE = require('events');

const a = new EE()
a.foo = 'bar'

const b = Object.create(a)
b.foo = 'baz'

a.on('event', function () {
  console.log(this.foo)
})

a.emit('event') // bar
b.emit('event') // baz

mcollina avatar May 23 '23 16:05 mcollina

Oh you meant the fact it's doing prototypical inheritance? That was just an example I am content with any solution really for example - with a proxy that forwards all calls except destroy to the original stream and overrides just destroy.

I see the footgun with the Object.create implementation with regards to setters potentially though (and them not impacting the original). Would a proxy resolve that in your opinion?

benjamingr avatar May 23 '23 17:05 benjamingr

I think the proxy solution would suffer from the same problem of the prototypical inheritance.

I don't understand why using the async iterator is problematic (minus the spec issue)

mcollina avatar May 23 '23 17:05 mcollina

I think the proxy solution would suffer from the same problem of the prototypical inheritance.

Why? Setting a property would set it on the target (the stream) directly

I don't understand why using the async iterator is problematic (minus the spec issue)

You mean readable.iterator({ destroyOnReturn: false }).take(3) (when that eventually lands)? The issue is that works on an async iterator so in order to get a stream back you would have to convert it to a readable again, which is very roundabout and footgunny on its own.

benjamingr avatar May 23 '23 22:05 benjamingr

Why? Setting a property would set it on the target (the stream) directly

I think there would be some slight issue with instanceof inside events handlers.

You mean readable.iterator({ destroyOnReturn: false }).take(3) (when that eventually lands)? The issue is that works on an async iterator so in order to get a stream back you would have to convert it to a readable again, which is very roundabout and footgunny on its own.

That seems the one that have fewer footguns and potential issues down the road. It would also be the slowest one. Anyhow, any implementation that are based on for await or an active on('readable')/read() cycle would prevent the memory leak from happening.

mcollina avatar May 24 '23 13:05 mcollina

Actually doing a proxy wouldn't work precisely because we need polymorphism for this to work (since the destroy call wouldn't go to the proxy). We need to combine both I guess.

benjamingr avatar May 24 '23 13:05 benjamingr

I need to think more about this

benjamingr avatar May 24 '23 13:05 benjamingr

tried something ignore the last commit

rluvaton avatar Jun 09 '23 13:06 rluvaton

Unfortunately calling unref() on a stream could result on a out of memory error if the following conditions are met:

  1. the source stream is reading enough data that would not fit the current process
  2. the unref' stream is never consumed
  3. the source stream is fully consumed

The result would be that all data is accumulated inside the unref stream.

The solution for this is to implement what mcollina/cloneable-readable does: the stream is consumed at the speed of its slowest consumer.

If anybody is willing to do the work, I'm happy for cloneable-readable to be included in Node.js core.

Actually I checked and you have the same memory leak if you use pipeline to other stream but never consume it and only consume the original stream, here's an example:

'use strict';
require('../common');

const {
  Readable,
  pipeline,
  PassThrough
} = require('stream');
const { it } = require('node:test');
const { strictEqual } = require('assert');
const { from } = Readable;

it('make sure not leaking memory', async () => {
  function getMemoryAllocatedInMB() {
    return Math.round(process.memoryUsage().rss / 1024 / 1024 * 100) / 100;
  }

  const bigData = () => from(async function* () {
    const obj = Array.from({ length: 100000 }, () => (Array.from({ length: 15 }, (_, i) => i)));
    while (true) {
      yield obj.map((item) => item.slice(0));
      await new Promise((resolve) => setTimeout(resolve, 1));
    }
  }());

  const originalStream = pipeline(bigData(), new PassThrough({ objectMode: true }), () => {
  });
  pipeline(originalStream, new PassThrough({ objectMode: true }), () => {
  });
  originalStream.iterator({ destroyOnReturn: true });

  // Making sure some data passed so we won't catch something that is related to the infra
  const iterator = originalStream.iterator({ destroyOnReturn: true });
  for (let j = 0; j < 10; j++) {
    await iterator.next();
  }

  const currentMemory = getMemoryAllocatedInMB();

  for (let j = 0; j < 10; j++) {
    await iterator.next();
  }

  const newMemory = getMemoryAllocatedInMB();

  originalStream.destroy(null);
  strictEqual(newMemory - currentMemory < 100, true, `After consuming 10 items the memory increased by ${Math.floor(newMemory - currentMemory)}MB`);
});

in that case, how is unref any different than pipeline?

rluvaton avatar Jun 09 '23 16:06 rluvaton

Making it a proxy (not doing any buffering) is near impossible in the current design...

rluvaton avatar Jun 10 '23 21:06 rluvaton

Ping @mcollina

rluvaton avatar Jun 16 '23 09:06 rluvaton

Actually I checked and you have the same memory leak if you use pipeline to other stream but never consume it and only consume the original stream.

Your example is mixing on('data') (used by .pipe() and pipeline) and on('readable') events. on('readable') takes precedence, meaning that the backpressure generated by the PassThrough (via the false value returned by .write()) will be disregarded.

in that case, how is unref any different than pipeline?

Currently, if you consume your streams using only async iterators and the new APIs, you are shielded from all those problems. The whole goal of supporting unref() (in this form) is to make forking the stream easier. I'm -1 in adding new hazards in streams, all these new APIs should be safe to use.

mcollina avatar Jun 16 '23 10:06 mcollina

I think this is currently blocked on us not yet figuring out how to actually do it correctly. Both the proxy solution and the prototype solution don't work particularly well.

benjamingr avatar Jun 22 '23 12:06 benjamingr