Behavioural Issues in Retry Logic for Failed Tasks
Bug summary
Expected Behavior and Recent Changes in Task Retry Logic:
Expected Behavior: In our workflow, when a task fails and is retried, we expect the same task instance to be executed again rather than initiating a new task instance. This design allows us to efficiently manage retries, ensuring that we do not unnecessarily re-execute tasks that have already completed successfully.
Recent Changes in Prefect 3: Prefect 3 changed how the backend handles task retries, in particular the dynamic_key. In Prefect 2 this key was a static sequence (e.g., 0 → 1 → 2), which gave each task instance a consistent identifier, so a retry targeted the same task instance. In Prefect 3 the dynamic_key is a random value generated for each task execution. Each trigger therefore produces a new task_run.id that does not correspond to any previously stored result, so the flow cannot locate prior results and re-executes every task, including those that had already completed successfully, rather than just the failed one.
Result Storage in S3:
We are utilizing Amazon S3 for result storage, following this location format:
"results-v2/"+CID+"/"+FLOW_NAME+"/"+"{flow_run.id}"+"/"+"{task_run.name}"+"/"+"{task_run.id}.json"
In this structure, task_run.id serves as the filename. However, because task_run.id is now generated as a random value for each retry, the system is unable to retrieve previously stored results. As a result, all tasks are executed anew, regardless of their previous completion status.
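To make the lookup failure concrete, the sketch below fills in the same template with plain Python string formatting. The `SimpleNamespace` objects are stand-ins for run metadata, not Prefect objects, and `render_key` is a hypothetical helper; the only point is that a key containing `{task_run.id}` changes whenever the task run ID changes.

```python
from types import SimpleNamespace
from uuid import uuid4

CID = "c008"
FLOW_NAME = "sample_v1"

# Same template shape as the storage location above.
TEMPLATE = (
    "results-v2/" + CID + "/" + FLOW_NAME
    + "/{flow_run.id}/{task_run.name}/{task_run.id}.json"
)


def render_key(template, flow_run, task_run):
    """Hypothetical helper: fill the template with stand-in run metadata."""
    return template.format(flow_run=flow_run, task_run=task_run)


flow_run = SimpleNamespace(id="flow-run-1")

# First attempt and retry: same task run name, different task run IDs.
first_attempt = SimpleNamespace(name="sample_failure_v1", id=str(uuid4()))
retry_attempt = SimpleNamespace(name="sample_failure_v1", id=str(uuid4()))

first_key = render_key(TEMPLATE, flow_run, first_attempt)
retry_key = render_key(TEMPLATE, flow_run, retry_attempt)

# The keys differ, so a result written under first_key is not found on retry.
print(first_key)
print(retry_key)
print("same key:", first_key == retry_key)
```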
Temporary Solution:
To address this issue, we explored the option of saving results under a static filename, such as:
"results-v2/"+CID+"/"+FLOW_NAME+"/"+"{flow_run.id}"+"/"+"{task_run.name}"+"/"+"result.json"
This modification has proven effective in preventing the re-execution of tasks that have already completed during retries, as these tasks are recognized as cached (type = complete).
However, we have encountered a new challenge: when a task fails and is retried, it is not re-executed; instead, it is incorrectly marked as cached (type = complete). This behavior prevents the failed task from running again, potentially leading to incomplete workflows, as downstream tasks may proceed without addressing the initial failure.
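The interaction described above can be reproduced with a toy model. This is a sketch only: the `store` dict and `run_with_store` are hypothetical and do not mirror Prefect's internals. It assumes, as reported in this issue, that every final state (including a failure) is written under the storage key and that any existing entry is treated as a cache hit.

```python
store = {}  # stands in for the S3 bucket: storage key -> persisted state


def run_with_store(key, fn, persist_failures=True):
    """Toy model: replay a stored state if the key exists, otherwise run fn."""
    if key in store:
        return "Cached(" + store[key] + ")"
    state = fn()
    if state == "Completed" or persist_failures:
        store[key] = state
    return state


attempts = {"count": 0}


def flaky():
    """Fails on the first call and succeeds on every later call."""
    attempts["count"] += 1
    return "Failed" if attempts["count"] == 1 else "Completed"


key = "results-v2/c008/sample_v1/flow-run-1/sample_failure_v1/result.json"

# Failures persisted under a static key: the retry never calls flaky() again.
first = run_with_store(key, flaky)
retry = run_with_store(key, flaky)
print(first, retry, "calls:", attempts["count"])

# Failures not persisted: the retry finds nothing stored and runs flaky() again.
store.clear()
attempts["count"] = 0
first_b = run_with_store(key, flaky, persist_failures=False)
retry_b = run_with_store(key, flaky, persist_failures=False)
print(first_b, retry_b, "calls:", attempts["count"])
```

In this model the static key is not the problem by itself; the retry is skipped only because the failed state was stored under it.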
Conclusion: The recent changes in Prefect 3's task retry behavior have introduced complexities that significantly impact our approach to task execution and result retrieval. While utilizing a static filename for result storage has improved the handling of completed tasks during retries, it has inadvertently caused failed tasks to be misclassified as complete, thereby disrupting the overall workflow. We need to investigate further solutions to ensure that failed tasks are retried appropriately while maintaining the efficiency of completed tasks.
code sample:
from prefect import flow, task
from prefect import runtime
from prefect_aws.s3 import S3Bucket, AwsCredentials
from prefect_github.repository import GitHubRepository
from prefect.client.schemas.schedules import CronSchedule
from prefect.states import Failed, Completed
from prefect.cache_policies import TASK_SOURCE, INPUTS
import time
CID = "c008"
FLOW_NAME ="sample_v1"
SANDBOX = "c008"
CLUSTER = "aqfer-preprod-eks"
LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+"{flow_run.id}"+"/"+"{task_run.name}"+"/"+"result.json"
# LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+"{flow_run.id}"+"/"+"{task_run.name}"+"/"+"{task_run.id}.json"
S3_BUCKET = S3Bucket.load("sample-s3", _sync=True)

@task(log_prints=True,persist_result=True,result_storage_key=LOCATION)
def sample_successfull_v1():
    print("this is executing and should completed successfully")
    return Completed(message="task is completed")

@task(log_prints=True,persist_result=True,result_storage_key=LOCATION)
def sample_failure_v1():
    print("this is executing and should Fail")
    time.sleep(10)
    return Failed(message="task is failed")

@task(log_prints=True,persist_result=True,result_storage_key=LOCATION)
def sample_successfull_v2(param):
    print("value : {}".format(param["par"]))
    return Completed(message="task is completed")

@flow(log_prints=True, persist_result=True, validate_parameters=False, result_storage=S3_BUCKET)
def sample_v1(prev: str = None):
    p = [{"par": "first"},{"par": "second"}]
    f = sample_successfull_v1.with_options(task_run_name="sample_successfull_v1")(return_state=True)
    print(f)
    g = sample_failure_v1.with_options(task_run_name="sample_failure_v1").submit(wait_for=[f], return_state=True)
    print(g)
    h = sample_successfull_v2.with_options(task_run_name="sample_successfull_v2-[{param[par]}]").map(param=p, wait_for=[g], return_state=True)
    print(h)
    if f.is_failed() or g.is_failed() or any(state.is_failed() for state in h):
        return Failed()
    if g.is_completed():
        return Completed()
    else:
        return Failed()

if __name__ == "__main__":
    flow.from_source(
        source= GitHubRepository.load("github-block"),
        entrypoint = "datalake/cids/c008/flows/migration/sample_v1.py:sample_v1"
    ).deploy(
        name = CID,
        work_pool_name = "new-prefect-worker",
        work_queue_name = CID,
        paused=True,
        schedules=[(CronSchedule(cron="*/5 * * * *"))],
        tags = [CID,SANDBOX],
        version = "6",
    )

Version info (prefect version output)
Version: 3.0.5
API version: 0.8.4
Python version: 3.12.5
Git commit: 1d1b92a7
Built: Tue, Oct 8, 2024 5:55 PM
OS/Arch: darwin/arm64
Profile: default
Server type: cloud
Pydantic version: 2.8.2
Integrations:
prefect-kubernetes: 0.5.0
prefect-aws: 0.5.0
prefect-gcp: 0.6.0
prefect-github: 0.3.0
Additional context
Hi @majo-aqfer - excellent write-up of your issue, thank you. A few quick comments:
- result_storage_key configures persistence for all persisted results, which currently includes failed payloads; we do want to remove this persistence of failed states, but certain task runners currently rely on it so we haven't been able to do so yet
- the best way to achieve what you're looking for is to use cache_key_fn instead of result_storage_key; this will require a slight modification of your templated string into a function that matches the appropriate signature of cache key functions, but will achieve what you're looking for. In Prefect 2, this would not have worked (as cache keys were not actually related to result storage locations), but in Prefect 3 it will do exactly what you're looking for (and a random file name will be generated for failed state persistence)
All that being said, I personally think we should always use a random filename for failed state persistence, which would allow you to continue using result_storage_key instead of refactoring - let me look into that.
Hi @cicdw - Thank you for your response.
We have already tried cache_policy and, per your suggestion, have now tried cache_key_fn as well.
We still get the same behavior described above by @majo-aqfer: the completed task majo_test is re-executed even after its first successful run. The retry blindly re-runs every task run, irrespective of each task's result.
Note: We prefer using result_storage_key
Sample code we have tested:
def static_cache_key(context, parameters):
    LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+str(context.task_run.flow_run_id)+"/"+context.task_run.name+"/"+str(context.task_run.id)+".json"
    return LOCATION

@task(log_prints=True,persist_result=True,cache_key_fn=static_cache_key)
def majo_test():
    print("this is executing and should completed successfully")
    return Completed(message="task is completed")

@task(log_prints=True,persist_result=True,cache_key_fn=static_cache_key)
def majo_2():
    if runtime.flow_run.run_count > 1:
        return Completed(message="task is completed")
    print("this is executing and should Fail")
    time.sleep(10)
    return Failed(message="task is failed")

@Hafeez-Aqfer ah sorry, the reason this is still re-executing is that your cache key is actually not static -- both task_run.id and task_run.name are generated per task function invocation. To see the behavior you're expecting you'll need to make sure the parametrized elements of your location are static across invocations within the same flow run: in your case I recommend removing task_run.id entirely and either using task_run.task.name or generating your own task run name:
def static_cache_key(context, parameters):
    LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+str(context.task_run.flow_run_id)+"/"+str(context.task_run.task.name)+".json"
    return LOCATION

@task(log_prints=True, cache_key_fn=static_cache_key)
def majo_test():
    print("this is executing and should completed successfully")
    return Completed(message="task is completed")

@cicdw We have already tried a static result path, as @majo-aqfer described in the Temporary Solution section. To confirm, I have tried it again: the failed task (majo_2) should be re-executed, but its result is cached, so the flow proceeds without running it.
This behavior prevents the failed task from running again, potentially leading to incomplete workflows, as downstream tasks may proceed without addressing the initial failure.
@cicdw This issue is badly affecting our migration to Prefect 3 in production. We are expecting a quicker response and resolution.
@Hafeez-Aqfer you should not use result_storage_key for your use case right now, as it currently stores failure states; instead, use a cache_key_fn (copying the same code from above just for reference):
def static_cache_key(context, parameters):
    LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+str(context.task_run.flow_run_id)+"/"+str(context.task_run.task.name)+".json"
    return LOCATION

@task(log_prints=True, cache_key_fn=static_cache_key)
def majo_test():
    print("this is executing and should completed successfully")
    return Completed(message="task is completed")

We have noted the enhancement request that result_storage_key should work as you expect and not store failure modes and will look into a solution for that soon but in the meantime using cache_key_fn as outlined above should unblock you.
@cicdw As mentioned earlier, we have been using cache_key_fn, but it behaves the same way.
Below is the code we've already shared. As mentioned, the result for a failed task is being cached, and the task is not re-executed upon retry: the result shows as cached, and the failed task is skipped.
Could you please check this in your environment for a quicker resolution?
from prefect import flow, task
from prefect import runtime
from prefect_aws.s3 import S3Bucket, AwsCredentials
from prefect_github.repository import GitHubRepository
from prefect.client.schemas.schedules import CronSchedule
from prefect.states import Failed, Completed
from prefect.cache_policies import TASK_SOURCE, INPUTS
import time
CID = "c008"
FLOW_NAME ="sample_v1"
SANDBOX = "c008"
CLUSTER = "aqfer-preprod-eks"
S3_BUCKET = S3Bucket.load("sample-s3", _sync=True)

def static_cache_key(context, parameters):
    LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+str(context.task_run.flow_run_id)+"/"+str(context.task_run.name)+".json"
    return LOCATION

@task(log_prints=True,persist_result=True,cache_key_fn=static_cache_key)
def majo_test():
    print("this is executing and should completed successfully")
    return Completed(message="task is completed")

@task(log_prints=True,persist_result=True,cache_key_fn=static_cache_key)
def sample_failure_v1():
    if runtime.flow_run.run_count > 1:
        print("Successful execution and should complete")
        return Completed(message="task is completed")
    print("this is executing and should Fail")
    time.sleep(10)
    return Failed(message="task is failed")

@task(log_prints=True,persist_result=True,cache_key_fn=static_cache_key)
def sample_successfull_v2(param):
    print("value : {}".format(param["par"]))
    return Completed(message="task is completed")

@flow(log_prints=True, persist_result=True, validate_parameters=False, result_storage=S3_BUCKET)
def sample_v1(prev: str = None):
    p = [{"par": "first"},{"par": "second"}]
    f = sample_successfull_v1.with_options(task_run_name="sample_successfull_v1")(return_state=True)
    print(f)
    g = sample_failure_v1.with_options(task_run_name="sample_failure_v1").submit(wait_for=[f], return_state=True)
    print(g)
    h = sample_successfull_v2.with_options(task_run_name="sample_successfull_v2-[{param[par]}]").map(param=p, wait_for=[g], return_state=True)
    print(h)
    if f.is_failed() or g.is_failed() or any(state.is_failed() for state in h):
        return Failed()
    if g.is_completed():
        return Completed()
    else:
        return Failed()

if __name__ == "__main__":
    flow.from_source(
        source= GitHubRepository.load("github-block"),
        entrypoint = "c008/migration/sample_v1.py:sample_v1"
    ).deploy(
        name = CID,
        work_pool_name = "new-prefect-worker",
        work_queue_name = CID,
        paused=True,
        schedules=[(CronSchedule(cron="*/5 * * * *"))],
        tags = [CID,SANDBOX],
        version = "1",
    )

Hi @cicdw,
Have there been any updates or findings on this issue from the Prefect team's side? We’re eager to know when we can expect it to be addressed, as it is currently blocking us from migrating our production environment from Prefect 2 to Prefect 3. Given the critical nature of this migration, we would greatly appreciate it if this could be resolved as soon as possible.
Looking forward to your response.
@majo-aqfer if you use a static cache key function (meaning it should depend on values that do not change across task invocations) you will get your expected result; here is an example of a static cache key function you can use:
def static_cache_key(context, parameters):
    LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+str(context.task_run.flow_run_id)+"/"+str(context.task_run.task.name)+".json"
    return LOCATION
Note that I am using the task name (which is static), not the task run name (which is dynamic per invocation).
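One way to check whether a given cache key function is static in this sense is to call it twice with stand-in contexts that differ only in the per-invocation fields. The sketch below is a rough illustration, not Prefect code: `fake_context` and `key_is_stable` are hypothetical helpers, and the stand-in exposes the task under `context.task` (the layout used in a variant posted later in this thread), which is an assumption about the real context object.

```python
from types import SimpleNamespace
from uuid import uuid4

CID = "c008"
FLOW_NAME = "sample_v1"


def fake_context(flow_run_id, task_name):
    """Stand-in for a task run context; the attribute layout is an assumption."""
    return SimpleNamespace(
        task=SimpleNamespace(name=task_name),  # fixed by the task definition
        task_run=SimpleNamespace(
            id=uuid4(),               # modeled as new on every invocation
            flow_run_id=flow_run_id,  # shared by all tasks in one flow run
        ),
    )


def key_is_stable(cache_key_fn, flow_run_id="flow-run-1", task_name="majo_test"):
    """Return True if two invocations in the same flow run yield the same key."""
    first = cache_key_fn(fake_context(flow_run_id, task_name), {})
    second = cache_key_fn(fake_context(flow_run_id, task_name), {})
    return first == second


def key_from_task_name(context, parameters):
    # Depends only on the flow run ID and the task name.
    return ("results-v2/" + CID + "/" + FLOW_NAME + "/"
            + str(context.task_run.flow_run_id) + "/"
            + str(context.task.name) + ".json")


def key_from_task_run_id(context, parameters):
    # Depends on the task run ID, which differs between invocations.
    return ("results-v2/" + CID + "/" + FLOW_NAME + "/"
            + str(context.task_run.flow_run_id) + "/"
            + str(context.task_run.id) + ".json")


print("task name key stable:", key_is_stable(key_from_task_name))
print("task run id key stable:", key_is_stable(key_from_task_run_id))
```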
@cicdw We have already tested this scenario as well, as mentioned several times above, even when using a static cache key. The failed task is still incorrectly marked as cached (type = complete). This behavior prevents the failed task from running again, potentially leading to incomplete workflows, as downstream tasks may proceed without addressing the initial failure.
We would like you to look at the snapshots we shared; the issue is not with the use of result_storage_key or a static cache key. If possible, please arrange a demo session to address the issue as a priority.
As the snapshot below shows, the majo_2 task failed, and on retry the task did not re-run: because its failed result was cached, execution was bypassed.
@Hafeez-Aqfer none of your shared code examples have used a static cache key which is why I keep emphasizing it.
@cicdw I think you missed the sample code we shared above, which uses **static_cache_key**; we also shared a snapshot of the task re-run showing that the result is cached without the Failed task being re-processed. @masonmenges and @mthatt have checked the issue as well.
@Hafeez-Aqfer that sample code does not have a static cache key; it is using the task_run.name field which changes per invocation as I said in my previous comment.
@cicdw We have utilized a static cache key, as noted in the earlier comments, where result.json is specified as the static cache key.
We've also attempted to use context.task_run.task.name, which has resulted in an error indicating that the key task is not present in task_run. Below is the code we used, where task.name is intended as a static value. However, the issue remains unresolved despite testing with Prefect version 3.1.1.
def static_cache_key(context, parameters):
    print(context)
    LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+str(context.task_run.flow_run_id)+"/"+str(context.task.name)+".json"
    return LOCATION
The issue persists where Failed tasks are being cached, and upon retry, these Failed tasks transition to a Cached (Completed) state without reprocessing. We have attempted to explain this issue from the beginning, and if it remains unclear, we would be happy to connect at a convenient time for further clarification.
Attached below snapshot:
Error snapshot for context.task_run.task.name
Task majo_2 is cached (Completed), whereas it should re-execute and then reach the Completed state
from prefect import flow, task
from prefect import runtime
from prefect_aws.s3 import S3Bucket, AwsCredentials
from prefect_github.repository import GitHubRepository
from prefect.client.schemas.schedules import CronSchedule
from prefect.states import Failed, Completed
from prefect.cache_policies import TASK_SOURCE, INPUTS
import time
CID = "c008"
FLOW_NAME ="sample_v1"
SANDBOX = "c008"
CLUSTER = "aqfer-preprod-eks"
S3_BUCKET = S3Bucket.load("sample-s3", _sync=True)

def static_cache_key(context, parameters):
    LOCATION = "results-v2/"+CID+"/"+FLOW_NAME+"/"+str(context.task_run.flow_run_id)+"/"+str(context.task.name)+".json"
    return LOCATION

@task(log_prints=True,persist_result=True,cache_key_fn=static_cache_key)
def majo_test():
    print("this is executing and should completed successfully")
    return Completed(message="task is completed")

@task(log_prints=True,persist_result=True,cache_key_fn=static_cache_key)
def sample_failure_v1():
    if runtime.flow_run.run_count > 1:
        print("Reran the task which is completed successfully")
        time.sleep(5)
        return Completed(message="task is completed")
    print("this is executing and should Fail")
    time.sleep(10)
    return Failed(message="task is failed")

@task(log_prints=True,persist_result=True,cache_key_fn=static_cache_key)
def sample_successfull_v2(param):
    print("value : {}".format(param["par"]))
    return Completed(message="task is completed")

@flow(log_prints=True, persist_result=True, validate_parameters=False, result_storage=S3_BUCKET)
def sample_v1(prev: str = None):
    p = [{"par": "first"},{"par": "second"}]
    f = sample_successfull_v1.with_options(task_run_name="sample_successfull_v1")(return_state=True)
    print(f)
    g = sample_failure_v1.with_options(task_run_name="sample_failure_v1").submit(wait_for=[f], return_state=True)
    print(g)
    h = sample_successfull_v2.with_options(task_run_name="sample_successfull_v2-[{param[par]}]").map(param=p, wait_for=[g], return_state=True)
    print(h)
    if f.is_failed() or g.is_failed() or any(state.is_failed() for state in h):
        return Failed()
    if g.is_completed():
        return Completed()
    else:
        return Failed()

if __name__ == "__main__":
    flow.from_source(
        source= GitHubRepository.load("github-block"),
        entrypoint = "c008/migration/sample_v1.py:sample_v1"
    ).deploy(
        name = CID,
        work_pool_name = "new-prefect-worker",
        work_queue_name = CID,
        paused=True,
        schedules=[(CronSchedule(cron="*/5 * * * *"))],
        tags = [CID,SANDBOX],
        version = "1",
    )

@Hafeez-Aqfer we are talking past each other. If you upgrade to 3.1.2, the original problem will go away regardless of the cache key or result storage key that you use.
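For anyone verifying their environment against the release named above, a minimal sketch of a version check follows. It uses only the standard library; `parse_version` and `has_fix` are hypothetical helpers, and the comparison looks only at the leading `major.minor.patch` numbers, so a pre-release such as `3.1.2rc1` would be treated the same as `3.1.2`.

```python
import re
from importlib import metadata

MIN_VERSION = (3, 1, 2)  # release named in the comment above


def parse_version(text):
    """Return the leading numeric components, e.g. '3.1.2' -> (3, 1, 2)."""
    match = re.match(r"(\d+)\.(\d+)\.(\d+)", text)
    if match is None:
        raise ValueError("unrecognized version string: " + text)
    return tuple(int(part) for part in match.groups())


def has_fix(version_text, minimum=MIN_VERSION):
    """True if the version is at least the minimum release."""
    return parse_version(version_text) >= minimum


def installed_prefect_version():
    """Version of the installed prefect package, or None if it is absent."""
    try:
        return metadata.version("prefect")
    except metadata.PackageNotFoundError:
        return None


installed = installed_prefect_version()
if installed is None:
    print("prefect is not installed in this environment")
else:
    print(installed, "includes fix:", has_fix(installed))
```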