streamz
Should streamz handle exceptions?
What do we do when a stream receives bad data that causes an exception to be raised? For example:
from streamz import Stream

def foo(x):
    if x is None:
        raise Exception
    else:
        return x + 1

s = Stream()
s2 = s.map(foo)
s2.sink(print)   # sink the mapped stream
s.emit(1)
s.emit(None)     # bad data: foo raises here
s.emit(2)
Here, foo is a point of vulnerability in the stream, where it may or may not cause the whole stream architecture to halt.
Is it worth trying to incorporate some quiet exception handling? I am not sure exactly how to tackle this so I'm being a little vague at this point. I can think of many ways of doing this. Here are a few:
- Catch the exceptions and emit them somehow (will require defining a data type). We can also not emit (and perhaps sink errors to a global list) but this may cause unintended synchronization consequences to the user.
- Catch the exceptions in s.emit. Note that in this case the origin of the exception may be harder to find.
I'll think about it, but I would like to hear opinions from @mrocklin and @CJ-Wright (who has already handled this in his streams extension). My current method is to wrap all mapped functions to look for exceptions, and return a document that flags the document as having encountered an exception. This works in my subclassed module only though. It would be nice to unify this I think.
(Note: exceptions can occur not just in map but other things like filter etc. Other modules like zip may also want to be exception aware, that something passing through is bad data, and pass this on etc.)
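The wrapping approach described above could look roughly like the following. This is only a sketch, not the code from either extension: the `Failure` type and the `safe` decorator are hypothetical names invented for illustration, and plain Python calls stand in for the stream so the snippet runs without streamz.

```python
from dataclasses import dataclass
from functools import wraps
from typing import Any, Callable


@dataclass
class Failure:
    """Hypothetical marker for an element whose processing raised."""
    value: Any         # the input that triggered the exception
    error: Exception   # the exception instance that was raised


def safe(func: Callable[[Any], Any]) -> Callable[[Any], Any]:
    """Wrap func so exceptions are returned as Failure instead of raised."""
    @wraps(func)
    def wrapper(x):
        if isinstance(x, Failure):
            return x  # already failed upstream: pass it through untouched
        try:
            return func(x)
        except Exception as exc:
            return Failure(x, exc)
    return wrapper


def foo(x):
    if x is None:
        raise ValueError("got None")
    return x + 1


# Plain function calls stand in for a stream of three emitted values.
results = [safe(foo)(x) for x in (1, None, 2)]
```

With streamz, the wrapped function would presumably be passed as `s.map(safe(foo))`, so the bad element flows on as a `Failure` rather than halting the stream. Other nodes (filter, zip, etc.) would still need to decide how to treat a `Failure` they receive.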
Yes, at some point we should handle exceptions. Also stopping signals. I don't have a short term plan for this or any concrete thoughts.
This reminds me a little of the Try-Monads talk from PyGotham https://2017.pygotham.org/talks/try-monads-with-big-data-using-pyspark/ (not that I think we should necessarily use Try-Monads).
If you're curious then I recommend looking at what ReactiveX does here
Relevant? http://reactivex.io/RxJava/2.x/javadoc/io/reactivex/exceptions/Exceptions.html
This might be one area where treating the graph as a walkable DAG may be helpful. One could imagine writing a function which walks the graph, looks for nodes where exceptions could occur (map, filter, etc.), and then decorates each node's update method to handle the exceptions, potentially with some callback function, logging, or sending the data + exception to another pipeline.
I think this general pattern (keep the pipeline simple, then decorate things once the DAG has been built) is interesting.
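To make the walk-and-decorate idea concrete, here is a rough sketch. It deliberately does not use streamz: `Node` is a toy stand-in with a `downstreams` list and an `update` method, and `walk`, `guard_updates`, and the `on_error` callback signature are all assumptions made for illustration.

```python
from typing import Any, Callable, List, Optional


class Node:
    """Toy stand-in for a stream node (NOT the streamz class)."""

    def __init__(self, func: Optional[Callable[[Any], Any]] = None, name: str = "node"):
        self.func = func
        self.name = name
        self.downstreams: List["Node"] = []

    def connect(self, child: "Node") -> "Node":
        self.downstreams.append(child)
        return child

    def update(self, x: Any) -> None:
        # Apply this node's function (if any), then push the result downstream.
        result = self.func(x) if self.func is not None else x
        for child in self.downstreams:
            child.update(result)


def walk(node, seen=None):
    """Yield every node reachable from `node` exactly once."""
    seen = set() if seen is None else seen
    if id(node) in seen:
        return
    seen.add(id(node))
    yield node
    for child in node.downstreams:
        yield from walk(child, seen)


def guard_updates(root, on_error):
    """Decorate `update` on every node that carries a user function."""
    for node in walk(root):
        if node.func is None:
            continue
        original = node.update  # bound method, captured per node

        def guarded(x, _original=original, _node=node):
            try:
                _original(x)
            except Exception as exc:
                # Report the failing node, the offending data, and the exception.
                on_error(_node, x, exc)

        node.update = guarded  # instance attribute shadows the class method


errors = []
seen_values = []

source = Node(name="source")
mapped = source.connect(Node(lambda x: x + 1, name="map"))
mapped.connect(Node(seen_values.append, name="sink"))

guard_updates(source, lambda node, x, exc: errors.append((node.name, x, type(exc).__name__)))

for item in (1, None, 2):
    source.update(item)
```

The pipeline itself stays free of error handling; the policy lives entirely in `on_error`, which could log, collect, or forward the data and exception elsewhere.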
We currently pass messages forward and futures backwards with the emit/update methods. My guess is that we would do something similar with exception and stop signals.
> or sending the data + exception to another pipeline.
This is exactly how we handle exceptions in Apache NiFi and the feedback from community members seems to almost always be positive.
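As a library-free illustration of sending the data + exception to another pipeline, the sketch below routes each element to one of two outputs. The `route` function and its `success`/`failure` parameters are hypothetical names, not an API of streamz or NiFi.

```python
from typing import Any, Callable, Iterable, Tuple


def route(
    items: Iterable[Any],
    func: Callable[[Any], Any],
    success: Callable[[Any], None],
    failure: Callable[[Tuple[Any, Exception]], None],
) -> None:
    """Send func(item) to `success`, or (item, exception) to `failure`."""
    for item in items:
        try:
            result = func(item)
        except Exception as exc:
            failure((item, exc))
        else:
            # Called outside the try body, so an error raised by the success
            # pipeline itself is not mistaken for a failure of `func`.
            success(result)


ok, dead_letters = [], []
route([1, None, 2], lambda x: x + 1, ok.append, dead_letters.append)
```

Here `ok.append` and `dead_letters.append` stand in for the entry points of two separate pipelines; the failure side receives the original input together with the exception, so it can be inspected or retried later.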