Async iteration
This issue is a follow-up to https://github.com/ZJONSSON/node-unzipper/issues/178
We've noticed some weird behaviour around async iteration with forceStream. This works as expected:
const stream = fs.createReadStream('test.zip');
const zip = unzip.Parse({forceStream: true});
const pipedZipStream = stream.pipe(zip);
for await (const entry of pipedZipStream) {
  console.log(entry.path); // prints everything as expected
}
But if you do anything async in the for loop, it stops working:
const stream = fs.createReadStream('test.zip');
const zip = unzip.Parse({forceStream: true});
const pipedZipStream = stream.pipe(zip);
for await (const entry of pipedZipStream) {
  console.log(entry.path); // prints the first entry path, but none of the others
  await new Promise(resolve => setTimeout(resolve, 1000));
}
However, everything works great if you just pipe zip through another PassThrough:
const stream = fs.createReadStream('test.zip');
const zip = unzip.Parse({forceStream: true});
const pipedZipStream = stream.pipe(zip);
for await (const entry of pipedZipStream.pipe(new PassThrough({objectMode: true}))) {
  console.log(entry.path); // prints everything as expected
  await new Promise(resolve => setTimeout(resolve, 1000));
}
I assume this is because either the data events are no longer emitted or no one is listening. Since this library uses a custom PullStream, I wanted to ask why this isn't working without a PassThrough, and whether there's anywhere in the code where we could fix that?
I will take a closer look here. I haven't played with async iterators and streams, so I will try to replicate.
Worth noting that a big aspect of the Parse method is that there is no random access, and you have to scroll through the entire file. In that scenario, managing backpressure becomes important (if you have a slow consumer, you don't want to blow up memory). So Parse needs the underlying files to be "fully consumed" before reading the next entry. On entry there is a method .autodrain() that basically consumes the entry if you are not interested in it.
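For reference, a sketch of that event-based Parse flow (file names here are just placeholders):
const fs = require('fs');
const unzipper = require('unzipper');

// Every entry must be either piped somewhere or autodrained so that Parse
// can release backpressure and move on to the next entry.
fs.createReadStream('test.zip')
  .pipe(unzipper.Parse())
  .on('entry', entry => {
    if (entry.path === 'wanted.txt') { // hypothetical file of interest
      entry.pipe(fs.createWriteStream('wanted.txt'));
    } else {
      entry.autodrain();
    }
  });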
The Open methods are way more flexible, since they use the central directory and allow you to only consume the files you are interested in (even in parallel).
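For illustration, a hedged sketch of the Open flavour (not from this thread; file names are placeholders):
const unzipper = require('unzipper');

(async () => {
  // Open.file reads the central directory first, so entries can be picked
  // and consumed on demand (and even in parallel).
  const directory = await unzipper.Open.file('test.zip');
  const wanted = directory.files.find(file => file.path === 'wanted.txt'); // hypothetical name
  if (wanted) {
    const content = await wanted.buffer(); // or wanted.stream() for a readable stream
    console.log(content.length);
  }
})();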
Open does look quite awesome, but exceljs has too many assumptions around streams and piping, so we probably have to stick with Parse for now. I came up with this idea, where after each iteration (which may not have consumed the whole entry), we call entry.autodrain() just in case - what do you think of it?
const fs = require('fs');
const unzip = require('unzipper');
const {once} = require('events');

async function* iterateZip(stream) {
  const zip = unzip.Parse();
  stream.pipe(zip);
  const entries = [];
  zip.on('entry', entry => entries.push(entry));
  let finished = false;
  zip.on('finish', () => (finished = true));
  const zipFinishedPromise = once(zip, 'finish');
  while (!finished || entries.length > 0) {
    if (entries.length === 0) {
      // wait for either the next entry or the end of the archive
      await Promise.race([once(zip, 'entry'), zipFinishedPromise]);
    } else {
      const entry = entries.shift();
      yield entry;
      // drain whatever the consumer did not read, so Parse can move on
      entry.autodrain();
    }
  }
}

for await (const entry of iterateZip(fs.createReadStream('test.zip'))) {
  console.log(entry.path);
  await new Promise(resolve => setTimeout(resolve, 1000));
}
Thanks @alubbe. I think if you don't await on the autodrain (as implemented above) you "should" be fine, except you "might" have a dangling, unresolved promise. If you have already "consumed" the entry, the finish event might have been emitted already by the time you hit autodrain. Since autodrain is only resolved with the finish event from the entry, it might never resolve.
Might be worth expanding autodrain to check whether the entry has already been fully consumed (in which case autodrain should be a noop). We might have to either inspect the entry stream to see if it has finished already, or set a variable on the entry to register that the stream is finished. Let me know what you think.
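A rough sketch of that idea (purely hypothetical consumer-side code, not the library's implementation): mark the entry once its stream has ended and make autodrain a noop in that case:
// Hypothetical sketch: track whether an entry has been fully consumed and
// short-circuit autodrain so its promise cannot dangle. Not the library's code.
function makeAutodrainSafe(entry) {
  let consumed = false;
  entry.on('end', () => { consumed = true; });

  const originalAutodrain = entry.autodrain.bind(entry);
  entry.autodrain = () => {
    if (consumed) {
      // Already read to the end: return an object matching autodrain's shape
      // with an immediately resolved promise.
      return { promise: () => Promise.resolve() };
    }
    return originalAutodrain();
  };
  return entry;
}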
I investigated that, and it seems that the finish event is emitted even in the case you described. I added a new test case to confirm it and ran it on node v8 and node v13 (https://github.com/ZJONSSON/node-unzipper/pull/194). And in this particular case, I'm fine with just piping the entry into the NoopStream without waiting for it, so we don't need await and I don't think anything is leaked. What do you think?
Yes, you are spot on, I think. If we pipe a finished stream to a new transform, we will get the finish event in the transform.
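A minimal vanilla-streams check of that statement (illustrative only; the thread's actual confirmation is the test added in PR 194):
const { PassThrough } = require('stream');

// Fully consume a stream, then pipe the already-ended readable into a fresh
// PassThrough and watch for 'finish' downstream.
const source = new PassThrough();
source.end('some data');
source.resume(); // drain the source so it emits 'end'

source.on('end', () => {
  const sink = new PassThrough();
  source.pipe(sink); // piping an already-ended readable
  sink.on('finish', () => console.log('finish emitted downstream'));
});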
Okay, then we should probably remove the async iteration example from the README :( Is there any chance of integrating my solution from above into this code base? That would mean that node v10+ is required. We could also put it into a special file that the main export doesn't import.
Thanks @alubbe. Why do you say the async iteration example from the README doesn't work? This example explicitly either consumes the file or runs autodrain (in both cases each entry is fully consumed).
unzipper is a drop-in replacement for unzip, so I try to resist breaking backwards compatibility. That being said, we will have to do that eventually. I like the idea of a special file outside of the main exports - do you want to whip up a PR?
You can try it out yourself:
const stream = fs.createReadStream('test.zip');
const zip = unzip.Parse({forceStream: true});
const pipedZipStream = stream.pipe(zip);
for await (const entry of pipedZipStream) {
  console.log(entry.path); // prints the first entry path, but none of the others
  await new Promise(resolve => setTimeout(resolve, 1000));
}
will not work :(
But I've written a helper file for exceljs that was just merged and published - I can open a PR bringing it to this repo as well, as a separate export.
This example is missing an .autodrain() on any entry you want to skip. Because the zip file is being read sequentially, the only way to go from one entry to the next is to consume the content of each entry. The entry can be consumed either by piping it to a transform/writable stream or by piping it to nothing, which is exactly what autodrain does - see: https://github.com/ZJONSSON/node-unzipper/blob/724e71d296a6dd0cd00edfb71dffc622bdfba5f7/lib/NoopStream.js#L17
Yes, my bad, I have not been posting all of my findings here. This code also doesn't work - can you see a way of fixing it?
(async () => {
  const stream = fs.createReadStream('test.zip');
  const zip = unzip.Parse({forceStream: true});
  stream.pipe(zip);
  for await (const entry of zip) {
    console.log(entry.path);
    await entry.autodrain().promise();
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
})();
Hi! I'm also having an issue with async iteration.
I've created a minimal example here.
The TL;DR is I'm doing something like this:
const zip = fs.createReadStream(zipFile).pipe(unzipper.Parse({ forceStream: true }));
for await (const entry of zip) {
  const entryContent = await entry.buffer();
  const someOtherFileContent = await fs.promises.readFile(someOtherFile);
  // Do something with the above buffers here
}
This loop only ever executes once, for the first entry, whereas changing the second line in the loop to const someOtherFileContent = fs.readFileSync(someOtherFile); fixes the issue and loops over every entry in the zip.
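For comparison, the variant that does iterate over every entry (same sketch as above with the synchronous read; zipFile and someOtherFile are placeholders from the report):
const zip = fs.createReadStream(zipFile).pipe(unzipper.Parse({ forceStream: true }));
for await (const entry of zip) {
  const entryContent = await entry.buffer();
  const someOtherFileContent = fs.readFileSync(someOtherFile); // synchronous read instead of fs.promises.readFile
  // Do something with the above buffers here
}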
Thank you @alubbe and @daiscog. This library was not originally designed with async iterators in mind. It seemed sufficient to add the forceStream option to get async iterators to work, but clearly there are some additional considerations.
I will continue investigating but appreciate any help. I added a test case to the async-iterator-test branch: https://github.com/ZJONSSON/node-unzipper/blob/async-iterator-test/test/async-iterator.js
See also the following example, which demonstrates that vanilla streams should be able to handle awaits inside the iterator:
const {Transform, Writable, PassThrough} = require('stream');

const wait = async ms => new Promise(resolve => setTimeout(resolve, ms));

async function main() {
  const source = new PassThrough({objectMode: true});
  const transform = new Transform({
    objectMode: true,
    transform: async function(d, e, cb) {
      await wait(100);
      this.push(d);
      cb();
    }
  });

  source.pipe(transform);
  source.write(1);
  source.write(2);
  source.end(3);

  for await (const entry of transform) {
    await wait(200);
    console.log(entry);
  }
}

main().then(console.log, console.log);
We solved it by wrapping everything in an async generator. In general, I much prefer async generators to streams; I find them more natural and readable. But since this library is based on streams, here is a way of converting them that I built for exceljs - maybe I should spin this out into an npm package:
module.exports = async function* iterateStream(stream) {
  const contents = [];
  stream.on('data', data => contents.push(data));

  let resolveStreamEndedPromise;
  const streamEndedPromise = new Promise(resolve => (resolveStreamEndedPromise = resolve));

  let ended = false;
  stream.on('end', () => {
    ended = true;
    resolveStreamEndedPromise();
  });

  let error = false;
  stream.on('error', err => {
    error = err;
    resolveStreamEndedPromise();
  });

  while (!ended || contents.length > 0) {
    if (contents.length === 0) {
      stream.resume();
      // eslint-disable-next-line no-await-in-loop
      await Promise.race([once(stream, 'data'), streamEndedPromise]);
    } else {
      stream.pause();
      const data = contents.shift();
      yield data;
    }
    if (error) throw error;
  }
  resolveStreamEndedPromise();
};

function once(eventEmitter, type) {
  // TODO: Use require('events').once when node v10 is dropped
  return new Promise(resolve => {
    let fired = false;
    const handler = () => {
      if (!fired) {
        fired = true;
        eventEmitter.removeListener(type, handler);
        resolve();
      }
    };
    eventEmitter.addListener(type, handler);
  });
}
And then all of the above examples work by replacing for await (const entry of zip) { with for await (const entry of iterateStream(zip)) { - even with autodrain:
(async () => {
  const stream = fs.createReadStream('test.zip');
  const zip = unzip.Parse({forceStream: true});
  stream.pipe(zip);
  for await (const entry of iterateStream(zip)) {
    console.log(entry.path);
    await new Promise(resolve => setTimeout(resolve, 1000));
  }
})();
@alubbe You made my code much more readable, thank you a lot! I converted it to TypeScript:
import type { Duplex } from "stream";
import type { EventEmitter } from "events";

export async function* iterateStream<T = unknown>(stream: Duplex) {
  const contents: T[] = [];
  stream.on("data", (data) => contents.push(data));

  let resolveStreamEndedPromise: () => void = () => undefined;
  const streamEndedPromise = new Promise(
    (resolve) => (resolveStreamEndedPromise = resolve)
  );

  let hasEnded = false;
  stream.on("end", () => {
    hasEnded = true;
    resolveStreamEndedPromise();
  });

  let error: boolean | Error = false;
  stream.on("error", (err) => {
    error = err;
    resolveStreamEndedPromise();
  });

  while (!hasEnded || contents.length > 0) {
    if (contents.length === 0) {
      stream.resume();
      await Promise.race([once(stream, "data"), streamEndedPromise]);
    } else {
      stream.pause();
      const data = contents.shift();
      if (data) {
        yield data;
      }
    }
    if (error) throw error;
  }
  resolveStreamEndedPromise();
}

const once = (eventEmitter: EventEmitter, type: string) => {
  return new Promise((resolve) => {
    eventEmitter.once(type, resolve);
  });
};
I was also wondering whether there could be a potential memory leak from the Promise returned by once not resolving and therefore dangling around?
I added this cleanup code to ensure the promise actually resolves:
import type { Duplex } from "stream";
import type { EventEmitter } from "events";
import once from "lodash/once";

export async function* iterateStream<T = unknown>(stream: Duplex) {
  const contents: T[] = [];
  stream.on("data", (data) => contents.push(data));

  let resolveStreamEndedPromise: () => void = () => undefined;
  const streamEndedPromise = new Promise(
    (resolve) => (resolveStreamEndedPromise = resolve)
  );

  let hasEnded = false;
  stream.on("end", () => {
    hasEnded = true;
    resolveStreamEndedPromise();
  });

  let error: boolean | Error = false;
  stream.on("error", (err) => {
    error = err;
    resolveStreamEndedPromise();
  });

  while (!hasEnded || contents.length > 0) {
    if (contents.length === 0) {
      stream.resume();
      const waitForEmit = waitOnceEmitted(stream, "data");
      await Promise.race([waitForEmit.done, streamEndedPromise]);
      waitForEmit.cleanup(); // remove the dangling listener once the race settles
    } else {
      stream.pause();
      const data = contents.shift();
      if (data) {
        yield data;
      }
    }
    if (error) throw error;
  }
  resolveStreamEndedPromise();
}

const waitOnceEmitted = (eventEmitter: EventEmitter, type: string) => {
  let destroy: () => void = () => undefined;
  return {
    done: new Promise((_resolve) => {
      const resolve = once(_resolve);
      destroy = () => {
        resolve();
        eventEmitter.off(type, resolve);
      };
      eventEmitter.on(type, resolve);
    }),
    cleanup: () => destroy(),
  };
};
@alubbe it looks like I'm following your steps and running into the same issues, starting from saxes. Thank you for publishing the results of your efforts.
This time I just created a global array const promises: Promise<void>[] = []; to collect all the promises generated from the .on('entry', ... async handler, and at the end added await Promise.all(promises);. It's not a big deal in my case because I save the data to Google Storage and it performs rather fast, but this misleading behaviour will definitely lead to unintuitive results in other cases and needs to be fixed.
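A minimal sketch of that workaround (uploadEntry and the logging are illustrative, not from the actual project):
const fs = require('fs');
const unzipper = require('unzipper');

// Hypothetical async consumer: buffer the entry and pretend to upload it.
const uploadEntry = async entry => {
  const content = await entry.buffer();
  console.log('uploaded', entry.path, content.length);
};

// Collect the promise returned by each async 'entry' handler in a global
// array and await them all once parsing has finished.
const promises = [];

fs.createReadStream('test.zip')
  .pipe(unzipper.Parse())
  .on('entry', entry => promises.push(uploadEntry(entry)))
  .on('finish', async () => {
    await Promise.all(promises);
    console.log('all entries handled');
  });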
@ZJONSSON you can use Emittery (https://github.com/sindresorhus/emittery), which supports promises and iterators under the hood, to implement event emission that is compatible with async handlers.
I'd also recommend this article about microtasks and macrotasks to make things clearer, and I'd try to avoid mixing promises with events (at least when the events are native Node.js events rather than surrogates). Hope these considerations help you escape the Node.js event loop hell.
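For illustration, a sketch of what Emittery offers: its emit() returns a promise that resolves only after all listeners, including async ones, have completed (newer Emittery versions are ESM-only, so the import style depends on the version you use):
const Emittery = require('emittery'); // or: import Emittery from 'emittery' for ESM-only versions

const emitter = new Emittery();

emitter.on('entry', async entry => {
  await new Promise(resolve => setTimeout(resolve, 1000)); // simulate slow async work
  console.log('handled', entry.path);
});

(async () => {
  // emit() resolves only after every listener's promise has settled,
  // so a producer could await it before emitting the next entry.
  await emitter.emit('entry', { path: 'first.txt' });
  await emitter.emit('entry', { path: 'second.txt' });
})();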