How to propagate downstream errors to the source?

Open surjikal opened this issue 7 years ago • 5 comments

I'm currently using highland to transform and write rows coming from a database query. I need to release the database connection back to the pool when all the rows have streamed through.

Right now, I just observe the source stream and release the connection when the stream is done. This works great.

However, if an error is thrown somewhere downstream (either in the processing pipeline or when writing), I want to be notified at the source so that I can stop pulling rows and release the connection.

So, is there a mechanism to propagate downstream errors to the source?

surjikal · Mar 05 '17 02:03

Unfortunately, there's no good way. It's probably the biggest problem with Highland at the moment. We hope to fix it in version 3.x, but there's no automatic way to propagate errors right now.

You'll have to do it manually somehow. Maybe by passing a callback along with your source stream that gets called downstream when you're done with the source?
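
For instance, a rough sketch of that callback-passing idea (acquireDbConnection, query, and transformRow here are placeholders for your own code):

const _ = require('highland');

function makeRowSource(query) {
  const db = acquireDbConnection();
  let released = false;
  const release = () => {        // safe to call more than once
    if (!released) {
      released = true;
      db.releaseConnection();
    }
  };
  return { rows: _(db.read(query)), release };
}

// Downstream, call release() on both normal completion and error.
const { rows, release } = makeRowSource(query);
rows
  .map(transformRow)
  .stopOnError(err => {
    console.error(err);
    release();                   // stop pulling rows and free the connection
  })
  .done(release);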

Sorry I don't have a better answer for you. :(

vqvu · Mar 05 '17 07:03

No problem. What if an event could be emitted via this.emit from within any step in the pipeline and propagated up the pipeline? You could attach an event handler at any step of the pipeline and return false from the handler to stop propagation.

I'll put up an example in a few hours.
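
To give a rough idea in the meantime (purely hypothetical; none of this propagation exists in Highland today, and rowsFromDb, transformRow, writeRow, and releaseConnection are placeholders):

const source = _(rowsFromDb)
  .on('abort', () => releaseConnection());   // the source reacts to the bubbled event

source
  .map(transformRow)
  .on('abort', () => false)                  // returning false stops the bubbling here
  .each(writeRow);

// Somewhere downstream, a failing step would call this.emit('abort'),
// and the event would bubble up through each handler toward the source.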

surjikal · Mar 05 '17 10:03

Automatic event propagation in this way is pretty complicated, because the pipeline can split (via fork or observe) and join (via merge, flatMap, etc.) in fairly arbitrary ways. You'd essentially have to modify each of the transforms I just mentioned to support it individually. That's why the error propagation work I mentioned (it's actually nil propagation) is being done on a separate major release branch. It's a big change.

I'm not convinced arbitrary event bubbling is worth it. What use case do you have in mind (beyond resource cleanup) that requires it?

vqvu · Mar 05 '17 11:03

For arbitrary event bubbling, I just figured it would be a bonus side effect of having error event bubbling. Instead of just doing it for errors, you could do it for any event. I was imagining that it would look similar to DOM event propagation.

For fork or observe, it could look like:

  A <- event received
 / \
B   C <- event received
     \
      D <- this.emit('lol')

I didn't think about join operations.

I don't have any other use case to offer right now, and I'm definitely not attached to the mechanism I am proposing either. It's hard for me to know what's possible because I'm not familiar with the highland source.

But I do need to solve my problem somehow, which is: prematurely closing a source stream when an error happens somewhere downstream. It'd be cool if there was a way to do this in Highland. I'll code up a workaround for now.

I'm very interested in learning more about the work done in this area! Would the design of the nil propagation work for my use case? What does the API look like?

surjikal · Mar 05 '17 23:03

Arbitrary event bubbling requires a little more infrastructure than just nil bubbling, because EventEmitters don't really support this kind of re-emit case. Canceling propagation also raises the question of what to do when one handler says to stop and another doesn't.

Join is really the harder case, but I think your example with stream splits doesn't work for the resource cleanup use case. If you fork stream A into B and C, and D (downstream of C) encounters an error and emits a stop event, you don't really want to propagate that event up to A until B has also emitted a stop event. Otherwise, one fork could essentially deadlock the other.

A similar problem exists with observe. If B observes A, and C is just a regular downstream consumer of A, then a stop event in D should propagate up to A. Conversely, if B is the downstream consumer and C is the observer, then a stop from C shouldn't cause A to release its resource (because B is still going).
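
To make those topologies concrete, here's how they'd be wired up with the current API (a sketch only; the stop event itself is hypothetical, and the data is a placeholder):

const _ = require('highland');

// fork case: A splits into B and C, with D below C.
const A = _([1, 2, 3]);
const B = A.fork();
const C = A.fork();
const D = C.map(x => x * 2);    // if D stopped, A would still owe B data

// observe case: B2 merely observes A2, while C2 is the regular consumer.
const A2 = _([1, 2, 3]);
const B2 = A2.observe();        // observer; its backpressure doesn't drive A2
const C2 = A2.map(x => x * 2);  // regular downstream consumer
const D2 = C2.map(x => x + 1);  // a step below C2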

You can imagine different propagation logic for different events. Doing this right would probably require some way for users to specify custom propagation logic for each of the transforms, which kind of defeats the purpose of automatic event propagation. At that point, you might as well do it yourself.

The 3.x propagation API looks like this:

  • All streams have an onDestroy method. You can use it to bind any number of callbacks to run when data from the stream is no longer needed. Either the stream has emitted nil and has signaled that it has no more data, or someone has called destroy on the stream, signaling that the stream should emit no more data.
  • All consume-based transforms (i.e., the ones under the Transforms section) will call destroy on their upstream source when they emit nil. For example, given stream = source.take(1), source's onDestroy callbacks will be called once it has emitted its first item (see the sketch just after this list).
  • fork and observe propagate onDestroy events the way I've described above. Furthermore, if you have two forks A and B, and A has signaled that it is done, they will no longer share backpressure; B will consume from the shared source without needing to wait for A to ask for more data (because it never will). This allows code like the following to work:
    const stream = _([1, 2, 3]);
    const fork1 = stream.fork();
    const fork2 = stream.fork();
    fork1.take(1).each(x => console.log('fork1', x));
    fork2.each(x => console.log('fork2', x));
    // => fork1 1
    // => fork2 1
    // => fork2 2
    // => fork2 3
    
  • Higher-level transforms like merge and flatMap will propagate the onDestroy event in some reasonable way, to be documented for each specific transform. We haven't decided exactly how yet, and this is not implemented.
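
As a concrete example of the second bullet, here's a small sketch against the 3.x beta API described above:

const _ = require('highland');   // the 3.x beta, i.e. highland@next

const source = _([1, 2, 3])
  .onDestroy(() => console.log('source no longer needed'));

// take(1) emits nil after one item, which destroys its upstream source,
// so the onDestroy callback above fires once the first item has been taken.
source.take(1).each(x => console.log('got', x));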

It's intended to be used like so:

function readFromDb(query) {
  const db = acquireDbConnection();
  return _(db.read(query))
    // release the connection once no one needs this stream's data
    .onDestroy(() => db.releaseConnection());
}

Note that onDestroy isn't exactly error propagation; however, you can use it for your use case by appending the stopOnError transform somewhere downstream. This also saves you from having to use an observer to detect the end of the db read.
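
For example, a downstream pipeline for this use case might look roughly like the following (a sketch that sticks to consume-based transforms, since higher-level ones don't propagate onDestroy yet; transformRow is a placeholder):

readFromDb(query)
  .map(transformRow)
  .stopOnError(err => console.error('stopping early:', err))
  .done(() => console.log('finished'));
// On an error, stopOnError ends the stream, the destroy signal propagates back
// to the source, and readFromDb's onDestroy callback releases the connection.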

There's a beta on NPM that you can install via highland@next, but since onDestroy propagation isn't implemented for higher-level transforms, I can't recommend running it in production unless you're sure that you don't (and will never accidentally) use them.

Without using the 3.0 beta, you'll have to release the db connection manually. The best way to do it depends on your application's architecture, but I can think of a few ways you might do it.

  1. Pass the database connection along to the downstream consumer and have it manually release the connection when it detects an error. Use errors or stopOnError.
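
     A brief sketch of this option (acquireDbConnection, transformRow, and writeToDb are placeholders; the consumer owns the release):

    function readRows(dbQuery) {
      const db = acquireDbConnection();
      return { rows: _(db.read(dbQuery)), db };
    }

    // The downstream consumer releases the connection itself.
    const { rows, db } = readRows(dbQuery);
    rows
      .map(transformRow)
      .flatMap(row => writeToDb(row))
      .stopOnError(err => console.error(err))  // the error ends the stream early
      .done(() => db.releaseConnection());     // runs after a normal end or stopOnError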

  2. Apply the downstream transforms at the same time as you read from the database.

    function transformDb(dbQuery, transformAndWrite) {
      const db = acquireDbConnection();
      return _(db.read(dbQuery))
        .through(transformAndWrite)
        .stopOnError(err => {}) // an error ends the stream here, so done() below still runs
        .done(() => db.releaseConnection());
    }
    
    // Use it
    transformDb(dbQuery, stream => {
      // Do your transform and writes here.
      return stream.map(...)
        .flatMap(row => writeToDb(row));
    });
    

vqvu · Mar 06 '17 00:03