
How to define custom decorators for generic try-catch handling

Open • davified opened this issue 3 years ago • 5 comments

Hi there, thanks again for the great work on Metaflow :-)

Context

  • We have a couple of flows, each with, say, 10 steps. Should any step fail, we want to run a common piece of code (in our case, emit an event that triggers our CI pipeline). This is so that:
      a. if a flow succeeds, we trigger our CI pipeline to deploy our model;
      b. if a flow fails, we trigger our CI pipeline and make it go red, so that it remains a useful information radiator (especially w.r.t. change failures).

Question

We have implemented (a) and (b): we emit the event at the end step, and we're able to trigger our CI pipeline. However, we'd also like (b) to happen should any step fail. So my question is: what would be a good way to implement this so that each step, if it fails, invokes the same piece of code that emits the event to trigger our CI pipeline?

I was thinking along the lines of the pseudo-code sample below, but I'm finding that it's not easy to define a decorator like @notify_ci_on_error.

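import functools

from metaflow import FlowSpec, batch, step


def notify_ci_on_error(func):
    @functools.wraps(func)  # keep the step's name, so self.next(self.do_something) still resolves
    def wrapper(*args, **kwargs):  # forward `self` (and `inputs` in a join step) to the step
        try:
            return func(*args, **kwargs)
        except:
            print('step failed! notifying CI')
            raise
    return wrapper


class MyFlow(FlowSpec):
    @notify_ci_on_error
    @batch(...)
    @step
    def start(self):
        self.next(self.do_something)

    @notify_ci_on_error
    @batch(...)
    @step
    def do_something(self):
        self.next(self.end)

    @notify_ci_on_error
    @batch(...)
    @step
    def end(self):
        pass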
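The wrapping pattern itself can be checked outside Metaflow with a plain-Python stand-in. This sketch assumes, as the solution later in this thread also does, that a step is invoked as a bound method, so the flow object arrives as the first positional argument. `FakeFlow` and the `notifications` list are hypothetical stand-ins for a real flow and for the CI call.

```python
import functools

# Hypothetical stand-in for "emit an event that triggers our CI pipeline".
notifications = []


def notify_ci_on_error(func):
    @functools.wraps(func)  # keeps func.__name__, which self.next() relies on
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception as exc:
            notifications.append((func.__name__, repr(exc)))
            raise  # re-raise so the step (and the run) still fails
    return wrapper


class FakeFlow:
    """Hypothetical stand-in for a FlowSpec subclass; no Metaflow needed."""

    @notify_ci_on_error
    def start(self):
        return "ok"

    @notify_ci_on_error
    def do_something(self):
        raise ValueError("boom")


flow = FakeFlow()
print(flow.start())                    # ok
try:
    flow.do_something()
except ValueError as exc:
    print("step failed with:", exc)    # step failed with: boom
print(FakeFlow.do_something.__name__)  # do_something
print(notifications)                   # [('do_something', "ValueError('boom')")]
```

Re-raising after notifying keeps the failure visible to the scheduler, so the run still goes red.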

Thanks for your time!

davified • Oct 12 '21 06:10

@davified there are a few different ways to achieve this.

  • If all your jobs run on AWS Batch (or AWS Step Functions), you can rely on CloudWatch Events to capture whether the AWS Batch job failed or succeeded, and use an AWS Lambda function to notify your CI.
  • Another approach would be to use the @catch decorator and, in the subsequent step, check whether @catch was triggered; if it was, notify the CI from there. This wouldn't guard against the case where the subsequent step itself doesn't execute for whatever reason.
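The control flow of the @catch approach can be pictured with the plain-Python imitation below. It is not Metaflow code: `catch_into` is a hypothetical stand-in that mimics, as we understand it, what Metaflow's @catch(var=...) does (store the exception in the named artifact, leave it as None when the step succeeds, and let the run continue), and `notify_ci` is a hypothetical placeholder. In a real flow you would decorate the step with @catch(var=...) and read the artifact in the next step.

```python
import functools

sent = []  # hypothetical record of CI notifications


def notify_ci(message):
    """Hypothetical placeholder for the call that triggers the CI pipeline."""
    sent.append(message)


def catch_into(var):
    """Plain-Python imitation of @catch(var=...): never re-raises."""
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            setattr(self, var, None)  # None means "the step succeeded"
            try:
                func(self, *args, **kwargs)
            except Exception as exc:
                setattr(self, var, exc)  # remember the failure instead of crashing
        return wrapper
    return decorator


class FakeFlow:
    @catch_into("start_failed")
    def start(self):
        raise RuntimeError("training data missing")

    def check(self):
        # The step *after* the caught one decides whether to notify CI.
        if self.start_failed is not None:
            notify_ci(f"flow failed in start: {self.start_failed}")


flow = FakeFlow()
flow.start()  # does not raise
flow.check()
print(sent)   # ['flow failed in start: training data missing']
```

The notification lives in the checking step, which is why this approach only helps if that step actually runs.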

Depending on your use case, we can recommend a few other ways as well. Please let me know.

savingoyal • Oct 12 '21 20:10

Thanks @savingoyal!

CloudWatch Events sounds promising, and like the right way to do it. In our case, however, we're trying to avoid introducing another architectural component, to keep things simple.

Eventually I came quite close to implementing what I described with a simple decorator function. However, I'm stuck on extracting the value of param_a in the example below (see the try/except block).

I can see that the Parameter is accessible as self.param_a inside a step. However, because I'm in the scope of a decorator function, I don't have access to the self object. :/

May I know if there's a way to access the value of a Parameter outside of a flow?

from metaflow import FlowSpec, Parameter, step


class MyFlow(FlowSpec):
    param_a = Parameter("param-a", required=True)  # We need this when notifying CI

    @step
    @notify_ci_on_error(param_a)
    def start(self):
        pass  # do stuff


# in another file (imported into the flow module before MyFlow is defined)
import functools


def notify_ci_on_error(param_a):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            try:
                return func(*args, **kwargs)
            except:
                # PROBLEM: param_a is a Parameter object, and I'm not able to extract its value
                notify_ci(param_a)
                raise
        return wrapper
    return decorator

davified • Oct 17 '21 23:10

Would notify_ci(self.param_a) work for you?

savingoyal • Oct 17 '21 23:10
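One way to apply that suggestion inside a reusable decorator is sketched below, assuming (as noted earlier in the thread) that inside a running step `self.param_a` already holds the parameter's value. Decorator arguments are evaluated once, when the class body executes, so `@notify_ci_on_error(param_a)` hands the decorator the class-level declaration. Passing the attribute *name* instead, and looking it up on `self` when the step runs, defers the lookup to run time. `notify_ci_on_error_with`, `notify_ci`, `FakeFlow` and `calls` are hypothetical names used only for illustration.

```python
import functools

calls = []  # hypothetical record of what would be sent to CI


def notify_ci(value):
    """Hypothetical placeholder for the real CI notification."""
    calls.append(value)


def notify_ci_on_error_with(attr_name):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(self, *args, **kwargs):
            try:
                return func(self, *args, **kwargs)
            except Exception:
                # Looked up at call time on the flow *instance*, so this sees
                # the run-time value rather than the class-level declaration.
                notify_ci(getattr(self, attr_name))
                raise
        return wrapper
    return decorator


class FakeFlow:
    param_a = "<class-level declaration>"  # stands in for Parameter("param-a", ...)

    def __init__(self, param_a):
        self.param_a = param_a  # stands in for the value supplied at run time

    @notify_ci_on_error_with("param_a")
    def start(self):
        raise KeyError("missing column")


try:
    FakeFlow(param_a="pipeline-42").start()
except KeyError:
    pass
print(calls)  # ['pipeline-42']
```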

Replying here because a concrete example of how to incorporate a custom decorator would be super useful...

Are there any out there? Were you able to achieve this, @davified?

Using your example with the context object is good enough for me, but I am getting an error when the wrapper tries to call the start step:

Traceback (most recent call last):
  File "src/example_flow.py", line 39, in <module>
    class ExampleFlow(FlowSpec):
  File "src/example_flow.py", line 51, in ExampleFlow
    def start(self):
TypeError: 'NoneType' object is not callable

bennnym • Dec 21 '21 02:12
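A `TypeError: 'NoneType' object is not callable` reported at the `def start(self):` line, while the class body is still executing, usually means the expression after `@` evaluated to `None`. With a parameterised decorator such as `@notify_ci_on_error(param_a)`, that happens when the outer factory has no `return` at its own indentation level, for example when `return decorator` is indented one level too deep. The minimal plain-Python reproduction below uses the hypothetical names `broken_factory` and `fixed_factory`.

```python
import functools


def broken_factory(tag):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
        return decorator  # BUG: indented inside decorator(), so it is never reached
    # ...and broken_factory() itself falls off the end, returning None


def fixed_factory(tag):
    def decorator(func):
        @functools.wraps(func)
        def wrapper(*args, **kwargs):
            return func(*args, **kwargs)
        return wrapper
    return decorator  # returned by the factory itself


try:
    class Broken:
        @broken_factory("x")  # evaluates to None, then None(start) is attempted
        def start(self):
            return "ran"
except TypeError as exc:
    error = str(exc)
print(error)  # 'NoneType' object is not callable


class Fixed:
    @fixed_factory("x")
    def start(self):
        return "ran"


print(Fixed().start())  # ran
```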

Thanks for the tips, @savingoyal!

In the end, this was what we implemented:

import logging

from metaflow import FlowSpec, batch, step

logger = logging.getLogger(__name__)


def notify_ci_on_error(func):
    """
    Parameters
    ----------
    func: accepts a function/step in a Metaflow Flow. The flow object must have these attributes:
    - some_var_x
    - some_var_y

    Returns
    -------
    wrapper
        a function that is wrapped with try/except logic.
    """

    def wrapper(*args, **kwargs):
        flow, *_ = args
        # args = arguments passed to the step. For example: in def join(self, inputs), args would be (self, inputs)
        # flow = the `self` object passed into each step in a Flow
        # *_ lets this line work whether the step receives only `self` or more parameters (e.g. `self, inputs`)

        try:
            some_var_x = flow.some_var_x
            some_var_y = flow.some_var_y
        except AttributeError as e:
            raise Exception(
                "Flow is missing one or more of these attributes required by notify_ci_on_error: "
                "some_var_x, some_var_y"
            ) from e
        try:
            return func(*args, **kwargs)
        except:
            # add your generic exception handling logic here. For us, we notify CI pipeline with id (some_var_x) that flow with name (some_var_y) has failed.
            logger.info("Step failed! Notifying queue (CI pipeline %s, flow %s)", some_var_x, some_var_y)
            raise

    wrapper.__name__ = func.__name__  # This prevents an invalid self.next() transition error: "Step specifies a transition to an unknown step, wrapper."

    return wrapper


# usage:
class MergeTrainFlow(FlowSpec):
    some_var_x = "..."
    some_var_y = "..."

    @batch(...)
    @step
    @notify_ci_on_error
    def start(self):
        pass

    @batch(...)
    @step
    @notify_ci_on_error
    def doing_stuff(self):
        pass
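A possible variation, offered as a sketch rather than the approach used here: `functools.wraps` copies `__name__` (plus `__doc__`, `__qualname__` and `__wrapped__`) in one call instead of setting `__name__` by hand, and returning the step's result keeps the wrapper transparent. The plain-Python check below assumes, as the comments in the code above describe, that a join step receives `(self, inputs)`; `FakeFlow` and `alerts` are hypothetical stand-ins.

```python
import functools

alerts = []  # hypothetical stand-in for the CI notification


def notify_ci_on_error(func):
    @functools.wraps(func)  # sets wrapper.__name__ = func.__name__, among others
    def wrapper(flow, *args, **kwargs):
        # `flow` is the step's `self`; `args` holds e.g. `inputs` for a join step.
        ci_id, flow_name = flow.some_var_x, flow.some_var_y
        try:
            return func(flow, *args, **kwargs)
        except Exception:
            alerts.append((ci_id, flow_name, func.__name__))
            raise
    return wrapper


class FakeFlow:
    some_var_x = "ci-123"
    some_var_y = "MergeTrainFlow"

    @notify_ci_on_error
    def join(self, inputs):
        return sum(inputs) / len(inputs)


flow = FakeFlow()
print(flow.join([1, 2, 3]))    # 2.0
try:
    flow.join([])              # ZeroDivisionError inside the step
except ZeroDivisionError:
    pass
print(FakeFlow.join.__name__)  # join
print(alerts)                  # [('ci-123', 'MergeTrainFlow', 'join')]
```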

Just to call out a few things:

  • If the decorator needs any state, we define it as a class attribute on the flow and access it via the flow object in the decorator. For example, we notify a downstream CI pipeline that a flow has failed, and we use these attributes to pass on useful metadata (e.g. which CI pipeline to trigger, the name of the flow, etc.).
  • This works for any exception raised within your step function. Unfortunately, it doesn't see exceptions raised outside of the step: for example, if Metaflow fails to serialise an artifact assigned via self.something_unserialisable = ..., @notify_ci_on_error doesn't catch that.
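The second limitation can be seen with a plain-Python model of a task runner. The sketch assumes, as described above, that artifacts are persisted after the step function returns; `run_task` is a hypothetical stand-in for that machinery (it simply pickles the flow's attributes), not Metaflow's actual code. An unpicklable attribute, here a `threading.Lock`, makes persistence fail after the decorated step has already returned, so the wrapper never sees the error.

```python
import functools
import pickle
import threading

notified = []  # hypothetical record of CI notifications


def notify_ci_on_error(func):
    @functools.wraps(func)
    def wrapper(*args, **kwargs):
        try:
            return func(*args, **kwargs)
        except Exception:
            notified.append(func.__name__)
            raise
    return wrapper


class FakeFlow:
    @notify_ci_on_error
    def start(self):
        self.something_unserialisable = threading.Lock()  # the step body itself succeeds


def run_task(flow, step_name):
    """Hypothetical runner: call the step, then persist its attributes."""
    getattr(flow, step_name)()
    return pickle.dumps(vars(flow))  # fails here, outside the decorated step


try:
    run_task(FakeFlow(), "start")
    outcome = "persisted"
except Exception:
    outcome = "persist failed"

print(outcome)   # persist failed
print(notified)  # []
```

Catching this class of failure needs something that observes the task from outside, such as the CloudWatch Events option mentioned earlier in the thread.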

davified • Dec 21 '21 21:12