
Async iteration 2

alubbe opened this issue 4 years ago • 15 comments

This issue is a follow-up to https://github.com/ZJONSSON/node-unzipper/issues/178

We've noticed some weird behaviour around async iteration with forceStream. This works as expected:

  const stream = fs.createReadStream('test.zip');
  const zip = unzip.Parse({forceStream: true});
  const pipedZipStream = stream.pipe(zip);
  for await (const entry of pipedZipStream) {
    console.log(entry.path); // prints everything as expected
  }

But if you do anything async in the for loop, it stops working:

  const stream = fs.createReadStream('test.zip');
  const zip = unzip.Parse({forceStream: true});
  const pipedZipStream = stream.pipe(zip);
  for await (const entry of pipedZipStream) {
    console.log(entry.path); // prints the first entry path, but none of the others
    await new Promise(resolve=>setTimeout(resolve, 1000))
  }

However, everything works great if we just pipe zip through another PassThrough:

  const stream = fs.createReadStream('test.zip');
  const zip = unzip.Parse({forceStream: true});
  const pipedZipStream = stream.pipe(zip);
  for await (const entry of pipedZipStream.pipe(new PassThrough({objectMode: true}))) {
    console.log(entry.path); // prints everything as expected
    await new Promise(resolve=>setTimeout(resolve, 1000))
  }

I assume this is because either the data events are no longer emitted or no one is listening. Since this library uses a custom PullStream, I wanted to ask why this doesn't work without a PassThrough, and whether there's anywhere in the code where we could fix that.

alubbe avatar Apr 08 '20 12:04 alubbe

I will take a closer look here. I haven't played with async iterators and streams, so I will try to replicate.

Worth noting that a big aspect of the Parse method is that there is no random access, and you have to scroll through the entire file. In that scenario managing backpressure becomes important (if you have a slow consumer, you don't want to blow up memory). So Parse needs the underlying files to be "fully consumed" before reading the next entry. On each entry there is a method .autodrain() that basically consumes the entry if you are not interested in it.
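
A minimal sketch of that sequential flow (the file names here are placeholders, not anything from this thread):

  const fs = require('fs');
  const unzip = require('unzipper');

  fs.createReadStream('test.zip')
    .pipe(unzip.Parse())
    .on('entry', entry => {
      if (entry.path === 'wanted.txt') {            // placeholder for a file of interest
        entry.pipe(fs.createWriteStream('out.txt'));
      } else {
        entry.autodrain();                          // skipped entries must still be consumed
      }
    });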

The Open methods are way more flexible, since they use the central directory and allow you to only consume the files you are interested in (even in parallel).
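
For comparison, a rough sketch of the Open flow (wanted.txt is just a placeholder entry name):

  const unzipper = require('unzipper');

  async function readOneEntry() {
    const directory = await unzipper.Open.file('test.zip');          // reads only the central directory
    const file = directory.files.find(f => f.path === 'wanted.txt'); // placeholder entry name
    if (file) {
      const content = await file.buffer();                           // pulls just this entry's data
      console.log(content.length);
    }
  }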

ZJONSSON avatar Apr 08 '20 20:04 ZJONSSON

Open does look quite awesome, but exceljs has too many assumptions around streams and piping, so we probably have to stick with Parse for now. I came up with this idea, where after each iteration (which may not have consumed the whole entry) we call entry.autodrain() just in case - what do you think of it?

  const {once} = require('events');
  
  async function* iterateZip(stream) {
    const zip = unzip.Parse();
    stream.pipe(zip);

    const entries = [];
    zip.on('entry', entry => entries.push(entry));

    let finished = false;
    zip.on('finish', () => (finished = true));
    const zipFinishedPromise = once(zip, 'finish');

    while (!finished || entries.length > 0) {
      if (entries.length === 0) {
        await Promise.race([once(zip, 'entry'), zipFinishedPromise]);
      } else {
        const entry = entries.shift();
        yield entry;
        entry.autodrain();
      }
    }
  }

  for await (const entry of iterateZip(fs.createReadStream('test.zip'))) {
    console.log(entry.path);
    await new Promise(resolve=>setTimeout(resolve, 1000))
  }

alubbe avatar Apr 14 '20 14:04 alubbe

Thanks @alubbe. I think if you don't await the autodrain (as implemented above) you "should" be fine, except you "might" have a dangling unresolved promise. If you have already "consumed" the entry, the finish event might have been emitted already by the time you hit autodrain. Since autodrain is only resolved by the finish event from the entry, it might never resolve.

It might be worth expanding autodrain to check whether the entry has already been fully consumed (in which case autodrain should be a noop). We would have to either inspect the entry stream to see if it has already finished, or set a variable on the entry to register that the stream is finished. Let me know what you think.
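
A rough sketch of the second idea, checking a flag that is set once the entry stream ends (the names here are hypothetical, not unzipper internals):

  // Hypothetical wrapper: mark an entry once its data has been fully read, so a
  // later autodrain() can resolve immediately instead of waiting on a 'finish'
  // event that will never come.
  function trackConsumption(entry) {
    entry.on('end', () => { entry.__fullyConsumed = true; });
    return entry;
  }

  function safeAutodrain(entry) {
    if (entry.__fullyConsumed) {
      // already fully consumed: act as a noop that resolves immediately
      return { promise: () => Promise.resolve() };
    }
    return entry.autodrain();
  }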

ZJONSSON avatar Apr 14 '20 16:04 ZJONSSON

I investigated that and it seems that the finish event is emitted even in the case you described. I added a new test case to confirm it and ran it on node v8 and node v13 (https://github.com/ZJONSSON/node-unzipper/pull/194). And in this particular case, I'm fine with just piping the entry into the NoopStream without waiting for it, so we don't need await and I don't think anything is leaked. What do you think?

alubbe avatar Apr 15 '20 05:04 alubbe

Yes, you are spot on, I think. If we pipe a finished stream to a new transform, we will get the finish event in the transform.

ZJONSSON avatar Apr 20 '20 12:04 ZJONSSON

Okay, then we should probably remove the async iteration example from the README :( Is there any chance of integrating my solution from above into this code base? That would mean that node v10+ is required. We could also put it into a special file that the main export doesn't import.

alubbe avatar Apr 20 '20 14:04 alubbe

Thanks @alubbe. Why do you say the async iteration example from the README doesn't work? This example explicitly either consumes the file or runs autodrain (in both cases each entry is fully consumed).

unzipper is a drop-in replacement for unzip, so I try to resist breaking backwards compatibility. That being said, we will have to do that eventually. I like the idea of a special file outside of the main exports - do you want to whip up a PR?
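
For reference, the README pattern being discussed looks roughly like this: every entry is either piped somewhere or autodrained, so each one is fully consumed (the paths here are placeholders):

  const fs = require('fs');
  const unzipper = require('unzipper');

  async function main() {
    const zip = fs.createReadStream('archive.zip').pipe(unzipper.Parse({ forceStream: true }));
    for await (const entry of zip) {
      if (entry.path === 'wanted.txt') {             // placeholder for the file you care about
        entry.pipe(fs.createWriteStream('out.txt'));
      } else {
        entry.autodrain();                           // every other entry is explicitly drained
      }
    }
  }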

ZJONSSON avatar Apr 20 '20 14:04 ZJONSSON

you can try it out yourself:

  const stream = fs.createReadStream('test.zip');
  const zip = unzip.Parse({forceStream: true});
  const pipedZipStream = stream.pipe(zip);
  for await (const entry of pipedZipStream) {
    console.log(entry.path); // prints the first entry path, but none of the others
    await new Promise(resolve=>setTimeout(resolve, 1000))
  }

will not work :(

But I've written a helper file for exceljs that was just merged and published - I can open a PR bringing it to this repo as well, as a separate export.

alubbe avatar Jun 05 '20 12:06 alubbe

This example is missing an .autodrain() on any entry you want to skip. Because the zip file is being read sequentially, the only way to go from one entry to the next is to consume the content of each entry. The entry can be consumed either by piping it to a transform/writable stream or by piping it to nothing, which is exactly what autodrain does - see: https://github.com/ZJONSSON/node-unzipper/blob/724e71d296a6dd0cd00edfb71dffc622bdfba5f7/lib/NoopStream.js#L17

ZJONSSON avatar Jun 05 '20 14:06 ZJONSSON

Yes, my bad, I haven't been posting all of my findings here. This code also doesn't work - can you see a way of fixing it?

(async () => {
  const stream = fs.createReadStream('test.zip');
  const zip = unzip.Parse({forceStream: true});
  stream.pipe(zip);

  for await (const entry of zip) {
    console.log(entry.path);
    await entry.autodrain().promise();
    await new Promise(resolve=>setTimeout(resolve, 1000));
  }
})()

alubbe avatar Jun 08 '20 09:06 alubbe

Hi! I'm also having an issue with async iteration.

I've created a minimal example here.

The TL;DR is I'm doing something like this:

  const zip = fs.createReadStream(zipFile).pipe(unzipper.Parse({ forceStream: true }));

  for await (const entry of zip) {
    const entryContent = await entry.buffer();
    const someOtherFileContent = await fs.promises.readFile(someOtherFile);
    // Do something with the above buffers here
  }

This loop only ever executes once, for the first entry, whereas changing the second line in the loop to const someOtherFileContent = fs.readFileSync(someOtherFile); fixes the issue and loops over every entry in the zip.

daiscog avatar Jul 04 '20 10:07 daiscog

Thank you @alubbe and @daiscog. This library was not originally designed with async iterators in mind. It seemed sufficient to add the forceStream option to get async iterators to work, but clearly there are some additional considerations.

I will continue investigating but appreciate any help. I added a test case to async-iterator-test branch https://github.com/ZJONSSON/node-unzipper/blob/async-iterator-test/test/async-iterator.js

See also the following example that demonstrates that the vanilla streams should be able to handle awaits inside the iterator:

const {Transform, Writable, PassThrough} = require('stream');
const wait = async ms => new Promise(resolve => setTimeout(resolve,ms));

async function main() {
  const source = new PassThrough({objectMode: true});
  const transform = new Transform({
    objectMode: true,
    transform: async function(d,e,cb) {
      await wait(100);
      this.push(d);
      cb();
    }
  });

  source.pipe(transform);
  source.write(1);
  source.write(2);
  source.end(3);

  for await (const entry of transform) {
    await wait(200);
    console.log(entry);
  }
}

main().then(console.log,console.log);

ZJONSSON avatar Jul 04 '20 14:07 ZJONSSON

We solved it by wrapping everything in an async generator. In general, I much prefer async generators to streams; I find them more natural and readable. But since this library is based on streams, here is a way of converting them that I built for exceljs - maybe I should spin this out into an npm package:

module.exports = async function* iterateStream(stream) {
  const contents = [];
  stream.on('data', data => contents.push(data));

  let resolveStreamEndedPromise;
  const streamEndedPromise = new Promise(resolve => (resolveStreamEndedPromise = resolve));

  let ended = false;
  stream.on('end', () => {
    ended = true;
    resolveStreamEndedPromise();
  });

  let error = false;
  stream.on('error', err => {
    error = err;
    resolveStreamEndedPromise();
  });

  while (!ended || contents.length > 0) {
    if (contents.length === 0) {
      stream.resume();
      // eslint-disable-next-line no-await-in-loop
      await Promise.race([once(stream, 'data'), streamEndedPromise]);
    } else {
      stream.pause();
      const data = contents.shift();
      yield data;
    }
    if (error) throw error;
  }
  resolveStreamEndedPromise();
};

function once(eventEmitter, type) {
  // TODO: Use require('events').once when node v10 is dropped
  return new Promise(resolve => {
    let fired = false;
    const handler = () => {
      if (!fired) {
        fired = true;
        eventEmitter.removeListener(type, handler);
        resolve();
      }
    };
    eventEmitter.addListener(type, handler);
  });
}

And then all of the above examples work by replacing for await (const entry of zip) { with for await (const entry of iterateStream(zip)) { - even with autodrain:

(async () => {
  const stream = fs.createReadStream('test.zip');
  const zip = unzip.Parse({forceStream: true});
  stream.pipe(zip);

  for await (const entry of iterateStream(zip)) {
    console.log(entry.path);
    await new Promise(resolve=>setTimeout(resolve, 1000));
  }
})()

alubbe avatar Jul 06 '20 05:07 alubbe

@alubbe You made my code much more readable, thank you a lot! I converted it to TypeScript:

import type { Duplex } from "stream";
import type { EventEmitter } from "events";

export async function* iterateStream<T = unknown>(stream: Duplex) {
  const contents: T[] = [];
  stream.on("data", (data) => contents.push(data));

  let resolveStreamEndedPromise: () => void = () => undefined;
  const streamEndedPromise = new Promise(
    (resolve) => (resolveStreamEndedPromise = resolve)
  );

  let hasEnded = false;
  stream.on("end", () => {
    hasEnded = true;
    resolveStreamEndedPromise();
  });

  let error: boolean | Error = false;
  stream.on("error", (err) => {
    error = err;
    resolveStreamEndedPromise();
  });

  while (!hasEnded || contents.length > 0) {
    if (contents.length === 0) {
      stream.resume();
      await Promise.race([once(stream, "data"), streamEndedPromise]);
    } else {
      stream.pause();
      const data = contents.shift();
      if (data) {
        yield data;
      }
    }
    if (error) throw error;
  }
  resolveStreamEndedPromise();
}

const once = (eventEmitter: EventEmitter, type: string) => {
  return new Promise((resolve) => {
    eventEmitter.once(type, resolve);
  });
};

I was also wondering whether there could be a potential memory leak from the Promise returned by once not resolving and therefore dangling around?

I added this cleanup code to ensure the promise actually resolves:

import type { Duplex } from "stream";
import type { EventEmitter } from "events";
import once from "lodash/once";

export async function* iterateStream<T = unknown>(stream: Duplex) {
  const contents: T[] = [];
  stream.on("data", (data) => contents.push(data));

  let resolveStreamEndedPromise: () => void = () => undefined;
  const streamEndedPromise = new Promise(
    (resolve) => (resolveStreamEndedPromise = resolve)
  );

  let hasEnded = false;
  stream.on("end", () => {
    hasEnded = true;
    resolveStreamEndedPromise();
  });

  let error: boolean | Error = false;
  stream.on("error", (err) => {
    error = err;
    resolveStreamEndedPromise();
  });

  while (!hasEnded || contents.length > 0) {
    if (contents.length === 0) {
      stream.resume();
      const waitForEmit = waitOnceEmitted(stream, "data");
      await Promise.race([waitForEmit.done, streamEndedPromise]);
      waitForEmit.cleanup(); // resolve the dangling promise and remove the 'data' listener
    } else {
      stream.pause();
      const data = contents.shift();
      if (data) {
        yield data;
      }
    }
    if (error) throw error;
  }
  resolveStreamEndedPromise();
}

const waitOnceEmitted = (eventEmitter: EventEmitter, type: string) => {
  let destroy: () => void = () => undefined;
  return {
    done: new Promise((_resolve) => {
      const resolve = once(_resolve);
      destroy = () => {
        resolve();
        eventEmitter.off(type, resolve);
      };
      eventEmitter.on(type, resolve);
    }),
    cleanup: () => destroy(),
  };
};

n1ru4l avatar Jul 08 '20 21:07 n1ru4l

@alubbe it looks like I've been following in your footsteps and running into the same issues, starting with saxes. Thank you for publishing the results of your efforts.

This time I just created a global array const promises: Promise<void>[] = []; to collect all promises generated from the .on('entry', ...) async handler, and at the end added await Promise.all(promises);. It's not a big deal in my case because I save the data to Google Cloud Storage and that is rather fast. But this misleading behavior will definitely lead to unintuitive results in other cases and needs to be fixed.
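
Roughly, the workaround looks like this (uploadToStorage is a hypothetical async function that consumes the entry stream and resolves when the upload is done):

const fs = require('fs');
const unzip = require('unzipper');

const promises = [];

fs.createReadStream('test.zip')
  .pipe(unzip.Parse())
  .on('entry', entry => {
    // collect the promise from the async work instead of awaiting it here
    promises.push(uploadToStorage(entry));
  })
  .on('finish', async () => {
    await Promise.all(promises); // make sure every upload actually finished
    console.log('all entries processed');
  });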

@ZJONSSON you could use Emittery (https://github.com/sindresorhus/emittery), which supports promises and iterators under the hood, to implement event emission that is compatible with async handlers.

I'd also like to recommend this article about microtasks and macrotasks to make things clearer. And try to avoid mixing promises with events (when the events are native Node.js events rather than surrogates). I hope these considerations help to escape Node.js event loop hell.

dobromyslov avatar Feb 01 '21 21:02 dobromyslov