node-unzipper
Unzipping files in lambda using unzipper.Open.s3 does not work
Hi, I am trying to use the unzipper library inside a Lambda function to extract a file from S3 using the code below. The Lambda invocation succeeds with a successful response, but the file does not get extracted anywhere and the function execution halts there.
function extractZipToTempLocation(s3Client, extractionPath, bucketName, key) {
  return unzipper.Open.s3(s3Client, { Bucket: bucketName, Key: key })
    .then(d => d.extract({ path: extractionPath }));
}
Can someone provide inputs on this? This works fine locally.
Can you provide more details on the parameters with which you are calling this function?
Lambda has certain restrictions on where you are allowed to write files. Given the ephemeral nature of Lambda, I'm curious as to where you are trying to extract files to?
/tmp should always be available: https://forums.aws.amazon.com/thread.jspa?threadID=174119
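To make that concrete, here is a minimal sketch (not from the thread) of extracting into /tmp from a Lambda handler; the bucket and key names are placeholders, and Lambda's /tmp size limit (512 MB by default) still applies:

// Minimal sketch: extract into Lambda's writable /tmp. Bucket/key are placeholders.
const os = require('os');
const fs = require('fs');
const path = require('path');
const AWS = require('aws-sdk');
const unzipper = require('unzipper');

const s3Client = new AWS.S3();

exports.handler = async () => {
  const extractionPath = path.join(os.tmpdir(), 'unzipped'); // resolves to /tmp/unzipped on Lambda
  fs.mkdirSync(extractionPath, { recursive: true });
  const directory = await unzipper.Open.s3(s3Client, { Bucket: 'my-bucket', Key: 'archive.zip' });
  await directory.extract({ path: extractionPath });
  return { extractedTo: extractionPath };
};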
I wrote an example of streaming a zipped file from S3 back to S3 without uncompressing the entire archive, along with a short tutorial that might help: https://medium.com/@multiaki/unzipping-s3-files-back-to-s3-without-uncompressing-entire-file-streaming-82f662b5065a
Here's my example code of unzipping a zipped file back to S3:
const s3Client = new AWS.S3();
const directory = await unzipper.Open.s3(s3Client, { Bucket: 'joshtest1', Key: 'Archive.zip' });

for await (const file of directory.files) {
  await s3Client.putObject({
    Bucket: 'joshtest1',
    Key: `test/${file.path}`,
    Body: await file.stream(),
    ContentLength: file.uncompressedSize
  }).promise();
}
Hi, I have 5000 SVG files in a zip file (e.g., test.zip):
const directory = await unzipper.Open.s3(s3, params);
for (const file of directory.files) {
  console.log("File==" + file.path);
  var result = await s3.putObject({
    Bucket: bucketName,
    Key: `uploadFolder/${file.path}`,
    Body: await file.stream(),
    ContentLength: file.uncompressedSize,
    ACL: 'public-read'
  }).promise();
  console.log("upload info==" + JSON.stringify(result));
}
files.length is 5000, but only the first ~30 files get unzipped and uploaded to the target bucket. What can we do? Running on Lambda.
+1 I'm having the same issue. Lambda seems to exit early.
Is it possible you are hitting a timeout on the lambda?
Also, you are doing one file at a time in your example (await inside the for loop), but you can easily process multiple files simultaneously. Here is an example (using Bluebird.map with the concurrency option):
const Bluebird = require('bluebird');

const directory = await unzipper.Open.s3(s3, params);
await Bluebird.map(directory.files, async file => {
  var result = await s3.putObject({
    Bucket: bucketName,
    Key: `uploadFolder/${file.path}`,
    Body: file.stream(),
    ACL: 'public-read'
  }).promise();
  console.log("upload info==" + JSON.stringify(result));
}, { concurrency: 25 });
@fyarepo @jbravo-edrans please let me know if the above example helped resolve the issue?
Hi ZJONSSON, thanks for your support on this.
I already tried that, using Promise.map, Promise.all, Promise.each, and no luck. The issue seems to be related to "Open.s3" and AWS S3 Streams. I don't know what's happening there. The issue happened to me using a zip file which has around 100 files inside (zip is ~40MB). With smaller zip files it worked well.
BTW, I found a solution avoiding the use of Open.S3, just using the unzipper.Parse method like this:
const s3Client = new AWS.S3();
const s3Stream = s3Client.getObject({
  Bucket: ZIP_BUCKET_NAME,
  Key: key,
}).createReadStream();

const files = s3Stream.pipe(unzipper.Parse({ forceStream: true }));

for await (const entry of files) {
  const fileName = entry.path;
  const type = entry.type;
  const size = entry.vars.uncompressedSize;
  debug(`Type: ${type}`);
  if (type === 'File') {
    debug(`Uploading ${fileName}...`);
    const sDir = key.substring(8);
    const params = {
      Bucket: ZIP_BUCKET_NAME,
      Key: `extracted/${sDir}/${fileName}`,
      Body: entry.on('finish', () => {
        debug(`Stream 1 finish`);
      }),
      ContentLength: size,
    };
    await s3Client.upload(params).promise();
  } else {
    entry.autodrain();
  }
}
It was the only way I could make the unzipping work fine.
I'm having a similar problem, and it's not just on Lambda. When I unzip a large file (~5000 files, 200MB zipped), it just exits with exit code 0 after a minute or so, with only about 20 or 30 files unzipped. This is the code I used:
return unzipper.Open.s3(s3, { Bucket: zipBucket, Key: zipKey }).then(directory =>
  Promise.map(
    directory.files,
    file => {
      const targetKey = [targetFolder, file.path].join('/');
      return s3.upload({ Bucket: targetBucket, Key: targetKey, Body: file.stream() }).promise();
    },
    { concurrency: 40 }
  )
);
It's the same when I use mapSeries or a different concurrency. When I slice directory.files to only run on about 100 files, it works OK, so it might be a resource issue.
When I tried to apply @jbravo-edrans's approach (see code below), it exits after a few seconds on the last line here:
const s3Stream = s3.getObject({
  Bucket: zipBucket,
  Key: zipKey,
}).createReadStream();
const files = await s3Stream.pipe(unzipper.Parse({forceStream: true})).promise();
Something interesting is that it exits without process.exit() being invoked - I replaced process.exit() with my own function so I can debug or log information about it, and Node dies without getting there.
Update: Found a hack that gets around the process exiting in the middle. It makes Node think it's not done with all events using the method below. This means that the bug is probably that somehow the event loop queue gets cleared out in the unzipping process.
var done = (function wait() {
  if (!done) setTimeout(wait, 5000);
})();

doMyStuff().then(() => {
  done = true;
});
I'm also running into this issue. I've got pretty much the same code as @ZJONSSON, except I'm using p-map instead of Bluebird. After successfully unzipping and uploading about 40 files, the other files do not complete, and as a result the Lambda times out.
I wondered if the files that it was hanging on were very large, but I unzipped it locally (took a couple seconds) and these files were reasonable (comparable to the others in the zip archive).
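For reference, a rough sketch of the p-map variant described above (assuming p-map v4, the last CommonJS release; bucket and folder names are placeholders, not the actual code that failed):

// Sketch only. Assumes p-map v4 (CommonJS); newer p-map releases are ESM-only.
const pMap = require('p-map');
const AWS = require('aws-sdk');
const unzipper = require('unzipper');

const s3 = new AWS.S3();

async function unzipToBucket({ srcBucket, srcKey, targetBucket, targetFolder }) {
  const directory = await unzipper.Open.s3(s3, { Bucket: srcBucket, Key: srcKey });
  const files = directory.files.filter((file) => file.type === 'File'); // skip directory entries
  await pMap(
    files,
    (file) => s3.upload({
      Bucket: targetBucket,
      Key: `${targetFolder}/${file.path}`,
      Body: file.stream(),
    }).promise(),
    { concurrency: 25 }
  );
}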
We're having similar issues, pretty much what @jbravo-edrans and @ranhalprin describe.
I had some luck with the following approach (+ increasing lambda function timeout value):
import AWS from 'aws-sdk';
import unzipper from 'unzipper';
import stream from 'stream';

const S3 = new AWS.S3();

const uploadStream = ({ Bucket, Key }) => {
  const pass = new stream.PassThrough();
  return {
    writeStream: pass,
    promise: S3.upload({ Bucket, Key, Body: pass }).promise(),
  };
}

export const handleUpload = async (event: S3Event) => {
  const batchSize = 30; // << tweak this based on your needs
  const record = event.Records[0];
  const zip = S3.getObject({
    Bucket: process.env.S3_BUCKET,
    Key: record.s3.object.key
  }).createReadStream().pipe(unzipper.Parse({ forceStream: true }));

  let promises = [];

  for await (const entry of zip) {
    const fileName = entry.path;
    if (entry.type.match(/file/ig) /* include more filters here if needed */) {
      const { writeStream, promise } = uploadStream({ Bucket: process.env.S3_BUCKET, Key: `parsed/${fileName}` });
      entry.pipe(writeStream);
      promises.push(promise);
    } else {
      entry.autodrain();
    }
    if (promises.length === batchSize) {
      await Promise.all(promises);
      promises = [];
    }
  }

  if (promises.length === 0) {
    return;
  }
  await Promise.all(promises);
};
I think the issue is well explained here: the Lambda limitation on disk size. @jbravo-edrans's code worked for me, thanks.
I was having this issue before as well. For me it was not because of the memory size or time limits in Lambda, though those do need to be raised from the default. I couldn't identify where in the current unzipper code the failure was occurring, so I rewrote the decrypt/inflate upload stream and it worked without silently failing after around 40 entries. Here's a simplified version of my implementation, ready to be tested locally or in a Lambda.
initDecrypt and checkCRC are mostly refactored implementations of what's in lib/unzip.js.
entryByteRange attempts to replicate what's done in lib/unzip.js, where PullStream is used to skip the entry's local headers and find the first byte of file data. This has worked with good reliability for me and lifts the ~40 entry limit. However, I haven't yet been able to calculate the right byte range for zips using zip64. I'm not sure if I need to switch back to using PullStream, or if it's possible to calculate the start and end of a zip64 entry's file data using just the headers from the central directory. I'd appreciate any feedback on my method in that function in particular.
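One possible way around the extraFieldLength uncertainty is sketched here (a hedged suggestion based on the fixed 30-byte ZIP local file header layout, not on unzipper internals; s3, evt, and encrypted() refer to the snippet below, and it assumes entry.compressedSize and entry.offsetToLocalFileHeader already carry the zip64-corrected values): read each entry's own local header with a small ranged GET and compute the data offset from its locally stored name/extra lengths.

// Hedged sketch: derive the data range from the entry's own local header instead of
// the central directory's extraFieldLength (the two can differ).
const entryByteRangeFromLocalHeader = async (entry) => {
  const headerStart = entry.offsetToLocalFileHeader;
  // The local file header is a fixed 30 bytes before the variable-length name/extra fields.
  const { Body } = await s3.getObject({
    Bucket: evt.bucket,
    Key: evt.zipKey,
    Range: `bytes=${headerStart}-${headerStart + 29}`,
  }).promise();
  const nameLength = Body.readUInt16LE(26);  // file name length
  const extraLength = Body.readUInt16LE(28); // extra field length as stored locally
  const dataStart = headerStart + 30 + nameLength + extraLength;
  const start = dataStart + (encrypted(entry) ? 12 : 0); // skip the 12-byte encryption header
  const end = dataStart + entry.compressedSize - 1;      // compressedSize includes that header
  return `bytes=${start}-${end}`;
};

The extra ranged GET per entry is tiny (30 bytes), so the added round-trip should be cheap relative to downloading the entry data itself.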
const AWS = require('aws-sdk');
const unzipper = require('unzipper');
const Decrypt = require('unzipper/lib/Decrypt');
const zlib = require('zlib');
const Stream = require('stream');

// Fill in config vals for local tests. Remove config when running in Lambda.
const awsConfig = new AWS.Config({
  accessKeyId: '',
  secretAccessKey: '',
  region: '',
  apiVersion: '2006-03-01',
});

let s3;
let evt;

const handler = async (event) => {
  evt = event;
  s3 = new AWS.S3(awsConfig);
  const directory = await unzipper.Open.s3(s3, { Bucket: event.bucket, Key: event.zipKey });
  const files = directory.files.filter((entry) => entry.type === 'File');
  const uploads = files.map((entry) => uploadEntry(entry));
  await Promise.all(uploads);
  s3 = null;
  evt = null;
};

const uploadEntry = async (entry) => {
  const decrypt = await initDecrypt(entry);
  const inflater = entry.compressionMethod ? zlib.createInflateRaw() : Stream.PassThrough();
  const params = { Bucket: evt.bucket, Key: evt.zipKey, Range: entryByteRange(entry) };
  const Body = s3.getObject(params).createReadStream()
    .on('error', (e) => { throw e; })
    .pipe(decrypt)
    .on('error', (e) => { throw e; })
    .pipe(inflater)
    .on('error', (e) => { throw e; });
  return s3.upload({ Bucket: evt.bucket, Key: evt.unzipFolderKey + entry.path, Body }).promise();
};

const entryByteRange = (entry) => {
  const {
    path, flags, offsetToLocalFileHeader: offset, compressedSize, extraFieldLength,
  } = entry;
  const headerLength = 30 + path.length + (flags & 8 ? extraFieldLength : 0);
  const start = offset + headerLength + (encrypted(entry) ? 12 : 0);
  const end = offset + headerLength + compressedSize - 1;
  return `bytes=${start}-${end}`;
};

const initDecrypt = async (entry) => {
  if (!encrypted(entry)) return Stream.PassThrough();
  if (!evt.password) throw new Error('missing_password');
  const decrypt = Decrypt();
  evt.password.split('').forEach((d) => decrypt.update(d));
  await checkCRC(decrypt, entry);
  return decrypt.stream();
};

const checkCRC = async (decrypt, entry) => {
  const params = { Bucket: evt.bucket, Key: evt.zipKey, Range: crcByteRange(entry) };
  const { Body } = await s3.getObject(params).promise();
  const data = Body.map((byte) => decrypt.decryptByte(byte));
  const check = (entry.flags & 0x8)
    ? (entry.lastModifiedTime >> 8) & 0xff
    : (entry.crc32 >> 24) & 0xff;
  if (data[11] !== check) throw new Error('bad_password');
};

const crcByteRange = (entry) => {
  const start = entry.offsetToLocalFileHeader + 30 + entry.path.length;
  return `bytes=${start}-${start + 11}`;
};

const encrypted = (entry) => entry.flags & 0x01;

exports.handler = handler;

// Fill in demoEvt vals
const demoEvt = {
  bucket: '',
  zipKey: '',
  unzipFolderKey: '',
  password: '',
};
handler(demoEvt);
I'd like to help fix this issue in the unzipper module, but I don't yet understand what's breaking in unzipper.Open that isn't breaking in my code. Once we know what's happening I can work out a solution and open a PR for it.
I was able to extract/unzip a 5 GB file (approx. 5k files in it) in 12 minutes on Lambda using @jbravo-edrans's Parse-based approach above. Thanks.
I found I was running out of open sockets (50 is the default max) because the read streams used to get the ranges were not being closed, though I am using Open.custom with AWS SDK v3.
Also if I lift the maximum I seem to hit an S3-imposed limit of 200 open read streams (after which I cannot even make head object requests to the zip file), though I cannot find where this is documented.
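For anyone hitting the same ceiling, here is a rough sketch of raising the agent's socket limit for AWS SDK v3 (assuming the @smithy/node-http-handler package, which was @aws-sdk/node-http-handler in older v3 releases). This only lifts the limit; the ranged read streams still need to be destroyed, as in the Open.custom snippet below:

// Sketch only: lift the default maxSockets (50) used by the SDK's HTTPS agent.
const https = require('https');
const { S3Client } = require('@aws-sdk/client-s3');
const { NodeHttpHandler } = require('@smithy/node-http-handler'); // '@aws-sdk/node-http-handler' on older SDK v3

const s3Client = new S3Client({
  requestHandler: new NodeHttpHandler({
    httpsAgent: new https.Agent({ keepAlive: true, maxSockets: 200 }),
  }),
});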
// stream isn't async so make a passthru stream and return that
Open.custom({
  stream: (offset, length) => {
    const passThroughStream = new PassThrough();
    const range = `bytes=${offset}-${length || ''}`;
    // This needs to be in scope when passThroughStream.finish event is called
    let s3RangeStream: Readable;
    void this.s3Client
      .send(new GetObjectCommand({ Bucket: bucket, Key: key, Range: range }))
      .then(({ Body }) => {
        if (Body === undefined) {
          throw Error('getZipContents: could not get read stream');
        }
        s3RangeStream = Body as Readable;
        s3RangeStream.pipe(passThroughStream);
      });
    passThroughStream.on('finish', () => s3RangeStream.destroy()); // <---
    return passThroughStream as Readable;
  },
  // size() is also required by Open.custom; omitted here
});
I made this (naive) modification to unzip.js:

stream
  .pipe(inflater)
  .on('error', function(err) { entry.emit('error', err); })
  .pipe(entry)
  .on('finish', function() {
    if (req.end) // <-- added this here so that the passThroughStream finish event is triggered
      req.end();
    else if (req.abort)
      req.abort();
    else if (req.close)
      req.close();
    else if (req.push)
      req.push(); // <-- this is called otherwise, though I do not seem to be able to listen for it
    else
      console.log('warning - unable to close stream');
  });
TYSM! The decrypt/inflate rewrite above saved me.