airflow
airflow copied to clipboard
feat: Add DbtCloudRetryJobOperator to retry failed dbt job
This PR adds a new operator to the dbt provider DbtCloudRetryJobOperator. This operator calls DBT cloud API for retrying a job from point of failure.
Note after making the retrying call, the behavior of this operator is very similar to the DbtCloudRunJobOperator to poll the job status, hence it references the same operator link DbtCloudRunJobOperatorLink and the openlineage code is also the same.
Also makes a minor fix for DbtCloudRunJobOperator and related docs.
Tested locally with example DBT dag
closes: https://github.com/apache/airflow/issues/35772
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.
hey @josh-fell do you still plan to review this PR? If not, can you assign some other reviewers? Thanks.
cc @Taragolis @hussein-awala @eladkal anyone can help review? Thanks!
Why not create a flag on the DbtCloudRunJobOperator to retry from failure rather than create a new operator that reuses a lot of the same code?
@josh-fell thanks. I was actually going back and forth between these approaches. I thought separating is cleaner because the operator only does one thing, and the user doesn't need to think through both "retry" from airflow task perspective and "retry" by calling that DBT API to retry from point of failure.
Let me think more about this.
@josh-fell As I think about adding a flag for retry from failure. I can't wrap my head around the logic for retry in a deferrable operator. Mind share your thoughts on how this could work if the operator is deferrable?
@josh-fell As I think about adding a flag for retry from failure. I can't wrap my head around the logic for retry in a deferrable operator. Mind share your thoughts on how this could work if the operator is deferrable?
@andyguwc Thanks for your patience here with my responses!
How I think about this feature, from a UX perspective, is really handling which endpoint is used in the trigger_job_run method of the DbtCloudHook based on a DbtCloudRunJobOperator parameter. The key is the behavior change of triggering the job rather than handling what to do based on its run status which is what the deferrable functionality checks. Also, let Airflow continue with its retry functionality; nothing new needed in the provider I think.
From the documentation of this new endpoint, it seems as though it could be used for retries as well as regular job execution.
Use this endpoint to retry a failed run for a job from the point of failure, if the run failed. Otherwise trigger a new run. When this endpoint returns a successful response, a new run will be enqueued for the account.
What this suggests is the dbt API handling the lookup of the last run for a given job and choosing whether or not to start a new instance of the job or not, which is great! So, I would think the implementation in Airflow would be adding a retry_from_failure parameter (or similarly named as you wish of course) to the DbtRunJobOperator, proprogate that value down to the trigger_job_run method in the hook, and then choose which endpoint to use based on that value. This way users simply just need to set a value in an existing operator and that's it.
I could be misinterpreting the robustness of this new endpoint. If I am, the checking of the last job run status could be implemented in the hook and then carry on with the endpoint decision.
I hope that helps!
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 5 days if no further activity occurs. Thank you for your contributions.