
Should streamz handle exceptions?

Open jrmlhermitte opened this issue 8 years ago • 7 comments

What do we do when a stream receives bad data that causes an exception to be raised? For example:

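```python
from streamz import Stream

def foo(x):
    # Bad data: None cannot be incremented
    if x is None:
        raise ValueError("received None")
    return x + 1

s = Stream()
s2 = s.map(foo)
s2.sink(print)

s.emit(1)     # prints 2
s.emit(None)  # the ValueError propagates out of emit()...
s.emit(2)     # ...so this line is never reached
```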

Here, foo is a point of vulnerability in the stream: a single bad input makes it raise, which may or may not halt the whole stream architecture.

Is it worth trying to incorporate some quiet exception handling? I am not sure exactly how to tackle this, so I'm being a little vague at this point. I can think of many ways of doing this; here are a few:

  1. Catch the exceptions and emit them somehow (this will require defining a data type). Alternatively, we could skip emitting (and perhaps sink the errors to a global list), but this may cause unintended synchronization problems for the user: for example, a downstream zip could end up pairing elements that no longer correspond.
  2. Catch the exceptions in s.emit. Note that in this case it may be harder to find where the exception originated.
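Option 2 can be approximated at the call site without changing streamz itself. The sketch below is hypothetical: `emit_safely` and the `errors` list are illustrative names, and `pipeline_emit` is a plain-Python stand-in for `s.emit` on the pipeline from the example above. Recording the traceback is one way to soften the drawback noted in option 2, since from the entry point the traceback is the only clue to which node failed.

```python
import traceback
from typing import Any, Callable, List, Tuple

# (input, exception, formatted traceback) for every element that failed
ErrorRecord = Tuple[Any, BaseException, str]


def emit_safely(emit: Callable[[Any], Any], x: Any, errors: List[ErrorRecord]) -> bool:
    """Call emit(x); on failure, record the error instead of letting it escape.

    Returns True if the element passed through without raising.
    """
    try:
        emit(x)
        return True
    except Exception as exc:
        # We are outside the failing node here, so keep the traceback:
        # it is the only record of *where* in the pipeline the error happened.
        errors.append((x, exc, traceback.format_exc()))
        return False


def foo(x):
    if x is None:
        raise ValueError("received None")
    return x + 1


# Plain-Python stand-in for s.map(foo).sink(results.append)
results: List[Any] = []


def pipeline_emit(x: Any) -> None:
    results.append(foo(x))


errors: List[ErrorRecord] = []
for value in [1, None, 2]:
    emit_safely(pipeline_emit, value, errors)

print(results)  # [2, 3]
print(len(errors), errors[0][0])  # 1 None
```

Unlike the original example, the stream keeps going after the bad element, at the cost of handling errors far from where they occur.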

I'll think about it, but I would like to hear opinions from @mrocklin and @CJ-Wright (who has already handled this in his streams extension). My current approach is to wrap every mapped function so that it catches exceptions and returns a document flagged as having encountered an exception. This only works in my subclassed module, though; it would be nice to unify this.
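The wrapping approach described above could look roughly like the following. This is a minimal sketch, not the author's actual code: the document shape `{"data": ..., "exception": ...}` and the name `flag_exceptions` are assumptions. Because the wrapper returns an ordinary value, it could be passed to `s.map` like any other function, and already-flagged documents are passed through so that one bad input does not cascade into further failures downstream.

```python
import functools
from typing import Any, Callable, Dict

# Hypothetical document shape: {"data": <payload>, "exception": None or an Exception}
Doc = Dict[str, Any]


def flag_exceptions(func: Callable[[Any], Any]) -> Callable[[Doc], Doc]:
    """Wrap func so it consumes and produces flagged documents instead of raising."""

    @functools.wraps(func)
    def wrapper(doc: Doc) -> Doc:
        if doc["exception"] is not None:
            return doc  # flagged upstream: pass it along untouched
        try:
            return {"data": func(doc["data"]), "exception": None}
        except Exception as exc:
            # Keep the offending input next to the exception for later inspection
            return {"data": doc["data"], "exception": exc}

    return wrapper


def foo(x):
    if x is None:
        raise ValueError("received None")
    return x + 1


safe_foo = flag_exceptions(foo)
docs = [safe_foo({"data": x, "exception": None}) for x in [1, None, 2]]
print([d["data"] for d in docs])  # [2, None, 3]
print(safe_foo(docs[1]) is docs[1])  # True: a second mapped step skips the bad doc
```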

(Note: exceptions can occur not just in map but also in other nodes such as filter. Nodes like zip may also want to be exception-aware: recognize that something passing through is bad data and pass that information on.)
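For nodes other than map, the same flagged-document idea might carry over as follows. This is again a hypothetical sketch using the assumed `{"data": ..., "exception": ...}` dict: `keep_errors` builds a filter predicate that lets flagged documents through (rather than silently dropping them) and flags a document if the predicate itself raises; `merge_flags` combines documents arriving together at a zip and marks the result as bad if any input was bad.

```python
from typing import Any, Callable, Dict

# Hypothetical document shape: {"data": <payload>, "exception": None or an Exception}
Doc = Dict[str, Any]


def keep_errors(pred: Callable[[Any], bool]) -> Callable[[Doc], bool]:
    """Build a filter predicate that never drops bad data silently."""

    def keep(doc: Doc) -> bool:
        if doc["exception"] is not None:
            return True  # let flagged docs reach a downstream error handler
        try:
            return bool(pred(doc["data"]))
        except Exception as exc:
            doc["exception"] = exc  # flag in place; filter forwards this same object
            return True

    return keep


def merge_flags(*docs: Doc) -> Doc:
    """Combine documents from zipped branches; the first exception found wins."""
    data = tuple(d["data"] for d in docs)
    for d in docs:
        if d["exception"] is not None:
            return {"data": data, "exception": d["exception"]}
    return {"data": data, "exception": None}


positive = keep_errors(lambda x: x > 0)
bad = {"data": None, "exception": None}
print(positive({"data": 5, "exception": None}))   # True
print(positive({"data": -1, "exception": None}))  # False
print(positive(bad), type(bad["exception"]).__name__)  # True TypeError

combined = merge_flags({"data": 1, "exception": None}, bad)
print(combined["data"])  # (1, None)
```

In streamz, `keep_errors(pred)` would be handed to `filter`, and `merge_flags` applied to the tuples that `zip` emits, for example with `s1.zip(s2).map(lambda t: merge_flags(*t))`.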

jrmlhermitte, Oct 10 '17

Yes, at some point we should handle exceptions, and also stop signals. I don't have a short-term plan for this or any concrete thoughts.

mrocklin, Oct 10 '17

This reminds me a little of the Try-Monads talk from PyGotham (https://2017.pygotham.org/talks/try-monads-with-big-data-using-pyspark/), not that I think we should necessarily use Try-Monads.

CJ-Wright, Oct 10 '17
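For readers unfamiliar with the Try idea mentioned above: a Try value wraps the outcome of a computation as either a success holding a result or a failure holding the exception, and subsequent steps run only on successes. The sketch below is a minimal Python rendering; the names `Success`, `Failure`, `try_call` and `try_map` are illustrative, loosely modeled on Scala's `scala.util.Try`, and only the `map` operation is shown, not the full monadic interface.

```python
from dataclasses import dataclass
from typing import Any, Callable, Union


@dataclass(frozen=True)
class Success:
    value: Any


@dataclass(frozen=True)
class Failure:
    exception: Exception


Try = Union[Success, Failure]


def try_call(func: Callable[[Any], Any], x: Any) -> Try:
    """Run func(x), capturing either its result or the exception it raised."""
    try:
        return Success(func(x))
    except Exception as exc:
        return Failure(exc)


def try_map(t: Try, func: Callable[[Any], Any]) -> Try:
    """Apply func to a Success; a Failure short-circuits and is returned unchanged."""
    if isinstance(t, Failure):
        return t
    return try_call(func, t.value)


def foo(x):
    if x is None:
        raise ValueError("received None")
    return x + 1


outcomes = [try_map(try_map(Success(x), foo), lambda y: y * 10) for x in [1, None, 2]]
print(outcomes[0], outcomes[2])  # Success(value=20) Success(value=30)
print(type(outcomes[1]).__name__, outcomes[1].exception)  # Failure received None
```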

If you're curious, I recommend looking at what ReactiveX does here.


mrocklin, Oct 10 '17

Relevant? http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/exceptions/Exceptions.html

CJ-Wright, Oct 10 '17

This might be one area where treating the graph as a walkable DAG is helpful. One could imagine writing a function that walks the graph, looks for nodes where exceptions could occur (map, filter, etc.), and then decorates their update methods to handle the exceptions, potentially with some callback function, logging, or sending the data + exception to another pipeline.

I think this general pattern (keep the pipeline simple, then decorate things once the DAG has been built) is interesting.

CJ-Wright, Aug 31 '19
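The walk-and-decorate idea above can be shown on a toy graph. To stay self-contained, the sketch uses small stand-in classes (`Node`, `Map`, `Filter`, `Sink`) rather than streamz's own node classes, and `walk`, `add_error_handling` and its `on_error(node, x, exc)` callback are hypothetical names. The pipeline is built plainly first; error handling is added afterwards by replacing `update` on the matching nodes.

```python
from collections import deque
from typing import Any, List, Tuple


class Node:
    """Toy push-based node, standing in for a streamz Stream in this sketch."""

    def __init__(self):
        self.downstreams: List["Node"] = []

    def connect(self, other):
        self.downstreams.append(other)
        return other

    def emit(self, x):
        for d in self.downstreams:
            d.update(x)

    def update(self, x):
        self.emit(x)


class Map(Node):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def update(self, x):
        self.emit(self.func(x))


class Filter(Node):
    def __init__(self, pred):
        super().__init__()
        self.pred = pred

    def update(self, x):
        if self.pred(x):
            self.emit(x)


class Sink(Node):
    def __init__(self, func):
        super().__init__()
        self.func = func

    def update(self, x):
        self.func(x)


def walk(root):
    """Yield every node reachable from root, breadth-first, each exactly once."""
    seen, queue = set(), deque([root])
    while queue:
        node = queue.popleft()
        if id(node) not in seen:
            seen.add(id(node))
            yield node
            queue.extend(node.downstreams)


def add_error_handling(root, on_error, kinds=(Map, Filter)):
    """Swap in a guarded update() on every node of the given kinds."""
    for node in walk(root):
        if isinstance(node, kinds):
            original = node.update

            def guarded(x, node=node, original=original):
                # Caveat: update() also pushes to downstreams, so an error raised in
                # an undecorated node further down is caught and reported here too.
                try:
                    original(x)
                except Exception as exc:
                    on_error(node, x, exc)

            node.update = guarded


def foo(x):
    if x is None:
        raise ValueError("received None")
    return x + 1


source = Node()
results: List[Any] = []
errors: List[Tuple[str, Any, Exception]] = []
source.connect(Map(foo)).connect(Filter(lambda y: y > 0)).connect(Sink(results.append))
# Decorate only after the DAG is built; this callback just collects the errors.
add_error_handling(source, lambda node, x, exc: errors.append((type(node).__name__, x, exc)))
for value in [1, None, 2]:
    source.emit(value)
print(results)  # [2, 3]
print(errors[0][:2])  # ('Map', None)
```

Swapping the callback for one that logs, or that emits `(x, exc)` into a second pipeline, covers the other options mentioned.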

We currently pass messages forward and futures backwards with the emit/update methods. My guess is that we would do something similar with exceptions and stop signals.

mrocklin, Aug 31 '19
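One way to read the suggestion above: alongside `emit`/`update` for data, each node gets parallel methods that push an error signal, and a stop signal, downstream. The method names below (`emit_error`, `update_error`, `emit_stop`, `update_stop`) are hypothetical, the classes are toy stand-ins rather than streamz's, and the backward-flowing futures are omitted for brevity.

```python
from typing import Any, Callable, List, Tuple


class Node:
    """Toy node with three forward channels: data, errors, and stop."""

    def __init__(self) -> None:
        self.downstreams: List["Node"] = []
        self.received: List[Tuple[str, Any]] = []  # what arrived here, for illustration

    def connect(self, other: "Node") -> "Node":
        self.downstreams.append(other)
        return other

    # Data channel
    def emit(self, x: Any) -> None:
        for d in self.downstreams:
            d.update(x)

    def update(self, x: Any) -> None:
        self.received.append(("data", x))
        self.emit(x)

    # Error channel: by default, nodes just pass errors along
    def emit_error(self, x: Any, exc: Exception) -> None:
        for d in self.downstreams:
            d.update_error(x, exc)

    def update_error(self, x: Any, exc: Exception) -> None:
        self.received.append(("error", x))
        self.emit_error(x, exc)

    # Stop channel: tells downstream nodes that no more data is coming
    def emit_stop(self) -> None:
        for d in self.downstreams:
            d.update_stop()

    def update_stop(self) -> None:
        self.received.append(("stop", None))
        self.emit_stop()


class Map(Node):
    def __init__(self, func: Callable[[Any], Any]) -> None:
        super().__init__()
        self.func = func

    def update(self, x: Any) -> None:
        try:
            result = self.func(x)
        except Exception as exc:
            self.emit_error(x, exc)  # divert the bad input onto the error channel
            return
        self.emit(result)


def foo(x):
    if x is None:
        raise ValueError("received None")
    return x + 1


source = Node()
sink = source.connect(Map(foo)).connect(Node())
for value in [1, None, 2]:
    source.emit(value)
source.emit_stop()
print(sink.received)  # [('data', 2), ('error', None), ('data', 3), ('stop', None)]
```

Nodes such as zip could then override `update_error` to decide whether an error on one input should also flag the combined output.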

> or sending the data + exception to another pipeline.

This is exactly how we handle exceptions in Apache NiFi, and the feedback from community members has been almost always positive.

jdye64, Dec 03 '19
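Routing failures to their own pipeline, in the spirit of NiFi's separate success and failure relationships mentioned above, might look like this. It is a hypothetical sketch with a toy `Stream` class rather than streamz's: `routed_map` returns two streams, one carrying results and one carrying `(input, exception)` pairs, and the failure stream can then be given its own downstream logic, such as logging.

```python
from typing import Any, Callable, List, Tuple


class Stream:
    """Toy push-based stream; a stand-in for a real streaming node."""

    def __init__(self) -> None:
        self._subscribers: List[Callable[[Any], None]] = []

    def sink(self, func: Callable[[Any], None]) -> "Stream":
        self._subscribers.append(func)
        return self

    def emit(self, x: Any) -> None:
        for func in self._subscribers:
            func(x)


def routed_map(source: Stream, func: Callable[[Any], Any]) -> Tuple[Stream, Stream]:
    """Apply func to each element of source, splitting output into (success, failure)."""
    success, failure = Stream(), Stream()

    def handle(x: Any) -> None:
        try:
            result = func(x)
        except Exception as exc:
            failure.emit((x, exc))  # data + exception go to the failure pipeline
            return
        # Emitted outside the try, so errors raised downstream are not misrouted here
        success.emit(result)

    source.sink(handle)
    return success, failure


def foo(x):
    if x is None:
        raise ValueError("received None")
    return x + 1


source = Stream()
ok, failed = routed_map(source, foo)
good: List[Any] = []
bad: List[Tuple[Any, Exception]] = []
ok.sink(good.append)
failed.sink(bad.append)

for value in [1, None, 2]:
    source.emit(value)
print(good)  # [2, 3]
print([x for x, _ in bad])  # [None]
```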