highland
End stream within map()
How can I force a stream to end from within a map() function?
Scenario:
- Processing a very large stream of data
- Calling a promise via .map($(promise)).merge()
- The promise calls an external API that has a hard query limit.
- When the query limit is detected (a value on the stream === QUERY_LIMIT_MESSAGE), I would like to terminate all processing.
Implement a custom consume handler that checks for QUERY_LIMIT_MESSAGE and pushes nil.
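stream.map($(promise))
  .merge()
  .consume((err, x, push, next) => {
    if (err) {
      // Forward errors downstream and keep consuming.
      push(err);
      next();
    } else if (x === _.nil || x === QUERY_LIMIT_MESSAGE) {
      // Signal end of stream. next() is not called, so this handler
      // requests no further values.
      push(null, _.nil);
    } else {
      // Pass ordinary values through and ask for the next one.
      push(null, x);
      next();
    }
  })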
Would this propagate the .end() back to the source stream?
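One way to picture what "propagating back to the source" means is a small library-independent model. The sketch below uses plain JavaScript generators, not highland's API, and it does not settle how highland itself behaves. All names in it (source, fakeApi, mapLazy, takeUntilLimit, produced) are hypothetical, and fakeApi is a synchronous stand-in for the real promise-based call. It only shows the pattern: once the consumer stops at the sentinel, a lazy source is no longer pulled.

```javascript
// Plain JavaScript model of "stop pulling when a sentinel arrives".
// This is NOT highland's API; every name below is hypothetical.
const QUERY_LIMIT_MESSAGE = 'QUERY_LIMIT';

// Records every item the source actually produced.
const produced = [];

// Lazy generator standing in for the very large input stream.
function* source(items) {
  for (const item of items) {
    produced.push(item);
    yield item;
  }
}

// Stand-in for the external API: reports the limit from the third item on.
function fakeApi(item) {
  return item >= 3 ? QUERY_LIMIT_MESSAGE : `result-${item}`;
}

// Lazily map each item through the API stand-in.
function* mapLazy(iterable, fn) {
  for (const item of iterable) {
    yield fn(item);
  }
}

// Yield values until the sentinel shows up, then stop.
// Leaving the for...of loop early closes the upstream generators,
// so nothing further is pulled from the source.
function* takeUntilLimit(iterable) {
  for (const value of iterable) {
    if (value === QUERY_LIMIT_MESSAGE) {
      return;
    }
    yield value;
  }
}

const results = [...takeUntilLimit(mapLazy(source([1, 2, 3, 4, 5]), fakeApi))];

console.log(results);  // [ 'result-1', 'result-2' ]
console.log(produced); // [ 1, 2, 3 ]
```

Items 4 and 5 are never produced here because every stage is lazy and pull-driven. With promises that have already been started, or with a stage that reads ahead of its consumer, work already in flight would not be undone by ending the output.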