Enable Inference Execution / Synchronous DAG Execution
This was previously discussed on the dev mailing list here and is part of the proposal in this doc.
User Story
As a developer building interactive, data-driven applications,
I need to trigger an Airflow DAG and receive a result value or failure status synchronously,
so I can embed Airflow DAGs in API endpoints, chat agents, or inference services without polling or custom glue code.
Background
Airflow 3.0.0 shipped support for concurrent non-data-interval DAG runs ((1) from the Synchronous DAG Execution proposal), enabling multiple users to trigger the same DAG independently with `logical_date=None`.
This unlocks foundational support for request/response workloads.
To make this pattern fully usable, we now need:
- (2) A way to return a result value from a DAG (via a specific task).
- (3) A way to propagate failure status early (before full DAG teardown completes).
Both are natural extensions of current capabilities and align with the proposal.
Goals
- Allow DAG authors to mark a single task as the “result task”; its output is returned.
- Provide a blocking API to wait for that result or failure.
- Surface failure status early if a critical task fails, without waiting for DAG teardown.
Design Questions
- How to choose the result task? The DAG author must mark one task; should this be enforced at parse time, or passed via the API?
- Should this be API-only or include DSL changes? For simplicity, we could expose only an API where the result task is inferred (e.g. the final successful task or a named convention). But a DSL affordance (`@result_task`, `result.task`, or a DAG-level argument) improves clarity, type-checkability, and UI introspection.
- Return the full result or a reference? Probably only whatever is in the DB table, to avoid deserialisation and memory-size risk in the API server.
- UI changes? Optional: tag the result task in the Graph view; expose a link to the result.
Milestones
Phase 1 is the main piece; Phase 2 and Phase 3 are optional, and we should figure out whether we need them.
Phase 1 (Airflow 3.1.0): API support for synchronous result retrieval
API options:
- Add a `wait_for_completion=true` query param to the existing trigger endpoint: `POST /api/v2/dags/{dag_id}/dagRuns?wait_for_completion=true`
- Or introduce a separate endpoint to fetch results: `GET /api/v2/dags/{dag_id}/dagRuns/{run_id}/result?timeout=5s`
Tradeoffs:
- Single endpoint (`POST` with blocking) is simpler for clients.
- Split endpoints (`POST` + `GET`) are more RESTful and better for long polling/retries.
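From the client's perspective, the split-endpoint design reduces to a poll-until-terminal loop. The sketch below shows that loop in isolation; `fetch_result` stands in for the real HTTP `GET` call, and the `{"state": ...}` payload shape is an assumption, since the endpoint does not exist yet.

```python
# Illustrative client-side wait loop for the split-endpoint design.
# fetch_result() stands in for e.g. GET /dags/{dag_id}/dagRuns/{run_id}/result
# and is assumed to return a dict with a "state" key.
import time


def wait_for_result(fetch_result, overall_timeout=60.0, poll_interval=2.0):
    """Poll fetch_result() until it reports a terminal state or we time out."""
    deadline = time.monotonic() + overall_timeout
    while time.monotonic() < deadline:
        resp = fetch_result()
        if resp["state"] in ("success", "failed"):
            return resp
        time.sleep(poll_interval)
    raise TimeoutError("DAG run did not finish in time")
```

The single-endpoint (blocking `POST`) design would move this loop server-side, which is exactly the simplicity-for-clients tradeoff noted above.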
Phase 2 (Airflow 3.2.0 or later): DSL for defining result task (optional)
Options for specifying the result task:
- Decorator or helper: `@result_task` or `@task.result`
- DAG-level method: `dag.result_from("fetch_answer")`
- DAG-level argument: `dag(result_task="fetch_answer")`
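As a toy illustration of the DAG-level-argument option, the stub below wires a `result_task` name to the return value of the matching task. The `Dag` class and `result_task` argument are hypothetical mockups of the proposed DSL, not Airflow code.

```python
# Toy stand-in for the proposed dag(result_task=...) DSL; entirely hypothetical.
class Dag:
    def __init__(self, dag_id, result_task=None):
        self.dag_id = dag_id
        self.result_task = result_task  # proposed DAG-level argument
        self.tasks = {}

    def task(self, fn):
        self.tasks[fn.__name__] = fn
        return fn

    def run_and_fetch_result(self):
        # Run tasks in registration order; return the marked task's return
        # value, mimicking "the result comes from one specific task's output".
        result = None
        for name, fn in self.tasks.items():
            value = fn()
            if name == self.result_task:
                result = value
        return result


dag = Dag("qa_inference", result_task="fetch_answer")


@dag.task
def fetch_answer():
    return {"answer": 42}


print(dag.run_and_fetch_result())  # {'answer': 42}
```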
Phase 3 (Airflow 3.2.0 or later): Early failure propagation (optional)
Return to the invoking API:
- Success: as soon as the result task finishes, without waiting for teardown (if any).
- Failure: as soon as a critical task fails (including `upstream_failed`), without waiting for full DAG completion.
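The early-return rule above can be stated as a small pure function over task states: fail as soon as any critical task is `failed` or `upstream_failed`, succeed as soon as the result task succeeds, otherwise keep waiting. The state strings match Airflow task states; the function itself is an illustrative sketch, not part of any API.

```python
# Sketch of the Phase 3 early-return decision; illustrative only.
def early_terminal_state(task_states, result_task_id, critical_task_ids):
    """Return 'success'/'failed' once the run's outcome is decided, else None."""
    # Failure wins first: a critical failure dooms the result task anyway.
    for task_id in critical_task_ids:
        if task_states.get(task_id) in ("failed", "upstream_failed"):
            return "failed"
    # Success as soon as the result task finishes, ignoring any teardown tasks.
    if task_states.get(result_task_id) == "success":
        return "success"
    return None  # still undecided; keep the caller blocked
```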
I wonder if the sync API should just return once execution finishes (simple), or use techniques such as long polling or WebSockets (flexible, but may need additional setup on the client side). Either should be pretty easy to implement in FastAPI.
wdyt #51920
@uranusjr Yeah, fine with that approach to get it out sooner to get user feedback. Might be worth looking at making it an "experimental" endpoint, wdyt?
Sounds good to me. How do we mark an endpoint as experimental? Just mention it in the spec?
Yeah, maybe in summary and description.
```python
@dag_run_router.get(
    "/watch",
    tags=["experimental"],
    summary="Experimental: Trigger & Watch DAG until completion",
    description="🚧 This is an experimental endpoint and may change or be removed without notice.",
)
def dag_run_watch():
    ...
```
or a separate prefix entirely (more involved, for sure):

```python
@beta_router.get("/beta/dagRun/watch")
def dag_run_watch():
    ...
```
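Either router variant would need the "block until completion, then return" behaviour somewhere. A minimal loop-agnostic sketch, using only `asyncio` (the handler names and payload shape are assumptions): await a completion event with a timeout, and report a timeout state so a long-polling client can simply call again.

```python
# Minimal asyncio sketch of a blocking watch with a long-poll-friendly timeout.
# The event/payload wiring is hypothetical; a FastAPI handler like the
# /watch snippet above could await this.
import asyncio


async def watch_dag_run(done_event, timeout=5.0):
    """Block up to `timeout` seconds for completion; report 'timeout' otherwise."""
    try:
        await asyncio.wait_for(done_event.wait(), timeout)
        return {"state": "finished"}
    except asyncio.TimeoutError:
        return {"state": "timeout"}  # client long-polls again
```

Short timeouts with client retries avoid holding server connections open indefinitely, at the cost of the extra client-side loop mentioned above.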
Still more to come in Phase 2+ but we’re set for 3.1.
Waiting on https://github.com/apache/airflow/issues/52141 to be completed
@uranusjr is planning to start on this after completing the SDK work on DeadlineAlerts.