State Changing Issue in the UI for Prefect 3
Bug summary
When a task in a flow fails, we attempt to bypass it by changing its state from "Failed" to "Completed" in the UI. However, upon retrying the flow, the task we marked as "Completed" executes again instead of being skipped. Conversely, when we intentionally change a completed task to "Failed" and retry the flow, it also does not behave as expected. We anticipated that the originally failed task, once marked as completed, would be skipped, but it gets executed again in Prefect 3.
In contrast, this functionality worked perfectly in Prefect 2.
I suspect this issue may be related to recent changes in the retry mechanism. It appears that for each retry within the flow, a new task instance is created instead of executing the existing task, as the task ID is different each time. This change in the task creation logic seems to be influenced by the dynamic_key, which affects how tasks are identified and executed in Prefect 3.
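A rough sketch of how this can be observed is below; it assumes prefect.runtime.task_run exposes the current run's ID, and the names used are only for illustration:

from prefect import flow, task
from prefect.runtime import task_run

# Illustration only: print the task run ID on every execution so the IDs can be
# compared across flow run retries. A different ID on each retry would indicate
# that a new task run is being created rather than the existing one being reused.
@task(log_prints=True)
def show_task_run_id():
    print(f"task_run.id = {task_run.id}")

@flow(log_prints=True)
def id_check_flow():
    show_task_run_id()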
Version info (prefect version output)
Version: 3.0.5
API version: 0.8.4
Python version: 3.12.5
Git commit: 1d1b92a7
Built: Tue, Oct 8, 2024 5:55 PM
OS/Arch: darwin/arm64
Profile: default
Server type: cloud
Pydantic version: 2.8.2
Integrations:
prefect-kubernetes: 0.5.0
prefect-aws: 0.5.0
prefect-gcp: 0.6.0
prefect-github: 0.3.0
Additional context
No response
Thanks for the bug report @majo-aqfer! It seems like the result for your task runs is not being persisted and, therefore, can't be used on a flow run retry.
Can you share a simple example script where you're seeing this? It'll really help find where the issue is.
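For reference, a minimal sketch of enabling result persistence on a task so its result can be reused on a flow run retry (illustrative names only, default result storage assumed):

from prefect import flow, task

# Minimal illustration: persist the task's return value so a flow run retry
# can reuse it instead of executing the task again.
@task(persist_result=True)
def expensive_step():
    return "some result"

@flow
def my_flow():
    return expensive_step()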
@desertaxle Following the issue reported by @majo-aqfer, please find below a sample script where we are using persist_result=True. We have encountered a problem with the retry behavior change for result_storage_key. This issue was addressed in PrefectHQ/prefect#15658, but it is not yet fully resolved. These challenges are hindering our migration to Prefect v3.
Two Scenarios:
- Static Result Storage Issue: When using the same static result storage key, we aim to transition a successful task run from a Completed state to a Failed state. However, because the result is already available in the storage_location, this behavior is problematic. In Prefect v2, we used task_run.id in the key, which generated a unique value for each task run. In Prefect v3, the task_run.id is different on each retry, which introduces inconsistencies. Example Prefect v2 format used:
  LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+"{flow_run.id}"+"/"+"{task_run.name}"+"/"+"{task_run.id}.json"
- Bypassing Failed Tasks: We need to bypass failed tasks, since they are handled separately, so that other downstream tasks can proceed. However, because the persisted result is not available in the storage_location, the tasks execute regardless of the state change on the task.
A snapshot of the test run is attached below for reference.
from prefect import flow, task
from prefect import runtime
from prefect_aws.s3 import S3Bucket, AwsCredentials
from prefect_github.repository import GitHubRepository
from prefect.client.schemas.schedules import CronSchedule
from prefect.states import Failed, Completed
from prefect.cache_policies import TASK_SOURCE, INPUTS
import time

CID = "c008"
FLOW_NAME = "sample_v1"
SANDBOX = "c008"
CLUSTER = "aqfer-preprod-eks"
LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+"{flow_run.id}"+"/"+"{task_run.name}"+"/"+"result.json"
# LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+"{flow_run.id}"+"/"+"{task_run.name}"+"/"+"{task_run.id}.json"

S3_BUCKET = S3Bucket.load("sample-s3", _sync=True)

@task(log_prints=True, persist_result=True, result_storage_key=LOCATION)
def sample_successfull_v1():
    print("this is executing and should completed successfully")
    return Completed(message="task is completed")

@task(log_prints=True, persist_result=True, result_storage_key=LOCATION)
def sample_failure_v1():
    print("this is executing and should Fail")
    time.sleep(10)
    return Failed(message="task is failed")

@task(log_prints=True, persist_result=True, result_storage_key=LOCATION)
def sample_successfull_v2(param):
    print("value : {}".format(param["par"]))
    return Completed(message="task is completed")

@flow(log_prints=True, persist_result=True, validate_parameters=False, result_storage=S3_BUCKET)
def sample_v1(prev: str = None):
    p = [{"par": "first"}, {"par": "second"}]
    f = sample_successfull_v1.with_options(task_run_name="sample_successfull_v1")(return_state=True)
    print(f)
    g = sample_failure_v1.with_options(task_run_name="sample_failure_v1").submit(wait_for=[f], return_state=True)
    print(g)
    h = sample_successfull_v2.with_options(task_run_name="sample_successfull_v2-[{param[par]}]").map(param=p, wait_for=[g], return_state=True)
    print(h)
    if f.is_failed() or g.is_failed() or any(state.is_failed() for state in h):
        return Failed()
    if g.is_completed():
        return Completed()
    else:
        return Failed()

if __name__ == "__main__":
    flow.from_source(
        source=GitHubRepository.load("github-block"),
        entrypoint="datalake/cids/c008/flows/migration/sample_v1.py:sample_v1",
    ).deploy(
        name=CID,
        work_pool_name="new-prefect-worker",
        work_queue_name=CID,
        paused=True,
        schedules=[CronSchedule(cron="*/5 * * * *")],
        tags=[CID, SANDBOX],
        version="6",
    )
@desertaxle We are waiting for an update on the details shared above. We hope you are able to reproduce the issue on your end.
Apologies for the delay in response!
Using the DEFAULT cache policy will get the behavior that I think you want to see. When I run the script you provided but update the cache_policy on each task to use the DEFAULT cache policy, I see the task that previously ran end up in a CACHED state and not rerun. The DEFAULT cache policy includes the RUN_ID policy, which will use the flow run ID when computing the cache key. Since the flow run is the same between retries, the tasks can reuse the persisted results correctly.
It's possible that I do not fully understand your use case, but try the DEFAULT cache policy, and if that doesn't get you the behavior you're expecting, let me know!
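As a minimal sketch of what I mean (illustrative names, default result storage assumed):

from prefect import flow, task
from prefect.cache_policies import DEFAULT

# Illustration only: with the DEFAULT cache policy (which includes RUN_ID),
# a task that already completed in this flow run is restored as CACHED on a
# flow run retry instead of being executed again.
@task(persist_result=True, cache_policy=DEFAULT)
def step_one():
    return "done"

@flow
def retryable_flow():
    return step_one()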
@desertaxle We have tried using the DEFAULT cache policy, but it does not resolve the issue for the two scenarios mentioned earlier.
Transitioning the state to Cached is not the issue. To move this forward, we would like to arrange a call to discuss the problem in detail and give a better understanding of the use cases mentioned.
Is there any update on this issue? I tried 3.5.0 and found the same problem. I think this is very important! Without this behavior, it's impossible to implement exception handling in a production environment.
@wingerted do you have a simple MRE for what you're seeing? That would be very helpful for understanding the issue.
@desertaxle please consider this minimal example:
from prefect import flow, task
from prefect.context import TaskRunContext
from prefect.logging import get_run_logger
import random

from prefect.cache_policies import DEFAULT

@task(cache_policy=DEFAULT)
def get_customer_ids() -> list[str]:
    # Fetch customer IDs from a database or API
    return [f"customer{n}" for n in range(3)]

@task(cache_policy=DEFAULT)
def process_customer(customer_id: str) -> str:
    # Process a single customer; fail on purpose to simulate an error
    logger = get_run_logger()
    logger.info(f"Processing customer {customer_id}")
    raise Exception(f"Failed to process {customer_id}")
    return f"Processed {customer_id}"

@flow
def main() -> list[str]:
    customer_ids = get_customer_ids()
    # Map the process_customer task across all customer IDs
    results = process_customer.map(customer_ids)
    return results

if __name__ == "__main__":
    main.serve(name="my-first-deployment", cron="* * * * *")
This produces 3 failed process_customer runs. I want to mark one of them as successful and then retry the flow. I expect only the other 2 failed process_customer runs to execute again, while the one I marked as successful should not run.
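For completeness, a rough sketch of forcing a task run into a Completed state through the Python client instead of the UI; this assumes the client's set_task_run_state method and a task run ID copied from the UI, and is only meant to illustrate the state change being tested:

import asyncio
from uuid import UUID

from prefect.client.orchestration import get_client
from prefect.states import Completed

# Rough sketch (assumed client usage): force a specific failed task run into a
# Completed state before retrying the parent flow run.
async def mark_task_run_completed(task_run_id: str) -> None:
    async with get_client() as client:
        await client.set_task_run_state(
            task_run_id=UUID(task_run_id),
            state=Completed(message="manually marked as completed"),
            force=True,
        )

# Example usage with a placeholder ID (replace with a real task run ID):
# asyncio.run(mark_task_run_completed("00000000-0000-0000-0000-000000000000"))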
Any update? We can discuss.