
feature: support argo retry

Open dhpikolo opened this issue 10 months ago • 0 comments

It seems that Argo retry does not work as expected when a workflow is triggered with Metaflow retry enabled.

Current Behaviour

Argo retry restarts failed tasks and resets the retry count to 0, overwriting previous artifacts. The downstream task uses the latest attempt as the input artifact, leading to failure due to missing artifacts. These artifacts were recorded in the overwritten attempt 0 but are absent in the latest attempt.
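The failure mode described above can be illustrated with a toy model. This is only a sketch: a plain dict stands in for the task datastore, `read_latest_attempt` is a hypothetical helper rather than a Metaflow API, and the figure of four failed attempts is an assumption chosen for illustration.

```python
# Toy model only: a dict keyed by attempt number stands in for the task datastore.
def read_latest_attempt(task_store):
    """Return the artifacts of the highest-numbered attempt, as a downstream step would."""
    return task_store[max(task_store)]


# Original run: attempts 0..3 all fail, so none of them records new_df.
task_store = {attempt: {} for attempt in range(4)}

# Argo retry restarts the task with the retry count reset to 0. The rerun
# succeeds, overwrites attempt 0, and records new_df there.
task_store[0] = {"new_df": "<DataFrame>"}

# The downstream step still resolves its input from the latest attempt (3),
# which never recorded new_df.
latest = read_latest_attempt(task_store)
print(max(task_store), "new_df" in latest)  # prints: 3 False
```

In this model the artifact exists, but only under attempt 0, so a reader that always picks the highest attempt number never sees it.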

Desired Behaviour

Ideally, hitting the Argo retry button should create a new attempt rather than overwrite the old one.

Metaflow ChatRoom Discussion

Solution Proposal

Infer the retry_count from the flow datastore [example: s3]. The retry_count will be the latest_done_attempt number + 1. This avoids overwriting artifacts on retry; that is, the restarted run would add its artifacts as a new attempt.
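A minimal sketch of that inference, under stated assumptions: `infer_retry_count` is a hypothetical helper, not an existing Metaflow function, and the "<attempt>.DONE.lock" marker naming is an assumption about how completed attempts appear in the datastore. Listing the datastore (for example, the S3 prefix of the task) is left out; the helper only receives the file names.

```python
import re
from typing import Iterable

# Assumed naming of per-attempt completion markers: "<attempt>.DONE.lock".
# Adjust the pattern to whatever the datastore actually writes.
_DONE_MARKER = re.compile(r"^(\d+)\.DONE\.lock$")


def infer_retry_count(marker_names: Iterable[str]) -> int:
    """Return latest_done_attempt + 1, or 0 when no attempt has completed yet."""
    done_attempts = [
        int(match.group(1))
        for name in marker_names
        if (match := _DONE_MARKER.match(name))
    ]
    return max(done_attempts) + 1 if done_attempts else 0


# Example listing for a task whose attempts 0..3 have all finished.
markers = [
    "0.attempt.json", "0.DONE.lock",
    "1.attempt.json", "1.DONE.lock",
    "2.DONE.lock",
    "3.DONE.lock",
]
next_attempt = infer_retry_count(markers)  # 4
first_attempt = infer_retry_count([])      # 0
```

With this approach a task restarted by Argo retry would write under attempt 4 instead of reusing attempt 0, so earlier attempts stay intact and the newest attempt holds the new artifacts.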

Steps to reproduce:

Run

python hello_world.py argo-workflows create
python hello_world.py argo-workflows trigger

where hello_world.py is:

import pandas as pd
from metaflow import (
    FlowSpec,
    Parameter,
    card,
    project,
    step,
    retry,
    environment
)

from metaflow import current

ENV_VARS = {
    "FAIL_FLOW": "YES",
}

@project(name="dummy_project")
class HelloWorld(FlowSpec):
    force_error = Parameter("force-error", type=bool, default=False)

    @card
    @step
    def start(self):
        self.next(self.print_df)

    @card
    @environment(vars=ENV_VARS)
    @step
    def print_df(self):
        import os

        # FAIL_FLOW is injected by @environment; "YES" forces every attempt to fail.
        if os.environ["FAIL_FLOW"] == "YES":
            raise Exception(f"Fail now, current retry count: {current.retry_count}")
        else:
            # pass this step on retry, once FAIL_FLOW is no longer "YES"
            self.new_df = pd.DataFrame(
                {
                    "city": [1, 2, 3],
                    "country": ["de", "de", "at"],
                    "order_id": [5, 6, 7],
                    "score": [0.5, 0.6, 0.7],
                }
            )
        print(f"count is = {current.retry_count}")
        self.next(self.end)
    
    @card
    @step
    def end(self):
        print(f"the new df is: {self.new_df}")

if __name__ == "__main__":
    HelloWorld()

  • The Argo workflow runs with Metaflow retry enabled and fails on every retry because of the error induced via the environment variable.

Image

  • After a fix in the backend [or after updating the workflow with the right env-var value], hit the argo retry --node button; the print_df node now passes.

  • The end step fails with a missing artifact: new_df is an artifact from the parent step, created when the parent node succeeded on retry.

    print(f"the new df is: {self.new_df}")
                            ^^^^^^^^^^^
  File "/app/metaflow/metaflow/flowspec.py", line 250, in __getattr__
    raise AttributeError("Flow %s has no attribute '%s'" % (self.name, name))
AttributeError: Flow HelloWorld has no attribute 'new_df'

Image

dhpikolo, Feb 19 '25 09:02