Error handling with batches

Open oravecz opened this issue 9 years ago • 9 comments

Is there a technique that I can use to be able to return errors from a batch operation?

As an example, assume that I have to write records in batches of 3, but every write of odd entries results in an error. Before I am done, I want to log errors and remove them from the stream, while successful writes continue on.

h( [1, 2, 3, 4, 5, 6, 7, 8, 9] )
    .map( preProcess )
    .batch( 3 )
    .doto( writeRecords )     // odd records result in an error
    .sequence()               // would prefer a partition for errors and successes
    .errors( writeErrors )
    .each( h.log );

I cannot signal an error by throwing, since I am processing records in batches. Should I invoke writeRecords using a map and return some error object that can be filtered out, or is there a better technique available now for branching or partitioning a stream?

oravecz avatar Feb 05 '16 14:02 oravecz

I updated my example to show how the map function could be used to return the error condition in a stream of results.

var a = h( [1, 2, 3, 4, 5, 6, 7, 8, 9] )
    .map( triple )              // multiplies by 3
    .batch( 3 )
    .map( process )             // odd - Error, Even - Value
    .sequence();

var b = a.fork().filter( success ); // success returns false if instanceof Error
var c = a.fork().reject( success );

b
    .batch( 2 )
    .each( log( 'success:' ) );
c
    .batch( 2 )
    .errors( writeErrors )
    .each( log( 'error:' ) );
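
The helpers used in the example above are not shown in the thread. A minimal sketch of what they might look like, in plain JavaScript with no Highland dependency, follows. All names and behaviours here are assumptions for illustration; `processBatch` stands in for `process` (renamed so it does not shadow Node's global `process`), and "odd means failure" just mirrors the comment in the example.

```javascript
// Hypothetical helpers; none of these come from the original post.

// Multiplies a record by 3.
function triple(x) {
    return x * 3;
}

// Stand-in for a batch write. Takes one batch (an array) and returns an
// array of the same length in which every odd record is replaced by an
// Error value and every even record is passed through unchanged.
function processBatch(batch) {
    return batch.map(function (x) {
        return x % 2 === 1 ? new Error('write failed for ' + x) : x;
    });
}

// Predicate used to partition results: false for Error values.
function success(x) {
    return !(x instanceof Error);
}

// Returns a logger that prefixes whatever it is given.
function log(prefix) {
    return function (x) {
        console.log(prefix, x);
    };
}

// Partitioning one batch without any stream library:
var results = processBatch([1, 2, 3].map(triple));
var written = results.filter(success);
var failed = results.filter(function (x) { return !success(x); });
log('success:')(written);
log('error:')(failed.map(function (e) { return e.message; }));
```

Because failures travel as ordinary values rather than thrown exceptions, one bad record does not abort the rest of its batch, and the same `success` predicate can drive both `filter` and `reject` on the forked streams.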

oravecz avatar Feb 05 '16 15:02 oravecz

Yes. Your second example is how I would do it. The general technique for partitioning a stream is as you've shown: fork the original and apply filters to the forks.

Regarding errors, while highland allows you to continue from stream errors, I've always only used stream errors for unrecoverable errors. If it's possible to continue, I would just inject an error object as a stream value rather than a stream error and filter it out using standard operators.

Finally, I would use flatMap(f) rather than map(f).sequence(). There's no functional difference, but I find flatMap more explicit.

vqvu avatar Feb 05 '16 21:02 vqvu

Is there a way to batch errors if I would rather not use fork() or observe()?

s                                  // Some stream of errors and successes
    .sequence()                    // assume the source is serial
    .errors( handleErrors )        // would like handleErrors to receive a batch of 5 errors
    .batch(10)                     // perhaps successful events are batched 10 at a time
    .each( x => log('success', x) )  // want x to be an array of 0..10 items

oravecz avatar Feb 14 '16 13:02 oravecz

This one you'll have to implement yourself using consume. None of the operators that come with Highland will do much with stream errors. You can look at the implementation of batch for an example.

Why don't you want to use fork or observe?

vqvu avatar Feb 14 '16 23:02 vqvu

I basically have a single stream of data that is processed in three stages. At the end of stage 3, if there are results remaining, I consider the overall run a success. However, each stage can return errors that cause the event to be shuttled off for manual intervention, and these must be logged. Such events will no longer progress along the flow. Every once in a while, the error could be catastrophic, and I want the entire flow to fail immediately.

(Source) ---> Stage 1 ------> Stage 2 ------> Stage 3 ------> Done
               \--> error --/  \--> error --/  \--> error --/   ^
                \--> fatal     \--> fatal     \--> fatal        |
                          \              \             \        |
                           +--------------+-------------+-------+
(error is handled, flow continues)  (fatal invokes done)

The only other feature is support for batching events at various stages. Stage 1 will operate on 10 items at a time, stage 2 on 100 at a time, and stage 3 on 500 at a time. I would also like my errors to be handled 10 at a time. Think of these as bulk operations where it is more efficient to process the items in batches.

My hunch is that my requirement to "fail fast" on any unhandled errors emerging from my error handlers means I have to be able to add a stopOnError handler as part of my main flow. This is why I have a reluctance to create additional streams.

This is pretty close, but I have two holes.

  1. done() is not called when there is a fatal exception (like when error == 51).
  2. I would like my errors to be batched before being handled. I'm looking into batch and consume to see if they provide me enough clues to create my own batchErrors function.

describe( 'production simplified', function () {

    function func( value ) {
        return value.map(function (val) {
            var result = val * 2 + 1;
            if ( result % 3 === 0 ) return Promise.reject( result );
            return Promise.resolve( result );
        })
    }

    function log( prefix ) {
        return function ( message ) {
            console.log( prefix + ':' + message );
        }
    }

    function error( prefix ) {
        return function( err, push ) {
            console.log( prefix, err );
            // 51 is a fatal condition
            if (err == 51) push( err );
        }
    }

    it( 'should mimic production flow', function ( done ) {
        _( [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] )

            .batch(3)
            .map( func )      // Promise: resolves to 2x+1, rejects if divisible by 3
            .sequence()
            .flatMap( _ )       // resolve promises
            .errors( error('stage 1 error') )
            .stopOnError( error('fatal') )

            .batch(2)
            .map( func )      // Promise: resolves to 2x+1, rejects if divisible by 3
            .sequence()
            .flatMap( _ )       // resolve promises
            .errors( error('stage 2 error') )
            .stopOnError( error('fatal') )

            .toArray( function(values) {
                log( 'result' )( values );
                done();
            } )
            ;
    } );
} );

oravecz avatar Feb 15 '16 15:02 oravecz

Actually, problem #1 (done() not being called after a fatal error) happens because my stopOnError handler re-emits the error. It is resolved by handling the condition locally.

it.only( 'should mimic production flow', function ( done ) {
    _( [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] )

        .batch( 3 )
        .map( func )      // Promise: resolves to 2x+1, rejects if divisible by 3
        .sequence()
        .flatMap( _ )       // resolve promises
        .errors( error( 'stage 1 error' ) )
        .stopOnError( function ( err ) {
            console.error( 'fatal', err );
        } )

        .batch( 2 )
        .map( func )      // Promise: resolves to 2x+1, rejects if divisible by 3
        .sequence()
        .flatMap( _ )       // resolve promises
        .errors( error( 'stage 2 error' ) )
        .stopOnError( function ( err ) {
            console.error( 'fatal', err );
        } )

        .toArray( function ( values ) {
            log( 'result' )( values );
            done();
        } )
    ;
} );

oravecz avatar Feb 15 '16 15:02 oravecz

@vqvu It seems like highland lacks a notion of "success".
Can you take a look at the following example? I would rather not open a separate, similar issue:

var express = require("express");
var _ = require("highland");
var app = express();

function logError(err) {
    //...
}

var handleErrors = _.curry(function (res, stream) {
    // Return the transformed stream so that .through() can continue the chain.
    return stream.errors(function (err, push) {
        logError(err);
        push(err);
    }).stopOnError(function () {
        res.sendStatus(500);
    });
});

app.post("/sync", function (req, res) {
    _([ job1, job2, job3 ])
        .flatten()
        .through(handleErrors(res))
        // what to use to execute success handling code?
        // .toArray will still trigger a function because partially jobs may succeed
        // .each because of the same reason as toArray
        // .done does not reflect status at all
});

How can one detect the success of a batch?

pavel avatar Aug 25 '16 12:08 pavel

You can use the new toCallback to detect this case. This operator expects at most one value or error to reach it, so if your jobs emit values that you care about, put a .collect() in front (like in the doc's example). Otherwise, filter them all out with a .filter(x => false).

Note that you won't need to use errors or stopOnError anymore. toCallback implicitly stops on errors.

vqvu avatar Aug 25 '16 18:08 vqvu

Thanks!

pavel avatar Aug 26 '16 14:08 pavel