Change retry logic for jobs to change spot instance types depending on the retry count
What is it?
Currently, if the daily jobs retry, they just keep attempting to use the same node types. However, because some of these jobs may be scheduled onto spot instances, they might end up failing repeatedly for some unknown reason. Instead, we should have retries scheduled onto progressively different instance types. I asked Dagster's GPT about it, and this is what it said might be possible:
Currently, there isn't a built-in way to dynamically update the op_tags for the k8s_job_executor based on the number of retries directly within Dagster. However, you can achieve this by implementing a custom retry mechanism using sensors or hooks to monitor the number of retries and adjust the configuration accordingly.
Here's a high-level approach to achieve this:
1. Track the number of retries: use a sensor or a hook to monitor the job runs and keep track of the number of retries.
2. Adjust configuration based on retries: when the number of retries exceeds a certain threshold (e.g., 2), modify the op_tags to allocate more resources or change other configurations.
3. Re-execute the job with updated configuration: trigger a re-execution of the job with the updated op_tags.

Below is a conceptual example using a sensor to monitor job failures and re-execute with updated op_tags:
```python
from dagster import (
    DagsterEventType,
    DagsterInstance,
    EventRecordsFilter,
    ReexecutionOptions,
    execute_job,
    job,
    op,
    reconstructable,
    sensor,
)
from dagster_k8s import k8s_job_executor


# Define your job and ops with an initial configuration
@op(
    tags={
        "dagster-k8s/config": {
            "container_config": {
                "resources": {"requests": {"cpu": "200m", "memory": "32Mi"}}
            }
        }
    }
)
def my_op(context):
    context.log.info("Running my_op")


@job(executor_def=k8s_job_executor)
def my_job():
    my_op()


# Sensor to monitor job failures and re-execute with updated configuration
@sensor(job=my_job)
def retry_sensor(context):
    instance = DagsterInstance.get()
    run_failure_records = instance.get_event_records(
        EventRecordsFilter(event_type=DagsterEventType.RUN_FAILURE),
        ascending=False,
        limit=1,
    )
    if not run_failure_records:
        return

    failed_run_id = run_failure_records[0].run_id
    failed_run = instance.get_run_by_id(failed_run_id)
    retry_count = int(failed_run.tags.get("dagster/retry_number", "0"))

    if retry_count > 2:
        # Update op_tags for more resources
        updated_op_tags = {
            "dagster-k8s/config": {
                "container_config": {
                    "resources": {
                        "requests": {"cpu": "500m", "memory": "64Mi"},
                    }
                },
            }
        }
        # Re-execute the job with updated configuration
        options = ReexecutionOptions.from_failure(failed_run_id, instance)
        execute_job(
            reconstructable(my_job),
            instance=instance,
            reexecution_options=options,
            tags={"dagster/retry_number": str(retry_count + 1), **updated_op_tags},
        )
    else:
        # Re-execute the job with the same configuration
        options = ReexecutionOptions.from_failure(failed_run_id, instance)
        execute_job(
            reconstructable(my_job),
            instance=instance,
            reexecution_options=options,
            tags={"dagster/retry_number": str(retry_count + 1)},
        )
```