flyte
[BUG] Launching in fast mode from a file containing workflow tasks and launch plans results in an error
Describe the bug
We recently upgraded flytekit from 1.7.0 to 1.10.2, and since then we have seen an error when launching an execution in fast mode. When a Python file contains workflows, tasks, and launch plans and we try to run it in fast mode, we get this error:
$ python3 workflow.py --inference_date=2024-01-07 --n_days_horizon=180
╭─────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────── Traceback (most recent call last) ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ /.../workflow.py:319 in <module> │
│ │
│ ❱ 319 launch_plan = LaunchPlan.get_or_create( │
│ │
│ /.../lib/python3.10/site-packages/flytekit/core/launch_plan.py:273 in get_or_create │
│ │
│ ❱ 273 │ │ │ │ raise AssertionError("The cached values aren't the same as the current c │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯
AssertionError: The cached values aren't the same as the current call arguments
The workaround has been to put the launch plan definitions in a separate file from the workflow and task definitions.
There are no errors when we run this in non-fast mode.
Expected behavior
Ideally we can include launch plan definitions in the same file as the workflow and task definitions.
Additional context to reproduce
Sorry I don't have a minimal reproducible example, but these are the steps I think will reproduce this.
- Create an environment using flytekit==1.10.2.
- Create a Python file that defines a workflow, at least one task, and at least one launch plan. The launch plan should accept arguments.
- Run the execution in fast mode from the command line.
- Assuming you get the same error described above, try moving the launch plan to a separate file.
- Run the execution from the new file containing only the launch plan.
- This workaround has worked for us.
Screenshots
No response
Are you sure this issue hasn't been raised already?
- [X] Yes
Have you read the Code of Conduct?
- [X] Yes
🤖
Hi @dennisobrien 👋,
I'm Dosu, a friendly bot here to assist you! I'm here to help you with bugs, answer questions, and guide you if you want to contribute. While we wait for a human maintainer, I'm all yours! Let's get your issue sorted!
Thank you for reporting this issue. It seems like you're encountering an AssertionError when using the get_or_create method from the launch_plan.py file in flytekit version 1.10.2. This error is raised when the cached values aren't the same as the current call arguments.
Based on the changes made in flytekit version 1.10.2, there were several updates that could potentially affect the execution of workflows, tasks, and launch plans in fast mode. However, none of these changes seem to directly relate to the issue you're experiencing.
The get_or_create method is typically used to either fetch an existing launch plan or create a new one if it doesn't exist. The error message suggests that there might be a mismatch between the cached values and the current call arguments, which could be due to a variety of reasons such as changes in the input parameters or their order.
Unfortunately, without a minimal reproducible example, it's difficult to pinpoint the exact cause of this issue. If possible, could you provide a simplified version of your workflow, task, and launch plan definitions that still reproduces the error? This would greatly help in identifying the root cause of the problem.
In the meantime, you might want to consider downgrading to a previous version of flytekit where this issue was not present, until a fix is available.
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.
Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.
If you want to continue the conversation, start your reply with @dosu-bot.
This seems to be an issue in https://github.com/flyteorg/flytekit/blob/949bc1b40e1966b409c821b96011d900241b22fd/flytekit/core/launch_plan.py#L262
Encountered the same issue. It seems that the "function" attribute of the workflows isn't the same between the cached launch plan and the current call. I also noticed that the __call__ method at https://github.com/flyteorg/flytekit/blob/949bc1b40e1966b409c821b96011d900241b22fd/flytekit/core/launch_plan.py#L424 does not seem to be invoked when launch plan definitions are in the same file as the workflow and task definitions, and that the get_or_create method seems to be invoked twice.
Below is my minimal reproducible example:
from math import sqrt
from flytekit import workflow, task, LaunchPlan
from typing import List, Union
import random
import pdb


@task
def standard_deviation(values: List[float], mu: float) -> float:
    variance = sum([(x - mu) ** 2 for x in values])
    return sqrt(variance)


@task
def standard_scale(values: List[float], mu: float, sigma: float) -> List[float]:
    return [(x - mu) / sigma for x in values]


@task
def mean(values: List[float]) -> float:
    return sum(values) / len(values)


@task
def generate_data(num_samples: int, seed: int) -> List[float]:
    random.seed(seed)
    return [random.random() for _ in range(num_samples)]


@workflow
def standard_scale_workflow(values: List[float]) -> List[float]:
    mu = mean(values=values)
    sigma = standard_deviation(values=values, mu=mu)
    return standard_scale(values=values, mu=mu, sigma=sigma)


@workflow
def workflow_with_subworkflow(num_samples: int, seed: int) -> List[float]:
    data = generate_data(num_samples=num_samples, seed=seed)
    return standard_scale_workflow(values=data)


standard_scale_launch_plan = LaunchPlan.get_or_create(
    standard_scale_workflow,
    name="standard_scale_lp_new",
    default_inputs={"values": [3.0, 4.0, 5.0]}
)

if __name__ == "__main__":
    # pdb.set_trace()
    print(standard_scale_launch_plan())
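One possible reading of the observations above, offered as an assumption rather than a confirmed diagnosis: if the same file is executed once as the script and then loaded again under its module name, every top-level definition is created twice, and the two copies of a workflow function are different objects. The sketch below uses only the standard library to show that effect; the file name demo_workflow.py, the module names, and the helper load_as are hypothetical.

```python
import importlib.util
import tempfile
from pathlib import Path

# A tiny module body standing in for a file that defines a workflow function.
SOURCE = "def my_workflow(values):\n    return values\n"


def load_as(module_name: str, path: Path):
    """Execute the file at `path` as a fresh module named `module_name`."""
    spec = importlib.util.spec_from_file_location(module_name, path)
    module = importlib.util.module_from_spec(spec)
    spec.loader.exec_module(module)
    return module


with tempfile.TemporaryDirectory() as tmp:
    path = Path(tmp) / "demo_workflow.py"
    path.write_text(SOURCE)
    as_script = load_as("demo_main", path)       # stands in for running the file directly
    as_import = load_as("demo_workflow", path)   # stands in for a later load by module name

# The two loads compile identical code but produce distinct function objects.
same_object = as_script.my_workflow is as_import.my_workflow
same_code = as_script.my_workflow.__code__.co_code == as_import.my_workflow.__code__.co_code
```

If something like this happens in fast mode, a module-level LaunchPlan.get_or_create call would run once per load with a different workflow object each time, which would be consistent with both the double invocation and the differing "function" attribute. It would also fit the reported workaround, since a file containing only the launch plan imports the workflow module instead of redefining it.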
We're seeing this too. Clearing the local cache does not resolve it, nor does using overwrite_cache=True in LaunchPlan.get_or_create() or incrementing the cache versions.
Cc @wild-endeavor / @eapolinario: is this resolved?