feature: support argo retry
It seems that argo retry does not work as expected when a workflow is triggered with Metaflow retry enabled.
Current Behaviour
Argo retry restarts the failed tasks and resets the retry count to 0, so the restarted run overwrites the artifacts of the earlier attempt 0. The downstream task reads its input artifacts from the latest attempt and fails because they are missing: the artifacts were recorded in the overwritten attempt 0 but are absent from the latest attempt.
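The failure mode described above can be reproduced with a toy model. This is only a sketch: the `attempts` dict, `run_attempt`, and `load_for_downstream` are hypothetical stand-ins for the datastore and are not Metaflow APIs, and the three failed attempts are an arbitrary choice.

```python
# Toy in-memory stand-in for a task datastore: attempt number -> artifacts.
# (Hypothetical names; not part of Metaflow.)
attempts = {}


def run_attempt(retry_count, artifacts):
    # Writing to an existing attempt number replaces what was stored there.
    attempts[retry_count] = dict(artifacts)


def load_for_downstream():
    # The downstream task reads from the highest attempt number on record.
    return attempts[max(attempts)]


# Original run: attempts 0, 1 and 2 all fail, so none of them stores new_df.
for n in range(3):
    run_attempt(n, {})

# Argo retry restarts counting at 0; the successful run overwrites attempt 0.
run_attempt(0, {"new_df": "dataframe"})

# Attempt 2 is still the latest, and it has no new_df.
print("new_df" in load_for_downstream())  # False
```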
Desired Behaviour
Ideally, hitting the argo retry button should create a new attempt rather than overwrite the old attempt.
Solution Proposal
Infer the retry_count from the flow datastore [example: s3]. The retry_count will be the latest_done_attempt number + 1. This way a retry does not overwrite artifacts: the restarted run adds its artifacts as a new attempt.
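A minimal sketch of the proposed inference, under stated assumptions: each finished attempt is assumed to leave a marker file named `<attempt>.DONE.lock` in the task's datastore directory, and `next_retry_count` is a hypothetical helper, not an existing Metaflow function. Listing the file names from S3 or another backend is left out.

```python
import re

# Assumed marker layout: "<attempt>.DONE.lock" for every finished attempt.
_DONE_MARKER = re.compile(r"^(\d+)\.DONE\.lock$")


def next_retry_count(task_file_names):
    """Return latest done attempt + 1, or 0 if no attempt has finished."""
    done_attempts = []
    for name in task_file_names:
        match = _DONE_MARKER.match(name)
        if match:
            done_attempts.append(int(match.group(1)))
    return max(done_attempts) + 1 if done_attempts else 0


# Example listing for one task: attempts 0 and 1 finished, attempt 2 did not.
files = [
    "0.attempt.json",
    "0.DONE.lock",
    "1.attempt.json",
    "1.DONE.lock",
    "2.attempt.json",
]
print(next_retry_count(files))  # 2
```

With this rule a restarted task writes under a fresh attempt number, so earlier attempts stay intact and the downstream step finds the new artifacts in the latest attempt.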
Steps to reproduce:
Run the following commands against the flow below (hello_world.py):
python hello_world.py argo-workflows create
python hello_world.py argo-workflows trigger
import pandas as pd
from metaflow import (
    FlowSpec,
    Parameter,
    card,
    project,
    step,
    retry,
    environment
)
from metaflow import current

ENV_VARS = {
    "FAIL_FLOW": "YES",
}


@project(name="dummy_project")
class HelloWorld(FlowSpec):
    force_error = Parameter("force-error", type=bool, default=False)

    @card
    @step
    def start(self):
        self.next(self.print_df)

    @card
    @environment(vars=ENV_VARS)
    @step
    def print_df(self):
        import os

        if os.environ["FAIL_FLOW"] == "YES":
            raise Exception(f"Fail now, current retry count: {current.retry_count}")
        else:
            self.new_df = pd.DataFrame(
                {
                    "city": [1, 2, 3],
                    "country": ["de", "de", "at"],
                    "order_id": [5, 6, 7],
                    "score": [0.5, 0.6, 0.7],
                }
            )
            # pass this step on retry
            print(f"count is = {current.retry_count}")
        self.next(self.end)

    @card
    @step
    def end(self):
        print(f"the new df is: {self.new_df}")


if __name__ == "__main__":
    HelloWorld()
- The Argo workflow, with Metaflow retry enabled, fails on all retries because of the error induced via the environment variable.
- Node print_df passes after a fix in the backend [or after updating the workflow with the right env-var value] and then hitting the argo retry --node button.
- The end step fails with a missing artifact: new_df is an artifact from the parent step, which was created when the parent node succeeded on retry.
    print(f"the new df is: {self.new_df}")
                            ^^^^^^^^^^^
  File "/app/metaflow/metaflow/flowspec.py", line 250, in __getattr__
    raise AttributeError("Flow %s has no attribute '%s'" % (self.name, name))
AttributeError: Flow HelloWorld has no attribute 'new_df'