highland
How to propagate downstream errors to the source?
I'm currently using highland to transform and write rows coming from a database query. I need to release the database connection to a pool when all the rows have streamed through.
Right now, I just observe the source stream and release the connection when the stream is done. This works great.
However, if there's an error thrown somewhere downstream (either in the processing pipeline, or when writing), I want to be notified at the source, that way I can stop pulling rows and release the connection.
So, is there a mechanism to propagate downstream errors to the source?
Unfortunately, there's no good way. It's probably the biggest problem with Highland at the moment. We hope to fix it in version 3.x, but there's no automatic way to propagate errors right now.
You'll have to do it manually somehow. Perhaps pass a callback along with your source stream that downstream code calls when it's done with the source?
Sorry I don't have a better answer for you. :(
No problem. What if an event could be emitted using `this.emit` from within any step in the pipeline, and that event were propagated up the pipeline? You could attach an event handler at any step of the pipeline and return `false` from the handler to stop propagation.
I'll put up an example in a few hours.
Automatic event propagation of this kind is pretty complicated, because the pipeline can split (via `fork` or `observe`) and join (via `merge`, `flatMap`, etc.) in fairly arbitrary ways. You'd essentially have to modify each of the transforms I just mentioned to support it individually. That's why the error propagation work I mentioned (it's actually `nil` propagation) is being done on a separate major-release branch. It's a big change.
I'm not convinced arbitrary event bubbling is worth it. What use case do you have in mind (beyond resource cleanup) that requires it?
For arbitrary event bubbling, I just figured it would be a bonus side effect of having error event bubbling. Instead of just doing it for errors, you could do it for any event. I was imagining that it would look similar to DOM event propagation.
For `fork` or `observe`, it could look like this:
```
    A      <- event received
   / \
  B   C    <- event received
       \
        D  <- this.emit('lol')
```
Did not think about join operations.
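The DOM-style bubbling proposed above can be sketched with a tiny tree of nodes. This is a hypothetical model, not Highland code: `PipelineNode`, `on`, and `emitUp` are made-up names. An event emitted at a node runs that node's handlers, then walks up through each parent until some handler returns `false`. Each node has a single parent, so it only covers splits, which is why joins remain the open question.

```javascript
// Hypothetical model of DOM-style event bubbling through a pipeline.
// Each node has at most one parent, so this covers fork/observe splits only.
class PipelineNode {
  constructor(name, parent = null) {
    this.name = name;
    this.parent = parent;
    this.handlers = new Map(); // event name -> array of handlers
  }

  on(event, handler) {
    if (!this.handlers.has(event)) this.handlers.set(event, []);
    this.handlers.get(event).push(handler);
    return this;
  }

  // Run the handlers at this node, then bubble to the parent unless
  // a handler returned false (like stopPropagation in the DOM).
  emitUp(event, payload) {
    for (let node = this; node !== null; node = node.parent) {
      const handlers = node.handlers.get(event) || [];
      let stopped = false;
      for (const handler of handlers) {
        if (handler(payload, node) === false) stopped = true;
      }
      if (stopped) return node; // node where propagation stopped
    }
    return null; // bubbled all the way to the root
  }
}

// The diagram above: A splits into B and C, and D hangs off C.
const received = [];
const A = new PipelineNode('A').on('lol', (_, n) => { received.push(n.name); });
const B = new PipelineNode('B', A);
const C = new PipelineNode('C', A).on('lol', (_, n) => { received.push(n.name); });
const D = new PipelineNode('D', C);

const stoppedAt = D.emitUp('lol');
console.log(received); // [ 'C', 'A' ]
```

`B` never sees the event, because bubbling only follows the path from `D` back to the root.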
I don't have any other use case to offer right now, and I'm definitely not attached to the mechanism I am proposing either. It's hard for me to know what's possible because I'm not familiar with the highland source.
But I do need to solve my problem somehow, which is: prematurely closing a source stream when an error happens somewhere downstream. It'd be cool if there was a way to do this in Highland. I'll code up a workaround for now.
I'm very interested in learning more about the work done in this area! Would the design of the `nil` propagation work for my use case? What does the API look like?
Arbitrary event bubbling requires a little more infrastructure than just `nil` bubbling, because EventEmitters don't really support this kind of re-emit case. Canceling propagation also raises the question of what to do when one emitter says to stop and another doesn't.
Join is really the harder case, but I think your example with stream splits doesn't work for the resource cleanup use case. If you fork stream `A` into `B` and `C`, and `D` encounters an error and emits a `stop` event, you don't really want to propagate that event up to `A` until `B` also emits a `stop` event. Otherwise, one fork could essentially deadlock the other.
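One way to get the fork behavior described above is reference counting at the split point: the shared source is released only after every fork has said it is done. This is a hypothetical sketch (`SharedSource`, `fork`, and `stop` are made-up names), not how Highland implements `fork`.

```javascript
// Hypothetical sketch: a shared source that is released only after
// every fork has stopped, so one fork cannot strand the other.
class SharedSource {
  constructor(release) {
    this.release = release; // cleanup, e.g. return a connection to a pool
    this.activeForks = 0;
    this.released = false;
  }

  fork(name) {
    this.activeForks += 1;
    let stopped = false;
    return {
      name,
      // Called when this fork errors out or no longer needs data.
      stop: () => {
        if (stopped) return; // ignore repeated stops from the same fork
        stopped = true;
        this.activeForks -= 1;
        if (this.activeForks === 0 && !this.released) {
          this.released = true;
          this.release();
        }
      },
    };
  }
}

let releaseCount = 0;
const source = new SharedSource(() => { releaseCount += 1; });
const B = source.fork('B');
const C = source.fork('C');

C.stop();                  // D errored below C, but B is still reading
console.log(releaseCount); // 0
B.stop();                  // now every fork is done
console.log(releaseCount); // 1
```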
A similar problem exists with `observe`. If `B` observes `A` and `C` is just a regular downstream consumer of `A`, then a `stop` event in `D` should propagate up to `A`. Conversely, if `B` is the downstream consumer and `C` is the observer, then a `stop` from `C` shouldn't cause `A` to release its resource (because `B` is still going).
You can imagine a different event propagation logic for a different event. Doing this right would probably require some way for users to specify custom propagation logic for each of the transforms, which kind of defeats the purpose of automatic event propagation. At that point, you might as well do it yourself.
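To make that concrete, here is a hypothetical sketch in which each split point carries its own propagation rule. The names `forkPolicy` and `observePolicy` and the `{ role, stopped }` branch shape are made up for illustration. A fork stops its source only when every branch has stopped; an observed stream stops its source when the regular consumer stops, and an observer stopping on its own changes nothing. Every transform would need a rule like these, which is the cost described above.

```javascript
// Hypothetical per-transform propagation rules (not Highland code).
// Each rule gets the downstream branches of a split point and decides
// whether a stop signal should continue upstream to the source.

// fork: every branch shares the source, so all of them must be done.
const forkPolicy = (branches) => branches.every((b) => b.stopped);

// observe: only the regular consumer keeps the source alive, so an
// observer stopping on its own never releases the source.
const observePolicy = (branches) =>
  branches.some((b) => b.role === 'consumer' && b.stopped);

// B observes A; C (with D below it) is the regular consumer; D stops C.
const observedByB = [
  { name: 'B', role: 'observer', stopped: false },
  { name: 'C', role: 'consumer', stopped: true },
];
console.log(observePolicy(observedByB)); // true: release A

// Roles swapped: C is the observer, so its stop must not release A.
const observedByC = [
  { name: 'B', role: 'consumer', stopped: false },
  { name: 'C', role: 'observer', stopped: true },
];
console.log(observePolicy(observedByC)); // false: B is still reading

// Under fork semantics the same stop also waits for B.
console.log(forkPolicy(observedByC)); // false
```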
The 3.x propagation API looks like this:
- All streams have an `onDestroy` method. You can use it to bind any number of callbacks to run when data from the stream is no longer needed: either the stream has emitted `nil` and signaled that it has no more data, or someone has called `destroy` on the stream, signaling that it should emit no more data.
- All `consume`-based transforms (i.e., the ones under the Transforms section) will call `destroy` on their upstream source when they emit `nil`. For example, given `stream = source.take(1)`, `source`'s `onDestroy` callbacks will be called once it has emitted one item.
- `fork` and `observe` propagate `onDestroy` events the way I've described above. Furthermore, if you have two forks `A` and `B`, and `A` has signaled that it is done, they will no longer share backpressure, and `B` will consume from the shared source without needing to wait for `A` to ask for more data (because it never will). This allows code like this to work:

  ```javascript
  const stream = _([1, 2, 3]);
  const fork1 = stream.fork();
  const fork2 = stream.fork();

  fork1.take(1).each(x => console.log('fork1', x));
  fork2.each(x => console.log('fork2', x));

  // => fork1 1
  // => fork2 1
  // => fork2 2
  // => fork2 3
  ```
- Higher-level transforms like `merge` and `flatMap` will propagate the `onDestroy` event in some reasonable way, to be documented for each specific transform. We haven't decided exactly how yet. This is not implemented.
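The semantics in the list above can be modeled in a few lines. This is a toy pull-based model written for illustration, not Highland's implementation: `ToyStream`, `pull`, `collect`, and the array-backed source are assumptions, and only the names `onDestroy`, `destroy`, and `take` mirror the 3.x API described above. Each stream runs its `onDestroy` callbacks at most once, either when it runs out of data or when `destroy` is called, and `take(n)` destroys its upstream as soon as it has emitted `n` items.

```javascript
// Toy pull-based model of the 3.x onDestroy semantics (not Highland code).
const NIL = Symbol('nil'); // end-of-stream marker, standing in for nil

class ToyStream {
  constructor(pullFn) {
    this.pullFn = pullFn; // returns the next value or NIL
    this.callbacks = [];
    this.destroyed = false;
  }

  static fromArray(items) {
    let i = 0;
    return new ToyStream(() => (i < items.length ? items[i++] : NIL));
  }

  onDestroy(fn) {
    this.callbacks.push(fn);
    return this;
  }

  // Runs the callbacks at most once; afterwards the stream yields only NIL.
  destroy() {
    if (this.destroyed) return;
    this.destroyed = true;
    for (const fn of this.callbacks) fn();
  }

  pull() {
    if (this.destroyed) return NIL;
    const value = this.pullFn();
    if (value === NIL) this.destroy(); // emitting nil also destroys
    return value;
  }

  // Consume-style transform: once n items are out, take has nothing more
  // to emit, so it destroys its upstream source right away.
  take(n) {
    let taken = 0;
    return new ToyStream(() => {
      if (taken >= n) {
        this.destroy();
        return NIL;
      }
      const value = this.pull();
      if (value !== NIL) {
        taken += 1;
        if (taken === n) this.destroy();
      }
      return value;
    });
  }

  collect() {
    const result = [];
    for (let v = this.pull(); v !== NIL; v = this.pull()) result.push(v);
    return result;
  }
}

const log = [];
const source = ToyStream.fromArray([1, 2, 3])
  .onDestroy(() => log.push('source destroyed'));
const firstOnly = source.take(1).collect();
console.log(firstOnly, log); // [ 1 ] [ 'source destroyed' ]
```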
It's intended to be used like so:
```javascript
function readFromDb(query) {
  const db = acquireDbConnection();
  // Release the connection once the stream ends or is destroyed downstream.
  return _(db.read(query))
    .onDestroy(() => db.releaseConnection());
}
```
Note that `onDestroy` isn't exactly error propagation. However, you can use it for your use case by appending the `stopOnError` transform somewhere downstream. This also saves you from having to use an observer to detect the end of the db read.
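For comparison, plain JavaScript generators already propagate early termination upstream, which is the behavior `onDestroy` is meant to provide. This is a swapped-in technique, not Highland: when a consumer `break`s out of a `for...of` loop, or an exception escapes the loop body, the runtime calls the iterator's `return()` method, so the source generator's `finally` block runs and can release the connection. `fakeDbConnection`, `mapRows`, and the row data are hypothetical stand-ins.

```javascript
// Swapped-in technique: native generators, not Highland.
// Hypothetical stand-in for a pooled database connection.
function fakeDbConnection(rows, log) {
  return {
    *read() { yield* rows; },
    releaseConnection() { log.push('released'); },
  };
}

// Source: the finally block runs whether the rows run out, the consumer
// stops early, or an error is thrown further down the pipeline.
function* readFromDb(db) {
  try {
    yield* db.read();
  } finally {
    db.releaseConnection();
  }
}

// A transform step; an error thrown here closes readFromDb first.
function* mapRows(rows, fn) {
  for (const row of rows) yield fn(row);
}

const log = [];
const db = fakeDbConnection([1, 2, 3], log);
const pipeline = mapRows(readFromDb(db), (row) => {
  if (row === 2) throw new Error('bad row');
  return row * 10;
});

const written = [];
try {
  for (const value of pipeline) written.push(value);
} catch (err) {
  log.push(err.message);
}
console.log(written, log); // [ 10 ] [ 'released', 'bad row' ]
```

Async generators consumed with `for await...of` follow the same closing rules, which is closer to how a real database driver delivers rows.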
There's a beta on npm that you can install via `highland@next`, but since `onDestroy` propagation isn't implemented for higher-level transforms, I can't recommend running it in production unless you're sure that you don't (and will never accidentally) use them.
Without using the 3.0 beta, you'll have to release the db connection manually. The best way to do it depends on your application's architecture, but I can think of a few ways you might do it.
- Pass the database connection along to the downstream consumer and have it release the connection manually when it detects an error. Use `errors` or `stopOnError`.
- Apply the downstream transforms at the same time as you read from the database.

  ```javascript
  function transformDb(dbQuery, transformAndWrite) {
    const db = acquireDbConnection();
    return _(db.read(dbQuery))
      .through(transformAndWrite)
      // End the stream at the first error so done() still fires.
      .stopOnError(err => {})
      .done(() => db.releaseConnection());
  }

  // Use it
  transformDb(dbQuery, stream => {
    // Do your transform and writes here.
    return stream.map(...)
      .flatMap(row => writeToDb(row));
  });
  ```
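The first option mostly comes down to handing downstream code a release function that is safe to call more than once, since both an error handler and the normal end-of-stream handler may try to release. A minimal sketch, assuming a hypothetical `pool` object and hypothetical helpers `once` and `readWithRelease`; in a Highland pipeline, `release` would be called from the `errors` or `stopOnError` handler and from `done`.

```javascript
// Wrap a function so that only the first call has any effect.
function once(fn) {
  let called = false;
  return (...args) => {
    if (called) return;
    called = true;
    fn(...args);
  };
}

// Hypothetical pool: counts how many connections are checked out.
const pool = {
  inUse: 0,
  acquire() {
    this.inUse += 1;
    return { release: () => { this.inUse -= 1; } };
  },
};

// Hand the rows and an idempotent release function to the consumer together.
function readWithRelease(rows) {
  const conn = pool.acquire();
  return { rows, release: once(() => conn.release()) };
}

// Downstream consumer: releases on the first error and again at the end,
// but the connection goes back to the pool exactly once.
const { rows, release } = readWithRelease([1, 2, 3]);
try {
  for (const row of rows) {
    if (row === 2) throw new Error('write failed');
  }
} catch (err) {
  release(); // stop early: give the connection back now
} finally {
  release(); // normal end of stream (no-op if already released)
}
console.log(pool.inUse); // 0
```

Without `once`, the second call would decrement the counter again and leave the pool's bookkeeping at `-1`.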