highland icon indicating copy to clipboard operation
highland copied to clipboard

Errors thrown if pipeline contains non-Highland streams

Open joshdmiller opened this issue 10 years ago • 8 comments

I may be doing something incorrectly, but I'm having difficulty getting errors to propagate down a pipeline if that pipeline contains a non-Highland stream. E.g. this works as expected:

var mapStream = require( 'map-stream' );

var through = _.pipeline(
  _.map(function () {
    throw new Error( "Oh no!" );
  }),
  _.map(function (x) {
    return x;
  })
);

var _([1]).pipe(pipeline).stopOnError(function () {
  console.log( "I will print out just fine." );
}).resume();

But if I change the second map in the pipeline to a one from map-stream (or another non-Highland source), the error will be thrown instead:

var through = _.pipeline(
  _.map(function () {
    throw new Error( "Oh no!" );
  }),
  _(mapStream(function ( x, cb ) {
    cb( null, x );
  }))
);

var _([1]).pipe(pipeline).stopOnError(function () {
  console.log( "I will never be called." );
}).resume();

In my use case, the streams are dynamically added to the pipeline at runtime from various sources, some of which will not be Highland streams.

What is a better approach for getting the desired error propagation when using a dynamic set of mixed Highland and Node streams?

joshdmiller avatar Oct 20 '14 01:10 joshdmiller

you could check if the next stream is a highland stream (with _.isStream) and bypass the errors with _.errors. may be not the best idea :)

greelgorke avatar Oct 21 '14 09:10 greelgorke

@greelgorke I am not entirely certain what you mean. Can you elaborate?

I have tried a number of combinations, but have found nothing that really works. What I really need is to be able to take a dynamic set of streams of mixed origin (highland and non-highland) and and put them together in a series, catching any error that may occur along the way.

The closest I got was with a kludge that added an .on('error', function () {}) call to any non-Highland stream, and a stopOnErrors call to the Highland streams, chained together using pipe or through in a loop. The problem was that a downstream error in a non-highland stream would not stop new data getting written down the stream. E.g. with highalandA.pipe(nonhighlandB).pipe(highlandC), if an error was thrown in nonhighlandB, it would emit its own error event that I could capture, but highlandA would continue to throw data at it, usually triggering even more errors in nonhighlandB because the issue wasn't necessarily in the data particulars.

How do I get good interoperability with other stream sources, particularly when it comes to catching errors?

joshdmiller avatar Oct 22 '14 05:10 joshdmiller

so you need a back-channel? afaik there is no such feature in highland. the solution i could think of is to catch the errors and to call .pause on the previous stream

greelgorke avatar Oct 22 '14 06:10 greelgorke

Thanks for your help. With that idea, I ended up with a kludgy workaround, but it passes the unit tests:

var currentStream;

function createErrorHandler ( errStream, prevStream ) {
  return function streamErrorHandler ( err ) {
    if ( prevStream ) {
      prevStream.end();
    }

    myErrorCapturingMechanism( err );
  };
}

function addStream ( stream ) {
  var prevStream = currentStream;

  if ( ! currentStream ) {
    currentStream = stream;
  } else {
    if ( _.isStream( stream ) ) {
      currentStream.pipe( stream.stopOnError( createErrorHandler( stream, prevStream ) ) );
    } else if ( _.isFunction( stream ) ) {
      currentStream = stream( currentStream )
        .stopOnError( createErrorHandler( stream, prevStream ) );
    } else {
      stream.on( 'error', createErrorHandler( stream, prevStream ) );
      currentStream = currentStream.pipe( stream );
    }
  }
}

theStreamsArray.forEach( function forEachStream ( stream ) {
  addStream( stream );
});

// Assuming no errors, I can do something with currentStream...

This doesn't solve the issue, but it works around it ok. Thoughts?

joshdmiller avatar Oct 22 '14 17:10 joshdmiller

Any further thoughts on how to mix node streams into a pipeline? Specifically Transform streams.

NathanRSmith avatar May 15 '15 18:05 NathanRSmith

Nevermind, I figured it out. Here's an example of how to mix transform streams into a pipeline, or at least how I did it.

var es = require('event-stream');
var _ = require('highland');
var fs = require('fs');

// standard node readable
var instream = fs.createReadStream('somefile');
// standard node writable
var outstream = fs.createWriteStream('someotherfile');
var pipeline = [];

// some transformation from an existing lib...
var myExistingTransform = es.through(function(data) { this.queue(data); });

pipeline.push(_.split());
// add other transformations...
pipeline.push(_.through(myExistingTransform));
// more transformations...

// put it all together
instream.pipe(_.pipeline.apply(null, pipeline)).pipe(outstream);

Now I know that I can do the es.through in Highland, but in my case I had an existing library of transformations, many of which used event-stream. In my case I needed to build up my pipeline by checking a bunch of conditionals, so this array method worked fine.

NathanRSmith avatar May 15 '15 21:05 NathanRSmith

You could also do this without using arrays (arguably more readable).

var es = require('event-stream');
var _ = require('highland');
var fs = require('fs');

// standard node readable
var instream = fs.createReadStream('somefile');

var s = _(instream);

// some transformation from an existing lib...
var myExistingTransform = es.through(function(data) { this.queue(data); });

s = s.split();
// add other transformations...
if (shouldBatch()) {
    s = s.batch(4);
}
s = s.through(myExistingTransform);
// more transformations...

// write it out:
s.pipe(fs.createWriteStream('someotherfile'));

Edit: pipeline also currently has a performance bug (#270), so this approach might save you some future headache too.

vqvu avatar May 16 '15 01:05 vqvu

Thanks for the tip.

NathanRSmith avatar May 18 '15 13:05 NathanRSmith