Add `retry_from_failure` parameter to DbtCloudRunJobOperator
This PR adds a new `retry_from_failure` parameter to the `DbtCloudRunJobOperator` to retry a failed run of a dbt Cloud job from the point of failure. The implementation uses the new rerun endpoint in the dbt Cloud API, which itself looks up the last run for a given job and decides whether or not to start a new run.

The new endpoint is only used when `retry_from_failure` is `True` and the task's `try_number` is greater than 1. It also cannot be used in conjunction with `steps_override`, `schema_override`, or `additional_run_config`.
Closes: #35772 See also: #38001
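With this change, the operator could be configured roughly as below; the connection id, job id, and retry settings are placeholders, not part of this PR:

```python
from datetime import datetime, timedelta

from airflow import DAG
from airflow.providers.dbt.cloud.operators.dbt import DbtCloudRunJobOperator

with DAG(
    dag_id="example_dbt_cloud_retry_from_failure",
    start_date=datetime(2024, 1, 1),
    schedule=None,
    catchup=False,
):
    trigger_job_run = DbtCloudRunJobOperator(
        task_id="trigger_dbt_cloud_job",
        dbt_cloud_conn_id="dbt_cloud_default",  # placeholder connection id
        job_id=1234,  # placeholder dbt Cloud job id
        retry_from_failure=True,  # rerun a failed job from its point of failure
        retries=2,  # Airflow retries make try_number > 1, enabling the rerun path
        retry_delay=timedelta(minutes=5),
    )
```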
Congratulations on your first Pull Request and welcome to the Apache Airflow community! If you have any issues or are unsure about anything, please check our Contributors' Guide (https://github.com/apache/airflow/blob/main/contributing-docs/README.rst). Here are some useful points:
- Pay attention to the quality of your code (ruff, mypy and type annotations). Our pre-commits will help you with that.
- In case of a new feature, add useful documentation (in docstrings or in the `docs/` directory). Adding a new operator? Check this short guide and consider adding an example DAG that shows how users should use it.
- Consider using the Breeze environment for testing locally; it's a heavy Docker setup, but it ships with a working Airflow and a lot of integrations.
- Be patient and persistent. It might take some time to get a review or get the final approval from Committers.
- Please follow the ASF Code of Conduct for all communication, including (but not limited to) comments on Pull Requests, the mailing list, and Slack.
- Be sure to read the Airflow Coding style.
- Always keep your Pull Requests rebased; otherwise your build might fail due to changes not related to your commits.

Apache Airflow is a community-driven project, and together we are making it better 🚀. In case of doubts contact the developers at:
Mailing List: [email protected]
Slack: https://s.apache.org/airflow-slack
Hey @josh-fell, I've created this PR based on your comments here. I would really appreciate it if you could take a look at it!
Hi @josh-fell, did you have a chance to look at this PR? Would appreciate your comments!
The rerun endpoint does not accept a request body, which means parameters like `steps_override`, `schema_override`, `threads_override`, and `cause` cannot be passed. The current implementation always uses the rerun endpoint if `retry_from_failure` is set to `True`. To overcome this issue, the rerun endpoint can be used only if the task is retried (i.e. `ti.try_number != 1`).
Maybe we can check whether `steps_override`, `schema_override`, or `threads_override` are provided together with `retry_from_failure` and raise an error if so. Also, did you implement the `ti.try_number` part already?
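For example, something roughly like this; the helper name and its placement are open for discussion, not a final implementation:

```python
from __future__ import annotations

from airflow.exceptions import AirflowException


# Illustrative guard (hypothetical helper); the arguments mirror the
# operator's constructor parameters.
def check_retry_from_failure_conflicts(
    retry_from_failure: bool,
    steps_override: list[str] | None,
    schema_override: str | None,
    additional_run_config: dict | None,
) -> None:
    if retry_from_failure and any((steps_override, schema_override, additional_run_config)):
        raise AirflowException(
            "steps_override, schema_override and additional_run_config cannot be "
            "used together with retry_from_failure, because the dbt Cloud rerun "
            "endpoint does not accept a request body."
        )
```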
Hi @Lee-W,
I have also implemented the `try_number` part; could you please take a look at it as well?
And also, if `steps_override`, `schema_override`, or `threads_override` are provided together with `retry_from_failure`, should it be a warning or an error? Displaying a warning and discarding the values of the overrides might also be an option.
> I have also implemented the `try_number` part; could you please take a look at it as well?
Yep, just found it
> And also, if `steps_override`, `schema_override`, or `threads_override` are provided together with `retry_from_failure`, should it be a warning or an error? Displaying a warning and discarding the values of the overrides might also be an option.
I feel like an error might make more sense 🤔 I don't personally use dbt that much, but I guess `steps_override`, `schema_override`, and `threads_override` could significantly change the behavior somehow. If that's the case, it might be better if we raise an error. But please correct me if I'm wrong 🙂 Thanks!
Yes, those parameters change the behavior significantly. My only concern is with the `try_number > 1` check: these parameters can actually work in the first run, i.e. `try_number == 1`. We can either:

1. Not allow users to use `steps_override`, `schema_override`, or `additional_run_config` when `retry_from_failure` is set to `True` (raise an error).
2. Keep it as it is and only show a warning when `try_number > 1`. In this case, users will be able to use those overrides in the first run, and the `rerun` would then do the same on the dbt Cloud side by just rerunning the previous run, as explained in the docs.

For me, it feels like the second approach is more suitable, as we do not limit the users, but it all depends on the `try_number` and can make it more complicated to understand.

Let me know what you think :)
@boraberke Just want to confirm the second point you mentioned. Because the first run already uses `steps_override`, `schema_override`, and `additional_run_config`, when we rerun, it'll use the configuration from the last run, which contains `steps_override`, `schema_override`, and `additional_run_config`. If that's the case, I would say method 2 is better.
@Lee-W, I will double-check whether rerun uses the overrides, i.e. runs exactly the way the first job ran, and then add the warning or error accordingly!
Thanks for your comments!
> @Lee-W, I will double-check whether rerun uses the overrides, i.e. runs exactly the way the first job ran, and then add the warning or error accordingly! Thanks for your comments!
Many thanks! It's not urgent. Just change the state of this PR so that everyone has a better understanding of its status 🙂
@Lee-W,
I have tested it, and here is how it works:

When the first run already uses `steps_override`, `schema_override`, and `additional_run_config` and that run fails, rerunning it will use the same config, including `steps_override`, `schema_override`, and `additional_run_config`.

However, if the first run with `steps_override`, `schema_override`, and `additional_run_config` finishes successfully, rerunning it does not include any of those. Thus, it does not reuse the config of the previous run.

This means that in any case where the operator had failed but the dbt job was successful, when `try_number > 1`, it would run without the overrides.

I think it is best to raise an error if any of `steps_override`, `schema_override`, or `additional_run_config` is provided when `retry_from_failure` is `True`. I have updated this PR accordingly.
Let me know what you think!
> I think it is best to raise an error if any of `steps_override`, `schema_override`, or `additional_run_config` is provided when `retry_from_failure` is `True`. I have updated this PR accordingly.
Indeed, I think this is what we should do. Thanks for the testing and update!
@Lee-W, thanks for the review! Fixed upon your latest comments as well :)
Hi @josh-fell, thank you for your feedback!
> I'm trying to understand what happens if a user sets `retry_from_failure=True` on the operator, provides either `steps_override`, `schema_override`, or `additional_run_config` initially, and the task is naturally retried in Airflow. It seems like, with the most recent changes, the task would fail because those args were supplied originally once `retry_from_failure()` is called in the DbtCloudHook. Can you clarify that for me?
Yes, that is correct.
> Maybe to alleviate both scenarios, when `retry_from_failure=True`, the `trigger_job_run()` method actually retrieves the job's status from dbt Cloud and assesses whether or not to call the retry endpoint based on success/failure? This would completely remove using Airflow internals to control how the job triggering behaves.
I agree that an additional check of the previously run jobs, deciding based on the state of the latest job, would make it better.

In that scenario, there will be 3 cases:

- There are no previous dbt runs: use `run`
- The previous dbt run failed: use `rerun`
- The previous dbt run was successful: use `run`

I will also change the error to a warning, and whenever `rerun` is being used, none of the overrides will be taken into account. In all other cases, the overrides will work as expected.
I will make the necessary changes and let you know!
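As a rough sketch of the decision logic; the status values and the function are illustrative, not the actual hook API:

```python
from __future__ import annotations

import warnings


def choose_endpoint(
    retry_from_failure: bool,
    latest_run_status: str | None,  # None when the job has never run
    has_overrides: bool,
) -> str:
    """Decide which dbt Cloud endpoint to call (illustrative sketch)."""
    if retry_from_failure and latest_run_status == "failed":
        if has_overrides:
            warnings.warn(
                "steps_override, schema_override and additional_run_config are "
                "ignored when rerunning a failed dbt Cloud job from its point of failure."
            )
        return "rerun"  # e.g. POST /jobs/{id}/rerun/
    # No previous run, or the previous run succeeded: start a fresh run.
    return "run"  # e.g. POST /jobs/{id}/run/
```

So `choose_endpoint(True, "failed", True)` would warn and return `"rerun"`, while `choose_endpoint(True, "success", False)` and `choose_endpoint(True, None, False)` return `"run"`.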
Hi, thanks for implementing this new feature. Do you have an estimate on when this will be released?
> Hi, thanks for implementing this new feature. Do you have an estimate on when this will be released?
Hi @hec10r, it has been released recently. Check version 3.9.0 of the dbt Cloud provider.