highland
Error handling with batches
Is there a technique for returning errors from a batch operation?
As an example, assume that I have to write records in batches of 3, but every write of odd entries results in an error. Before I am done, I want to log errors and remove them from the stream, while successful writes continue on.
h( [1, 2, 3, 4, 5, 6, 7, 8, 9] )
    .map( preProcess )
    .batch( 3 )
    .doto( writeRecords )   // odd records result in error
    .sequence()             // would prefer a partition for errors and success
    .errors( writeErrors )
    .each( h.log );
I cannot indicate an error using the throw clause, since I am processing records in batch. Should I invoke writeRecords using a map and return some error object that can be filtered for errors, or is there a better technique available now for branching or partitioning a stream?
I updated my example to show how the map function could be used to return the error condition in a stream of results.
var a = h( [1, 2, 3, 4, 5, 6, 7, 8, 9] )
    .map( triple )      // multiplies by 3
    .batch( 3 )
    .map( process )     // odd - Error, Even - Value
    .sequence();
var b = a.fork().filter( success );  // false if instanceof Error
var c = a.fork().reject( success );
b
    .batch( 2 )
    .each( log( 'success:' ) );
c
    .batch( 2 )
    .errors( writeErrors )
    .each( log( 'error:' ) );
Yes. Your second example is how I would do it. The general technique for partitioning a stream is as you've shown: fork the original and apply filters to the forks.
Regarding errors, while highland allows you to continue from stream errors, I have only ever used stream errors for unrecoverable errors. If it's possible to continue, I would just inject an error object as a stream value rather than a stream error and filter it out using standard operators.
Finally, I would use flatMap(f) rather than map(f).sequence(). There's no functional difference, but I find flatMap more explicit.
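For readers following along, the "error object as a stream value" idea can be sketched without the library. This is a plain-array illustration, not Highland code: writeRecords and isSuccess are hypothetical stand-ins, and the two filter calls play the role of fork().filter(...) and fork().reject(...) in the example above.

```javascript
// Hypothetical batch writer: odd records fail, even records succeed.
// Failures are returned as Error values instead of being thrown.
function writeRecords(batch) {
  return batch.map(function (record) {
    return record % 2 === 1
      ? new Error('write failed for record ' + record)
      : record;
  });
}

function isSuccess(result) {
  return !(result instanceof Error);
}

// Split [1..9] into batches of 3, write each batch, then flatten the results.
var records = [1, 2, 3, 4, 5, 6, 7, 8, 9];
var results = [];
for (var i = 0; i < records.length; i += 3) {
  results = results.concat(writeRecords(records.slice(i, i + 3)));
}

// Partition: the array analogue of forking the stream and filtering each fork.
var successes = results.filter(isSuccess);
var failures = results.filter(function (r) { return !isSuccess(r); });

console.log('success:', successes);
console.log('error:', failures.map(function (e) { return e.message; }));
```

Because failures travel as ordinary values, no batch is lost when one of its records fails; the trade-off is that every downstream consumer has to check for Error instances.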
Is there perhaps a way to batch without using fork() or observe()?
s                               // Some stream of errors and successes
    .sequence()                 // assume the source is serial
    .errors( handleErrors )     // would like handleErrors to receive a batch of 5 errors
    .batch(10)                  // perhaps successful events are batched 10 at a time
    .each( x => log('success', x) )  // want x to be an array of 0..10 items
This one you'll have to implement yourself using consume. None of the operators that come with Highland will do much with stream errors. You can look at the implementation of batch for an example.
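A rough sketch of what such a consume-based helper might look like follows. It assumes the handler shape that consume uses, (err, x, push, next), and an end-of-stream marker (Highland's _.nil). The names makeErrorBatcher and handleBatch are hypothetical, and the marker is passed in as a parameter so the snippet runs without the library; with Highland it would presumably be wired up as stream.consume( makeErrorBatcher( 5, handleErrors, _.nil ) ).

```javascript
// Hypothetical helper (not part of Highland): builds a handler with the
// (err, x, push, next) shape used by consume(). Errors are collected into
// arrays of `size` and handed to handleBatch; values pass through untouched.
function makeErrorBatcher(size, handleBatch, nil) {
  var pending = [];
  function flush() {
    if (pending.length > 0) {
      handleBatch(pending);
      pending = [];
    }
  }
  return function (err, x, push, next) {
    if (err) {
      pending.push(err);              // swallow the error, keep it for later
      if (pending.length === size) flush();
      next();
    } else if (x === nil) {
      flush();                        // hand over any partial batch at the end
      push(null, nil);                // then signal end of stream downstream
    } else {
      push(null, x);                  // ordinary value: forward it
      next();
    }
  };
}

// Drive the handler by hand with a fake sequence of events.
var NIL = {};                         // stand-in for the end-of-stream marker
var errorBatches = [];
var passed = [];
var step = makeErrorBatcher(2, function (batch) { errorBatches.push(batch); }, NIL);
var push = function (err, x) { passed.push(x); };
var next = function () {};

step(null, 1, push, next);
step('e1', undefined, push, next);
step(null, 2, push, next);
step('e2', undefined, push, next);
step('e3', undefined, push, next);
step(null, NIL, push, next);

console.log(errorBatches);            // [ [ 'e1', 'e2' ], [ 'e3' ] ]
```

A fatal condition is not covered here; one option would be for handleBatch to signal it and for the handler to push the error downstream instead of swallowing it.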
Why don't you want to use fork or observe?
I basically have a single stream of data that is processed in three stages. At the end of stage 3, if there are results remaining, I consider the overall run a success. Each stage can, however, return errors that cause the event to be shuttled off for manual intervention, and these must be logged. Such events no longer progress along the flow. Every once in a while, the error could be catastrophic, and I want the entire flow to fail immediately.
(Source) ---> Stage 1 ------> Stage 2 ------> Stage 3 ------> Done
                \--> error --/  \--> error --/  \--> error --/ ^
                 \--> fatal      \--> fatal      \--> fatal    |
                           \               \               \   |
                            +---------------+---------------+--+
                (error is handled, flow continues)   (fatal invokes done)
The only other feature is support for batching events at various stages. Stage 1 will operate on 10 items at a time, stage 2 on 100 at a time, and stage 3 on 500 at a time. I would also like my errors to be handled 10 at a time. Think of these as bulk operations where it is more efficient to process the items in batches.
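To make the flow concrete, here is a synchronous, library-free sketch of the three behaviours described: per-stage batching, handled errors that drop out of the flow, and a fatal condition that aborts the run. Every name (toBatches, runStage, processBatch, runPipeline) is hypothetical, the batch sizes are shrunk to 3 and 2, and the 2x+1 rule with 51 as the fatal value is borrowed from the test code later in this thread. Batching of the handled errors themselves is not shown.

```javascript
// Split `items` into arrays of at most `size` elements.
function toBatches(items, size) {
  var batches = [];
  for (var i = 0; i < items.length; i += size) {
    batches.push(items.slice(i, i + size));
  }
  return batches;
}

// One bulk operation: returns { value } or { error } per item,
// and throws on the fatal condition.
function processBatch(batch) {
  return batch.map(function (value) {
    var result = value * 2 + 1;
    if (result === 51) throw new Error('fatal: ' + result);
    if (result % 3 === 0) return { error: result };
    return { value: result };
  });
}

// Run one stage batch by batch. Handled errors are collected and removed
// from the flow; a throw from processBatch propagates (fail fast).
function runStage(items, batchSize, handled) {
  var survivors = [];
  toBatches(items, batchSize).forEach(function (batch) {
    processBatch(batch).forEach(function (outcome) {
      if (outcome.error !== undefined) handled.push(outcome.error);
      else survivors.push(outcome.value);
    });
  });
  return survivors;
}

function runPipeline(source) {
  var handled = [];
  var stage1 = runStage(source, 3, handled);
  var stage2 = runStage(stage1, 2, handled);
  return { results: stage2, handled: handled };
}

var run = runPipeline([1, 2, 3, 4]);
console.log(run.results);   // [ 11 ]
console.log(run.handled);   // [ 3, 9, 15 ]
```

With the input 1..12, stage 2 eventually computes 51 and the whole run throws, which is the fail-fast case.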
My hunch is that my requirement to "fail fast" on any unhandled errors emerging from my error handlers means I have to be able to add a stopOnError handler as part of my main flow. This is why I have a reluctance to create additional streams.
This is pretty close, but I have two holes.
- done() is not called when there is a fatal exception (like when error == 51).
- I would like my errors to be batched before being handled. I'm looking into batch and consume to see if they provide me enough clues to create my own batchErrors function.
var _ = require( 'highland' );

describe( 'production simplified', function () {
    function func( value ) {
        return value.map( function ( val ) {
            var result = val * 2 + 1;
            if ( result % 3 === 0 ) return Promise.reject( result );
            return Promise.resolve( result );
        } );
    }
    function log( prefix ) {
        return function ( message ) {
            console.log( prefix + ':' + message );
        }
    }
    function error( prefix ) {
        return function ( err, push ) {
            console.log( prefix, err );
            // 51 is a fatal condition
            if ( err == 51 ) push( err );
        }
    }
    it( 'should mimic production flow', function ( done ) {
        _( [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] )
            .batch( 3 )
            .map( func )        // Promise: resolves to 2x+1, rejects if divisible by 3
            .sequence()
            .flatMap( _ )       // resolve promises
            .errors( error( 'stage 1 error' ) )
            .stopOnError( error( 'fatal' ) )
            .batch( 2 )
            .map( func )        // Promise: resolves to 2x+1, rejects if divisible by 3
            .sequence()
            .flatMap( _ )       // resolve promises
            .errors( error( 'stage 2 error' ) )
            .stopOnError( error( 'fatal' ) )
            .toArray( function ( values ) {
                log( 'result' )( values );
                done();
            } );
    } );
} );
Actually, problem #1 (stopOnError not calling done()) occurs because I am re-emitting the error. It is resolved by handling the condition locally.
it.only( 'should mimic production flow', function ( done ) {
    _( [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12] )
        .batch( 3 )
        .map( func )        // Promise: resolves to 2x+1, rejects if divisible by 3
        .sequence()
        .flatMap( _ )       // resolve promises
        .errors( error( 'stage 1 error' ) )
        .stopOnError( function ( err ) {
            console.error( 'fatal', err );
        } )
        .batch( 2 )
        .map( func )        // Promise: resolves to 2x+1, rejects if divisible by 3
        .sequence()
        .flatMap( _ )       // resolve promises
        .errors( error( 'stage 2 error' ) )
        .stopOnError( function ( err ) {
            console.error( 'fatal', err );
        } )
        .toArray( function ( values ) {
            log( 'result' )( values );
            done();
        } );
} );
@vqvu It seems like highland lacks a notion of "success".
Can you take a look at the following example? I would rather not open a separate issue for something this similar:
var express = require("express");
var _ = require("highland");
var app = express();

function logError(err) {
    //...
}

// _.curry lets handleErrors(res) return a function of the stream,
// which is the shape that .through() expects.
var handleErrors = _.curry(function (res, stream) {
    // Return the transformed stream so the chain can continue after .through().
    return stream.errors(function (err, push) {
        logError(err);
        push(err);
    }).stopOnError(function () {
        res.sendStatus(500);
    });
});

app.post("/sync", function (req, res) {
    _([ job1, job2, job3 ])
        .flatten()
        .through(handleErrors(res))
        // what to use to execute success handling code?
        // .toArray will still call its function because some jobs may succeed
        // .each has the same problem as .toArray
        // .done does not reflect the status at all
});
How can one detect the success of a batch?
You can use the new toCallback to detect this case. This operator expects at most one value or error to reach it, so if your jobs emit values that you care about, put a .collect() in front (like in the doc's example). Otherwise, filter them all out with a .filter(x => false).
Note that you won't need to use errors or stopOnError anymore. toCallback implicitly stops on errors.
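The behaviour described here (collect everything, call back exactly once, stop at the first error) can be modelled in plain JavaScript. The sketch below is only a model of that description, not Highland's implementation: collectToCallback and the { value } / { error } event records are hypothetical stand-ins for a stream followed by .collect() and toCallback.

```javascript
// Model of "collect, then call back once": the callback receives either the
// first error, or null plus the array of every value seen.
function collectToCallback(events, callback) {
  var values = [];
  for (var i = 0; i < events.length; i++) {
    if (events[i].error) {
      callback(events[i].error);   // first error wins; later events are ignored
      return;
    }
    values.push(events[i].value);
  }
  callback(null, values);          // reached only when no error occurred
}

var outcomes = [];
function report(err, values) {
  outcomes.push(err ? 'failed: ' + err.message : 'ok: ' + values.join(','));
}

collectToCallback([{ value: 'a' }, { value: 'b' }], report);
collectToCallback([{ value: 'a' }, { error: new Error('job2') }, { value: 'c' }], report);

console.log(outcomes);             // [ 'ok: a,b', 'failed: job2' ]
```

In the express handler above, the success branch of such a callback is where the success response would go, and the error branch is where res.sendStatus(500) would go.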
Thanks!