metaflow
How to define custom decorators for generic try-catch handling
Hi there, thanks again for the great work on Metaflow :-)
Context
- We have a couple of flows, with say 10 steps each. We have a use case where, should any step fail, we want to exercise a common piece of code (in our case, emit an event that triggers our CI pipeline). This is so that:
  a. if a flow succeeds, we trigger our CI pipeline to deploy our model
  b. if a flow fails, we trigger our CI pipeline and make it go red, so that our CI pipeline continues to be a useful information radiator (especially w.r.t. change failures).
Question
We have implemented (a) and (b). We emit the event at the end step, and we're able to trigger our CI pipeline. However, we'd also like (b) to happen should any step fail. So my question is: What would be a good way to implement this so that each step, if it fails, will invoke the same piece of code that emits the event to trigger our CI pipeline?
I was thinking along the lines of this pseudo-code sample, but I'm finding that it's not easy to define a decorator like @notify_ci_on_error
class MyFlow(FlowSpec):

    @notify_ci_on_error
    @batch(...)
    @step
    def start(self):
        self.next(self.do_something)

    @notify_ci_on_error
    @batch(...)
    @step
    def do_something(self):
        self.next(self.end)

    @notify_ci_on_error
    @batch(...)
    @step
    def end(self):
        pass


def notify_ci_on_error(func):
    # The wrapper must accept the step's arguments (at least `self`).
    def wrapper(*args, **kwargs):
        try:
            func(*args, **kwargs)
        except:
            print('step failed! notifying CI')
            raise
    return wrapper
Thanks for your time!
@davified there are a few different ways to achieve this.
- If all your jobs are running on AWS Batch (or AWS Step Functions) - you can rely on CloudWatch Events to capture whether the AWS Batch job has failed or succeeded and use an AWS lambda function to notify your CI.
- Another approach would be to use the @catch decorator and check in the subsequent step whether the @catch decorator was triggered, and then notify the CI. This wouldn't guard against the case where the subsequent step itself doesn't execute for whatever reason.
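As a rough sketch of the first option, an AWS Lambda handler subscribed to AWS Batch job state change events might look like the following. The `notify_ci` helper and the payload it receives are hypothetical placeholders. The event fields read here (`detail.status`, `detail.jobName`, `detail.jobId`) follow the usual shape of a Batch job state change event, but they are worth checking against the events your own account emits.

```python
def notify_ci(payload):
    """Hypothetical stand-in: replace with the call that triggers your CI pipeline."""
    print(f"notifying CI: {payload}")


def lambda_handler(event, context, notify=notify_ci):
    """Forward terminal AWS Batch job states to CI and ignore everything else."""
    detail = event.get("detail", {})
    status = detail.get("status")
    if status not in ("SUCCEEDED", "FAILED"):
        # Intermediate states such as RUNNABLE or RUNNING are skipped.
        return None
    payload = {
        "job_name": detail.get("jobName"),
        "job_id": detail.get("jobId"),
        "status": status,
    }
    notify(payload)
    return payload
```

The `notify` argument defaults to the placeholder so that the handler can be exercised locally with a sample event; Lambda itself only passes `event` and `context`.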
Depending on your use case, we can recommend a few other ways as well. Please let me know.
Thanks @savingoyal !
CloudWatch Events sounds promising, and sounds like the right way to do it. In our case, however, we're trying to see if we can avoid introducing another architectural component to keep things simple.
Eventually I came quite close to implementing what I described with a simple decorator function. However, I'm stuck on extracting the value of param_a in the example below (see the try/except block).
I can see that the Parameter is accessible as self.param_a. However, because I'm in the scope of a decorator function, I don't have access to the self object. :/
May I know if there's a way to access the value of a Parameter outside of a flow?
class MyFlow(FlowSpec):
    param_a = Parameter("param-a", required=True)  # We need this when notifying CI

    @step
    @notify_ci_on_error(param_a)
    def start(self):
        pass  # do stuff


# in another file
import functools


def notify_ci_on_error(param_a):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                func(*args, **kwargs)
            except:
                # PROBLEM: param_a is a Parameter object, and I'm not able to extract its value
                notify_ci(param_a)
                raise
        return wrapper
    return decorator
Would notify_ci(self.param_a) work for you?
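To make that suggestion concrete: the wrapper is called with the same arguments as the step, so its first positional argument is the flow instance, and attributes can be read from it at the time the step runs. Below is a minimal sketch that uses a plain class in place of a real FlowSpec; `FakeFlow`, `notify_ci`, and the `sent` list are hypothetical stand-ins, and the behaviour inside Metaflow itself is not verified here.

```python
import functools

sent = []  # hypothetical: records what would be sent to CI


def notify_ci(value):
    """Hypothetical stand-in for the real CI notification call."""
    sent.append(value)


def notify_ci_on_error(func):
    @functools.wraps(func)
    def wrapper(self, *args, **kwargs):
        try:
            return func(self, *args, **kwargs)
        except Exception:
            # `self` is the flow instance, so its attributes are available here.
            notify_ci(self.param_a)
            raise
    return wrapper


class FakeFlow:
    """Plain class standing in for a FlowSpec subclass."""
    param_a = "pipeline-123"

    @notify_ci_on_error
    def start(self):
        raise RuntimeError("boom")
```

Calling `FakeFlow().start()` re-raises the `RuntimeError` after the value of `param_a` has been passed to `notify_ci`.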
Replying here because a concrete example of how to incorporate a custom decorator would be super useful...
Are there any out there? Were you able to achieve this @davified ?
Using your example with the context object is good enough for me, but I am getting an error when the wrapper tries to call the start step:
Traceback (most recent call last):
  File "src/example_flow.py", line 39, in <module>
    class ExampleFlow(FlowSpec):
  File "src/example_flow.py", line 51, in ExampleFlow
    def start(self):
TypeError: 'NoneType' object is not callable
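An error of this kind at class-definition time usually means that something used as a decorator evaluated to `None`, for example a decorator factory that builds its inner decorator but forgets to return it. Whether that is the cause here cannot be told from the traceback alone; the sketch below only reproduces the symptom, and all names in it are hypothetical.

```python
def broken_factory(param):
    def decorator(func):
        return func
    # Bug: no `return decorator`, so broken_factory(...) evaluates to None.


def fixed_factory(param):
    def decorator(func):
        return func
    return decorator


def define_with(factory):
    """Define a class that uses factory(...) as a method decorator.

    Returns the TypeError message if the class body fails to evaluate,
    otherwise the result of calling the decorated method.
    """
    try:
        class Example:
            @factory("x")
            def start(self):
                return "ok"
    except TypeError as exc:
        return str(exc)
    return Example().start()
```

With `broken_factory`, the failure happens while the class body is being evaluated, before any step runs, which matches where the traceback above points.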
Thanks for the tips @savingoyal !
In the end, this was what we implemented:
import logging

logger = logging.getLogger(__name__)


def notify_ci_on_error(func):
    """
    Parameters
    ----------
    func: accepts a function/step in a Metaflow Flow. The flow object must have these attributes:
        - some_var_x
        - some_var_y

    Returns
    -------
    wrapper
        a function that is wrapped with try/except logic.
    """
    def wrapper(*args, **kwargs):
        flow, *_ = args
        # args = arguments passed to the step. For example: in def join(self, inputs), args would be (self, inputs)
        # flow = the `self` object passed into each step in a Flow
        # *_ is needed to allow this line to work regardless of whether we pass only 1 (e.g. `self`) or more parameters (e.g. `self, inputs`)
        try:
            some_var_x = flow.some_var_x
            some_var_y = flow.some_var_y
        except Exception as e:
            raise Exception(
                "Flow is missing one or more of these attributes required by notify_ci_on_error: "
                "some_var_x, some_var_y"
            ) from e

        try:
            func(*args, **kwargs)
        except:
            # add your generic exception handling logic here. For us, we notify CI pipeline with id (some_var_x) that flow with name (some_var_y) has failed.
            logger.info("Step failed! Notifying queue")
            raise

    wrapper.__name__ = func.__name__  # This prevents an invalid self.next() transition error: "Step specifies a transition to an unknown step, wrapper."
    return wrapper
# usage:
class MergeTrainFlow(FlowSpec):
    some_var_x = "..."
    some_var_y = "..."

    @batch(...)
    @step
    @notify_ci_on_error
    def start(self):
        pass

    @batch(...)
    @step
    @notify_ci_on_error
    def doing_stuff(self):
        pass
Just to call out a few things:
- If the decorator needs any state, we define it as a class attribute in the flow, and access it via the flow object in the decorator. For example, in our case, we notify a downstream CI pipeline that a Flow has failed, and we use this to pass on some useful metadata (e.g. which CI pipeline to trigger, the name of the flow, etc.).
- This will work for any exception raised within your function or step. Unfortunately, it doesn't know about any exceptions happening outside of the step: for example, if Metaflow fails to serialise an artifact assigned as self.something_unserialisable = ..., @notify_ci_on_error doesn't catch that.
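On the `wrapper.__name__ = func.__name__` line in the decorator above: the standard library's `functools.wraps` copies the name (along with the docstring and a few other attributes) in one go, so it may be a tidier way to get the same effect. The sketch below only shows the difference on plain functions with hypothetical names; how it interacts with Metaflow's step validation is an assumption that has not been checked here.

```python
import functools


def without_wraps(func):
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


def with_wraps(func):
    @functools.wraps(func)  # copies __name__, __doc__, etc. from func
    def wrapper(*args, **kwargs):
        return func(*args, **kwargs)
    return wrapper


@without_wraps
def start_a():
    """First step."""


@with_wraps
def start_b():
    """First step."""


# Without wraps the decorated function reports the wrapper's name,
# which is what leads to an "unknown step, wrapper" style error.
names = (start_a.__name__, start_b.__name__)
```

Here `names` holds the wrapper's own name for `start_a` and the original function name for `start_b`.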