highland
Errors thrown if pipeline contains non-Highland streams
I may be doing something incorrectly, but I'm having difficulty getting errors to propagate down a pipeline if that pipeline contains a non-Highland stream. E.g. this works as expected:
var _ = require( 'highland' );
var mapStream = require( 'map-stream' );
var through = _.pipeline(
    _.map(function () {
        throw new Error( "Oh no!" );
    }),
    _.map(function (x) {
        return x;
    })
);
// The error raised in the first map reaches stopOnError.
_([1]).pipe(through).stopOnError(function () {
    console.log( "I will print out just fine." );
}).resume();
But if I change the second map in the pipeline to one from map-stream (or another non-Highland source), the error is thrown instead of being passed to the handler:
var through = _.pipeline(
    _.map(function () {
        throw new Error( "Oh no!" );
    }),
    // Non-Highland stream wrapped in a Highland stream.
    _(mapStream(function ( x, cb ) {
        cb( null, x );
    }))
);
_([1]).pipe(through).stopOnError(function () {
    console.log( "I will never be called." );
}).resume();
In my use case, the streams are dynamically added to the pipeline at runtime from various sources, some of which will not be Highland streams.
What is a better approach for getting the desired error propagation when using a dynamic set of mixed Highland and Node streams?
you could check if the next stream is a highland stream (with _.isStream) and bypass the errors with _.errors. may be not the best idea :)
@greelgorke I am not entirely certain what you mean. Can you elaborate?
I have tried a number of combinations, but have found nothing that really works. What I really need is to be able to take a dynamic set of streams of mixed origin (Highland and non-Highland) and put them together in a series, catching any error that may occur along the way.
The closest I got was with a kludge that added an .on('error', function () {}) call to any non-Highland stream and a stopOnError call to the Highland streams, chained together using pipe or through in a loop. The problem was that a downstream error in a non-Highland stream would not stop new data from being written down the stream. E.g. with highlandA.pipe(nonhighlandB).pipe(highlandC), if an error was thrown in nonhighlandB, it would emit its own error event that I could capture, but highlandA would continue to throw data at it, usually triggering even more errors in nonhighlandB because the issue wasn't necessarily in the data particulars.
How do I get good interoperability with other stream sources, particularly when it comes to catching errors?
so you need a back-channel? afaik there is no such feature in highland. the solution i could think of is to catch the errors and to call .pause on the previous stream
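For comparison, here is a minimal sketch of that kind of back-channel using only Node core rather than Highland. stream.pipeline (Node 10+) joins streams in series, hands the first error from any of them to a single callback, and destroys the streams that are still open so the upstream side stops writing. The helpers failingTransform and runChain are hypothetical names made up for this sketch, and Readable.from assumes a reasonably recent Node version. No Highland streams are involved, so treat it as an illustration of the pattern, not as a fix for the Highland behaviour described above.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Hypothetical stand-in for a third-party (non-Highland) transform
// that fails on one particular chunk. `seen` records what reached it.
function failingTransform(badValue, seen) {
    return new Transform({
        objectMode: true,
        transform(chunk, encoding, callback) {
            seen.push(chunk);
            if (chunk === badValue) {
                callback(new Error('Oh no!'));
            } else {
                callback(null, chunk);
            }
        }
    });
}

// Hypothetical helper: join the streams in series and resolve with the
// first error from any of them (or null) instead of letting it throw.
function runChain(streams) {
    return new Promise(function (resolve) {
        pipeline(streams, function (err) {
            resolve(err || null);
        });
    });
}

const seen = [];
const sink = new Writable({
    objectMode: true,
    write(chunk, encoding, callback) {
        callback();
    }
});

const result = runChain([
    Readable.from([1, 2, 3, 4]),
    failingTransform(2, seen),
    sink
]);

result.then(function (err) {
    console.log(err ? 'caught: ' + err.message : 'finished cleanly');
    console.log('chunks that reached the transform:', seen);
});
```

In this sketch the transform stops receiving data once it has failed, which is the behaviour the manual .on('error') handlers were missing.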
Thanks for your help. With that idea, I ended up with a kludgy workaround, but it passes the unit tests:
var _ = require( 'highland' );
// myErrorCapturingMechanism and theStreamsArray are defined elsewhere.
var currentStream;
function createErrorHandler ( errStream, prevStream ) {
    return function streamErrorHandler ( err ) {
        // End the upstream stream so it stops feeding the failed one.
        if ( prevStream ) {
            prevStream.end();
        }
        myErrorCapturingMechanism( err );
    };
}
function addStream ( stream ) {
    var prevStream = currentStream;
    if ( ! currentStream ) {
        currentStream = stream;
    } else {
        if ( _.isStream( stream ) ) {
            // Highland stream: keep the result so the chain continues from it.
            currentStream = currentStream.pipe( stream )
                .stopOnError( createErrorHandler( stream, prevStream ) );
        } else if ( _.isFunction( stream ) ) {
            // Function that takes a stream and returns a Highland stream.
            currentStream = stream( currentStream )
                .stopOnError( createErrorHandler( stream, prevStream ) );
        } else {
            // Plain Node stream: listen for its own 'error' event.
            stream.on( 'error', createErrorHandler( stream, prevStream ) );
            currentStream = currentStream.pipe( stream );
        }
    }
}
theStreamsArray.forEach( function forEachStream ( stream ) {
    addStream( stream );
});
// Assuming no errors, I can do something with currentStream...
This doesn't solve the issue, but it works around it ok. Thoughts?
Any further thoughts on how to mix node streams into a pipeline? Specifically Transform streams.
Nevermind, I figured it out. Here's an example of how to mix transform streams into a pipeline, or at least how I did it.
var es = require('event-stream');
var _ = require('highland');
var fs = require('fs');
// standard node readable
var instream = fs.createReadStream('somefile');
// standard node writable
var outstream = fs.createWriteStream('someotherfile');
var pipeline = [];
// some transformation from an existing lib...
var myExistingTransform = es.through(function(data) { this.queue(data); });
pipeline.push(_.split());
// add other transformations...
pipeline.push(_.through(myExistingTransform));
// more transformations...
// put it all together
instream.pipe(_.pipeline.apply(null, pipeline)).pipe(outstream);
Now I know that I can do the es.through in Highland, but in my case I had an existing library of transformations, many of which used event-stream. I also needed to build up my pipeline by checking a bunch of conditionals, so this array method worked fine.
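The conditionals themselves are not shown above, so here is a rough sketch of the "build an array, then join it" idea with the conditions spelled out. It uses Node core streams only (stream.pipeline in place of _.pipeline.apply), and the names mapTransform, buildStages, run and the options trim/upperCase are all hypothetical, invented for the illustration.

```javascript
const { pipeline, Readable, Transform, Writable } = require('stream');

// Hypothetical factory: wrap a plain function in an object-mode transform.
function mapTransform(fn) {
    return new Transform({
        objectMode: true,
        transform(chunk, encoding, callback) {
            let out;
            try {
                out = fn(chunk);
            } catch (err) {
                return callback(err);
            }
            callback(null, out);
        }
    });
}

// Build the list of stages from a (hypothetical) options object.
function buildStages(options) {
    const stages = [];
    if (options.trim) {
        stages.push(mapTransform(function (s) { return s.trim(); }));
    }
    if (options.upperCase) {
        stages.push(mapTransform(function (s) { return s.toUpperCase(); }));
    }
    return stages;
}

// Join source, the chosen stages and a collecting sink in series.
function run(lines, options) {
    return new Promise(function (resolve, reject) {
        const out = [];
        const sink = new Writable({
            objectMode: true,
            write(chunk, encoding, callback) {
                out.push(chunk);
                callback();
            }
        });
        const streams = [Readable.from(lines)].concat(buildStages(options), [sink]);
        pipeline(streams, function (err) {
            if (err) {
                reject(err);
            } else {
                resolve(out);
            }
        });
    });
}

run([' a ', ' b '], { trim: true, upperCase: true }).then(function (out) {
    console.log(out);
});
```

Because the stages live in an ordinary array, each condition only decides whether to push one more entry, and the final join does not care how many stages were chosen.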
You could also do this without using arrays (arguably more readable).
var es = require('event-stream');
var _ = require('highland');
var fs = require('fs');
// standard node readable
var instream = fs.createReadStream('somefile');
var s = _(instream);
// some transformation from an existing lib...
var myExistingTransform = es.through(function(data) { this.queue(data); });
s = s.split();
// add other transformations...
if (shouldBatch()) {
    s = s.batch(4);
}
s = s.through(myExistingTransform);
// more transformations...
// write it out:
s.pipe(fs.createWriteStream('someotherfile'));
Edit: pipeline also currently has a performance bug (#270), so this approach might save you some future headache too.
Thanks for the tip.