Generic exception handling #433
base: master
Conversation
I like this idea! I can't promise exactly when I can have a look, but soon.
4e53fac to 1ac75d7
Hmm, good questions:
All existing tests now pass and the linting complaints have been taken care of. Remaining things:
But I won't continue further before you have had a chance to take a look.
I have had a chance to glance at the code, and it had not occurred to me that the exception handling needs to go into all stream nodes, and be handled upstream of the actual exception, in order to make things possible. I can see why you can't put exception handling into the node that is actually generating the exception, but might I suggest a less general change. You could make a node for this specific functionality, something like:

```python
class ExceptionHandler(Stream):
    def __init__(self, upstream, on_error, catch=Exception, **kwargs): ...

    def update(self, x, who=None, metadata=None):
        try:
            self.emit(x, metadata=metadata)
        except self.catch as e:
            self.on_error.update((x, e), self, metadata=metadata)
```

cc @jsmaupin
Thanks for taking a look 👍 Yes, my opinion is that a fully generic and integrated mechanism for exception handling probably needs to be present in all nodes. With that said, your suggestion is a less intrusive one (though it makes "building" the pipeline a bit more verbose and perhaps slightly less intuitive; maybe a highly personal preference?), so if you feel my suggestion is over the top, feel free to close this PR.
I think I would prefer the minimal change - would you like to code it up along the lines of my suggestion? I think your example should be easy enough to make.
@martindurant

```python
from streamz import Stream

def problem(item):
    try:
        raise ValueError
    except Exception as exc:
        return exc

s = Stream()
mapper = s.map(problem)
exception_branch = mapper.filter(lambda x: isinstance(x, Exception))  # Handle exceptions
everything_good = mapper.filter(lambda x: not isinstance(x, Exception))  # Continue as usual
```

After trying out your suggestion I realized I probably wouldn't use it, since it messes with my mental model of the pipeline (i.e. needing to explicitly define an exception handler upstream of where an exception might happen). I can also be very sure where the exception originates from. So, I will choose not to implement your suggestion after all. Sorry for that. On a slight side note: this could perhaps be solved by defining some kind of "composed" node which functions according to the above logic.
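The "errors as values" pattern above can be sketched without streamz at all; the following plain-Python version (the `safe` wrapper and the sample data are illustrative, not part of streamz) shows the same branch-on-type idea:

```python
# Plain-Python sketch of "return the exception, then branch with filters".
import json

def safe(fn):
    """Wrap fn so exceptions come back as values instead of being raised."""
    def wrapper(x):
        try:
            return fn(x)
        except Exception as exc:
            return exc
    return wrapper

items = ['{"a": 1}', 'not json']
results = [safe(json.loads)(x) for x in items]

# The two "branches": exceptions in one, good values in the other.
exception_branch = [r for r in results if isinstance(r, Exception)]
everything_good = [r for r in results if not isinstance(r, Exception)]
```

Each item ends up in exactly one branch, which is what the two `filter` nodes achieve in the streamz version.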
I think there is a general discussion to be had here, and perhaps @jsmaupin, @roveo and @chinmaychandak might have thoughts. At the moment, a node can have multiple downstreamz, and they are all of equal importance. There are also some streamz nodes that take multiple upstreamz (e.g., combine_latest), where one of those inputs may be special, and the logic for this is contained within the node. We did discuss the possibility of multiple possible output stream types for a node, and the exception case would be one of these. We could annotate them with different line styles in the graph display. Do we have multiple output types? Should there be, for example, a …? The main thing that I had against this PR as it stands is that the logic for handling an exception is upstream, but the downstream holds the exception output stream. On the other hand, it's nice to only have to edit …
My thoughts:
- instead of checking for error type in the handler function and re-raising if it's not:

```python
dicts = src.map(json.loads)
invalid_batches = dicts.on_exception(ParsingError).partition(100, timeout=30)
invalid_batches.sink_to_s3(...)
invalid_batches.map(make_slack_message).sink_to_slack(...)
```

So I want to wait 30 seconds if there are any more bad records, write them to S3 for manual inspection, and notify myself. This looks like a normal stream, just a different branch.

```python
sink = result.sink_to_db(...)
sink.on_exception(InvalidDataError).sink_to_file("rejecteddata.log")
sink.on_exception(ConnectionError).retry(wait_seconds=10, max_retries=5)
```

```python
node.try().something().something_else().except(Exception).map(handle)
```
Note that it's OK to break the API, since almost all streamz code lives right here, and we can edit classes as needed to achieve what we want. Of course, we'd rather not change the user-called methods (a small number).
That is an interesting API! Now there would be something tricky to show graphically, or indeed to implement :)
Stumbled upon this again after not having used streamz for a while. For what it's worth, I currently use the following for exception handling; it gives me the flexibility I need, but I wouldn't be surprised if it has some nasty side effects that I have yet to discover...

```python
from types import MethodType

from streamz import Stream, Sink


@Stream.register_api()
class on_exception(Sink):
    def __init__(self, upstream: Stream, exception=Exception, **kwargs):
        super().__init__(upstream, **kwargs)
        original_upstream_update_method = upstream.update

        def _(upstream_self, x, who=None, metadata=None):
            try:
                return original_upstream_update_method(x, who, metadata)
            except exception as exc:
                # Pass down the branch started with this stream instead
                self._emit((x, exc), metadata)

        # Bind to upstream
        upstream.update = MethodType(_, upstream)

    def update(self, x, who=None, metadata=None):
        pass  # NO-OP
```
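The core trick here, rebinding `update` on one instance via `types.MethodType` so failures are routed to a side channel, can be demonstrated without streamz. In this sketch, `Node` is a stand-in for a stream node, not a streamz type:

```python
# Instance-level monkey-patch: wrap one object's method in place.
from types import MethodType

class Node:
    """Stand-in for a stream node whose update() may raise."""
    def update(self, x):
        if x < 0:
            raise ValueError("negative input")
        return x * 2

node = Node()
caught = []
original_update = node.update  # bound method, keeps the old behavior

def patched(self, x):
    try:
        return original_update(x)
    except ValueError as exc:
        # Route the failure to a side channel instead of raising
        caught.append((x, exc))

# Rebind on this one instance only; other Node objects are unaffected.
node.update = MethodType(patched, node)
```

Because the patch is bound per-instance, only the node that `on_exception` was attached to changes behavior, which matches how the class above patches just its own `upstream`.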
Hah, monkey-patch :) I still think there is merit in making this kind of operation available for everyone. I'm afraid I still like my solution, which can be fleshed out a bit:

```python
import time
from typing import Tuple


class ExceptionHandler(Stream):
    def __init__(self, upstream, on_error: Stream,
                 catch: Tuple[Exception] = (Exception,), retries=0, retry_wait=1, **kwargs):
        self.on_error = on_error
        self.catch = catch
        self.retries = retries
        self.retry_wait = retry_wait
        super().__init__(upstream, **kwargs)

    def update(self, x, who=None, metadata=None):
        retries = self.retries
        while True:
            try:
                return super().emit(x, metadata=metadata)
            except self.catch as e:
                if retries > 0:
                    time.sleep(self.retry_wait)  # or async?
                    retries -= 1
                    continue
                return self.on_error.update((x, e), self, metadata=metadata)
```
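The retry loop inside `update` can be exercised in isolation. This stand-alone helper (the name `call_with_retries` and the `flaky` function are my own, not streamz API) mirrors the same while/except/sleep structure:

```python
import time

def call_with_retries(fn, x, catch=(Exception,), retries=2, retry_wait=0.0):
    """Retry fn(x) up to `retries` extra times on `catch`, then give up."""
    attempts_left = retries
    while True:
        try:
            return fn(x)
        except catch:
            if attempts_left > 0:
                time.sleep(retry_wait)
                attempts_left -= 1
                continue
            raise  # out of retries; let the caller (or an on_error branch) handle it

# A function that fails twice before succeeding, to exercise the loop.
calls = {"n": 0}
def flaky(x):
    calls["n"] += 1
    if calls["n"] < 3:
        raise ConnectionError("transient")
    return x + 1
```

With `retries=3`, the two transient failures are absorbed and the third attempt succeeds; with `retries=0` the first exception would propagate immediately, matching the `retries=0` default above that goes straight to `on_error`.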
Work in progress: A (naive?) stab at generic exception handling within streamz.
This PR attaches an exception handler to each Stream object that acts as an optional start of a new pipeline branch.
This PR still requires some linting, a few tests fail (but nothing major, it seems), and the docs need updating. Consider it a starting point for discussion, mainly in relation to #86. I would appreciate some feedback on this before I continue.