writing-a-plugin "Call the callback function only when the current file (stream/buffer) is completely consumed"
What were you expecting to happen?
Following the verbal guidance here -
"Call the callback function only when the current file (stream/buffer) is completely consumed"
then in the case of file.isStream(), I expected to need to wait for the 'end' event from the inner transform before calling the callback().
What actually happened?
As shown in the code example on that man page, it only works if callback() is called immediately.
Please post a sample of your gulpfile (preferably reduced to just the bit that's not working)
innerTransform.on('end', () => {
cb(null, file) // gulp writes an empty file doing it this way
})
Actually implementing the above code (instead of calling the callback immediately) resulted in an empty file being written, although the inner transform had dutifully pushed all the data.
What version of gulp are you using? gulp 4
What versions of npm and node are you using? npm 6.9.0 gulp 4.0.2
I am unable to convince myself that "when the current file (stream/buffer) is completely consumed" could mean anything other than that the end event has been emitted. Furthermore, the wording seems perfect from a design point of view, and fits with all the usage of callbacks that I know.
How to reconcile the words and the actual design?
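For concreteness, here is a minimal sketch of the pattern I expected to be correct (innerTransform is just a stand-in for my actual transform stream):
const through = require('through2');

function expectedPattern(innerTransform) {
  return through.obj(function (file, enc, cb) {
    if (file.isStream()) {
      // Pipe the vinyl contents through the inner transform...
      file.contents = file.contents.pipe(innerTransform);
      // ...and only call the callback once it has consumed everything.
      innerTransform.on('end', () => {
        cb(null, file); // in practice this produces an empty output file
      });
      innerTransform.on('error', (err) => cb(err));
      return;
    }
    cb(null, file);
  });
}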
Can you post the full plugin code?
@craigphicks I agree with @contra - there might be some weirdness here that we can only determine by seeing all the code.
Hi @contra @phated -
This is part of a repo for a javascript pre-processor, reversible-preproc. Part of that project is a Node Transform Stream, with _transform and _flush defined to suit the function. It is a completely standard node transform stream.
Note: the currently published npm version (1.x.x) of reversible-preproc is a much older version which I plan to update shortly.
The following code is part of a dependent repo, gulp-reversible-preproc, with the aim of providing a gulp transform adapter. The argument InnerTransform passed to gulpCreateOuterTransform is a standard node transform stream, e.g., as it is called here.
It's written as a general gulp interface in order to provide a clean interface within the gulp-reversible-preproc project.
import through from 'through2'
import PluginError from 'plugin-error'
function makePluginError(PLUGIN_NAME, err) {
if (err instanceof PluginError) {
return err
} else if (err instanceof Error) {
let pe = new PluginError(PLUGIN_NAME, err.message)
pe.stack = err.stack
return pe
} else {
return new PluginError(PLUGIN_NAME, err.toString())
}
}
// The purpose of this "makeOverrides" is only to convert InnerTransform errors
// to PluginError. Necessary because the outer on-'error' handler's errors are ignored
// in favor of another listener's errors
function makeOverrides(PLUGIN_NAME, InnerTransform) {
return {
transform(chunk, enc, cb) {
InnerTransform.prototype._transform.call(this, chunk, enc, (err, data) => {
if (err)
cb(makePluginError(PLUGIN_NAME, err), null)
else
cb(null, data)
})
},
flush(cb) {
InnerTransform.prototype._flush.call(this, (err, data) => {
if (err)
cb(makePluginError(PLUGIN_NAME, err), null)
else
cb(null, data)
})
},
}
}
function gulpCreateOuterTransform(
PLUGIN_NAME, InnerTransform,
innerArgs, transformOpts) {
// Creating a stream through which each file will pass
return through.obj(function (file, enc, cb) {
if (file.isNull()) {
// return empty file
return cb(null, file)
}
let outChunks = []
let rppt = new InnerTransform(
...innerArgs,
Object.assign((transformOpts || {}), makeOverrides(PLUGIN_NAME, InnerTransform))
)
rppt.on('end', () => {
/// https://stackoverflow.com/a/52303073
let date = new Date()
file.stat.atime = date
file.stat.mtime = date
///
if (file.isBuffer())
file.contents = Buffer.concat(outChunks)
// putting the callback here will result in Gulp writing an empty file
//cb(null, file)
})
rppt.on('error', (err) => {
// in the case where file.isStream()===true,
// there is another listener on error which does not
// convert the err to PluginError and passes that unconverted
// err to the destroy function of the next stage, resulting
// in the following PluginError not being used.
// To work around that, GulpWrappedInnerTransform
// transforms the err after every call to _transform and _flush.
// THIS emit CODE IS THEREFORE REDUNDANT, but harmless.
this.emit('error',
makePluginError(PLUGIN_NAME, err)
)
// cb was called earlier, so can't call it here
})
if (file.isBuffer()) {
rppt.on('data', (chunk) => {
if (!(chunk instanceof Buffer))
chunk = Buffer.from(chunk)
outChunks.push(chunk)
})
rppt.end(file.contents, 'utf8') // 'end()' is equiv write() + signal end
}
if (file.isStream()) {
file.contents = file.contents
.pipe(rppt)
rppt.resume()
}
cb(null, file) // return early - seems counterintuitive, but the written file is empty otherwise
})
}
export default gulpCreateOuterTransform
Main Issue:
Calling the callback cb(null, file) immediately seems counterintuitive. I expected to call it from inside the end and error handlers. But when I tried that, the written file was empty. According to the man page example code, calling the callback immediately is correct (see below). However, according to the man page's verbal explanation, "Call the callback function only when the current file (stream/buffer) is completely consumed", I feel my expected method (calling from end and error) should be correct.
Secondary issue
As recommended in some gulp instructionals, the code uses emit to signal the error, properly converted to a gulp plugin error. This has to be done because, by calling the callback early, it cannot be used to return an error later. However, at the node-stream-transform level, another listener is registered for error, and even though it is second in the listener list, it is first to pass the error (an unconverted error) up to the outer gulp transform level. The workaround is the overrides for transform and flush you see near the top, used to convert the error which will be passed up by that second listener.
Caveat
The code as is works now for the case of a single file opened with src and written with dest. I haven't tested multiple files.
Excerpt from the gulp man page showing the immediate callback
module.exports = function() {
return through.obj(function(file, encoding, callback) {
if (file.isNull()) {
// nothing to do
return callback(null, file);
}
if (file.isStream()) {
// file.contents is a Stream - https://nodejs.org/api/stream.html
this.emit('error', new PluginError(PLUGIN_NAME, 'Streams not supported!'));
// or, if you can handle Streams:
//file.contents = file.contents.pipe(...
//return callback(null, file);
} else if (file.isBuffer()) {
// file.contents is a Buffer - https://nodejs.org/api/buffer.html
this.emit('error', new PluginError(PLUGIN_NAME, 'Buffers not supported!'));
// or, if you can handle Buffers:
//file.contents = ...
//return callback(null, file);
}
});
};
I concur; I have observed the exact behavior described by @craigphicks. I didn't realize that this conflicted with the instructions, but my notes show that I am seeing the same issue:
.on('end', function () {
// DON'T CALL THIS HERE. It MAY work, if the job is small enough. But it needs to
// be called after the stream is SET UP, not when the streaming is DONE.
// Calling the callback here instead of below may result in data hanging in the
// stream--not sure of the technical term, but dest() creates no file, or the file
// is blank
// cb(returnErr, file);
})
Per @craigphicks's caveat:
The code as is works now for the case of a single file opened with src and written with dest . I haven't tested multiple files.
I too can write one file successfully, but when I write two files only one writes correctly in some circumstances. Behavior noted below is consistent given the test data I'm using, but it may well be dependent upon timing and other factors, such as the size of the data being moved, etc.
...
else if (file.isStream()) {
let newStream = through2.obj(function (this: any, file: Vinyl, encoding: string, cb: Function) {
cb(null, file)
})
let newFile = new Vinyl({path: newFileName, contents:newStream})
file.contents
.pipe(request(configObj))
.pipe(newStream)
// In this order, both files are written correctly by dest();
// this.push(file)
// this.push(newFile)
// in THIS order newFile is still written correctly, and file
// is written but is blank. If either line is commented
// out, the other line writes correctly
this.push(newFile)
this.push(file)
// after our stream is set up (not necessarily finished) we call the callback
log.debug('calling callback')
cb(returnErr);
}
...
@donpedro - Your project is very interesting. I have a question about
let newStream = through2.obj(function (this: any, file: Vinyl, encoding: string, cb: Function) {
cb(null, file)
})
When does this function (passed as an argument) get called, and what entity receives the data?
@craigphicks that's just an empty transformFunction; I could have just left the function out completely, as I'm not actually doing any transformation; I'm really just using newStream as a passthrough so I can use it for newFile. More here: https://www.npmjs.com/package/through2#transformfunction
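In other words, these two behave the same for my purposes (a small sketch):
const through2 = require('through2');

// An explicit no-op transformFunction...
const explicit = through2.obj(function (file, encoding, cb) {
  cb(null, file); // forward each chunk unchanged
});

// ...versus leaving it out entirely; through2's default transform simply
// passes chunks through, so this is an object-mode passthrough as well.
const implicit = through2.obj();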
Thanks for the good word! Be aware that gulp-api-adapter is very much under construction; it's being developed to work with our gulp-etl project, but hopefully it'll have some utility in the wider gulp world.
Hopefully it'll be usable and stable later this week...
I might have a smaller example for when the vinyl stream becomes empty (even though I'm not that experienced with streams, so there might be a user error in the following example).
const {dest} = require('gulp');
const source = require('vinyl-source-stream');
const size = require('gulp-size');
function sizeOfStream(cb) {
const stream = source('bogusname');
stream.write('a');
stream.end();
stream.pipe(size())
.pipe(dest('.'));
cb();
}
exports.default = sizeOfStream;
The output file of the above task is empty; the expected content was 'a'. The solution mentioned above, calling the callback immediately (in gulp-size) without waiting for the subtask to finish, works. However, interestingly enough, it breaks the logic in the flush function (gulp-size uses through2).
@jonatanlinden You're ending the stream before you've piped it anywhere - you probably want to do this:
import { pipeline } from 'stream'
// later on in your code
const stream = source('bogusname');
pipeline(
stream,
size(),
dest('.'),
cb
)
stream.write('a');
stream.end();
Have you actually tried your code, and gotten another result? When I run my take of it (wrapping it in a function with cb as a parameter), I get the same behaviour as with my example: with size(), bogusname is empty; without it, it is not.
(I believe stream.end() indicates that no more data will be written to the stream, but it does not really affect the readable part of the stream. Maybe?)
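For reference, my take of it was roughly this (sketch):
const { dest } = require('gulp');
const { pipeline } = require('stream');
const source = require('vinyl-source-stream');
const size = require('gulp-size');

function sizeOfStream(cb) {
  const stream = source('bogusname');
  // Wire up the whole chain first and let pipeline invoke cb on completion or error.
  pipeline(
    stream,
    size(),
    dest('.'),
    cb
  );
  // Write and end the source only after everything downstream is connected.
  stream.write('a');
  stream.end();
}

exports.default = sizeOfStream;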
The point was that gulp-size also seems to be written with the understanding that the callback function should be called after the vinyl content's stream has been processed. Like this:
if (file.isStream()) {
...
file.contents.pipe(new StreamCounter())
.on('finish', function () {
finish(null, this.bytes);
});
}
where the function finish calls the callback function.
I guess it could be possible to construct an even smaller example exposing the behaviour, using gulp-size as a starting point.
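Something roughly like this, perhaps (an untested sketch; countBytes is a made-up stand-in for gulp-size):
const through = require('through2');

function countBytes() {
  return through.obj(function (file, encoding, callback) {
    if (file.isStream()) {
      let bytes = 0;
      file.contents
        .on('data', (chunk) => { bytes += chunk.length; })
        .on('end', () => {
          console.log(`${file.path}: ${bytes} bytes`);
          // Calling the callback only here, after the contents stream has been
          // consumed, matches the docs' wording, and it reproduces the
          // empty-output behaviour discussed above.
          callback(null, file);
        });
      return;
    }
    callback(null, file);
  });
}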
I was having a very similar issue with a plugin that invokes child_process.spawn. The gulp task is "finished" in a few ms, even though the child process hasn't started yet and nobody has invoked the callback.
import {spawn} from 'node:child_process';
import process from 'node:process';
import gulp from 'gulp';
import PluginError from 'plugin-error';
import through from 'through2';
function generateSources() {
const nodeExecutable = process.argv[0];
return through.obj((file, encoding, callback) => {
if (file.isNull()) {
callback(null, file);
return;
}
// stdin and stdout are pipes, and stderr is inherited from this process.
const child = spawn(nodeExecutable, ['dist/tools/code-gen.js'], {
stdio: ['pipe', 'pipe', 'inherit'],
});
if (file.isStream()) {
console.log('Is stream.');
// file.contents is a ReadableStream
file.contents.pipe(child.stdin);
} else if (file.isBuffer()) {
console.log('Is buffer.');
// file.contents is a Buffer
child.stdin.write(file.contents);
child.stdin.end();
}
// Now file.contents is a ReadableStream from the child process's stdout.
file.contents = child.stdout;
// The filename should end in .ts.
file.extname = '.ts';
// If the child fails, this pipe fails.
child.on('error', (error) => {
console.log('On error.', error);
callback(error, null);
});
// Wait for the child to complete.
// Note that 'exit' fires before the pipes close, so we use 'close' to make
// sure we have all the data.
child.on('close', (code) => {
console.log('On close.', code);
if (code == 0) {
// Exit code zero, this pipe succeeds.
callback(null, file);
} else {
// Exit code non-zero, this pipe fails.
const error = new PluginError(
'GenerateSources', `Failed with exit code ${code}`);
callback(error, null);
}
});
});
}
All my logs would occur after the task "finished" "successfully", even if I deliberately broke the command to trigger ENOENT.
I finally found the cause. It was my task:
gulp.task('generate-sources', gulp.series('build-tools', async () => {
return gulp.src('src/gen/*.yaml')
.pipe(generateSources())
.pipe(gulp.dest('generated/gen/'));
}));
The problem was the async in my task definition. Presumably gulp treats the Promise returned by an async task as the completion signal, and that Promise resolves as soon as the stream is returned, rather than when the stream finishes. The fixed version works correctly:
gulp.task('generate-sources', gulp.series('build-tools', () => {
return gulp.src('src/gen/*.yaml')
.pipe(generateSources())
.pipe(gulp.dest('generated/gen/'));
}));
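If you do want an async task, one option (a sketch, assuming Node's promise-based stream.pipeline is available in your Node version) is to await the stream chain explicitly instead of returning it:
import {pipeline} from 'node:stream/promises';

gulp.task('generate-sources', gulp.series('build-tools', async () => {
  // Await the whole chain so the task only completes once dest() has finished.
  await pipeline(
    gulp.src('src/gen/*.yaml'),
    generateSources(),
    gulp.dest('generated/gen/'),
  );
}));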