Enable Inference Execution / Synchronous DAG Execution

Open kaxil opened this issue 6 months ago • 8 comments

Body

This was previously discussed on the dev mailing list here and is part of the proposal in this doc.

User Story

As a developer building interactive, data-driven applications,
I need to trigger an Airflow DAG and receive a result value or failure status synchronously,
so I can embed Airflow DAGs in API endpoints, chat agents, or inference services without polling or custom glue code.

Background

Airflow 3.0.0 shipped support for concurrent non–data-interval DAG runs ((1) from the Synchronous DAG Execution proposal), enabling multiple users to trigger the same DAG independently with logical_date=None.

This unlocks foundational support for request/response workloads.
To make this pattern fully usable, we now need:

  • (2) A way to return a result value from a DAG (via a specific task).
  • (3) A way to propagate failure status early (before full DAG teardown completes).

Both are natural extensions of current capabilities and align with the proposal.

Goals

  • Allow DAG authors to mark a single task as the “result task”; its output is returned.
  • Provide a blocking API to wait for that result or failure.
  • Surface failure status early if a critical task fails, without waiting for DAG teardown.

Design Questions

  1. How to choose the result task?
    The DAG author must mark one task. Should this be enforced at compile time, or should the API allow passing it?

  2. Should this be API-only or include DSL changes?
    For simplicity, we could expose only an API where the result task is inferred (e.g. final successful task or a named convention). But a DSL affordance (@result_task or result.task or a DAG-level argument) improves clarity, type-checkability, and UI introspection.

  3. Return full result vs. reference?
    Probably only whatever is in the DB table, to avoid deserialisation and memory-size risk in the API server.

  4. UI changes?
    Optional: tag result task in Graph view; expose link to result.

Milestones

Phase 1 is the main piece. Phase 2 and Phase 3 are both optional, and we should figure out whether we need them.

Phase 1 (Airflow 3.1.0): API support for synchronous result retrieval

API options:

  • Add wait_for_completion=true query param to the existing trigger endpoint:
    POST /api/v2/dags/{dag_id}/dagRuns?wait_for_completion=true
  • Or introduce a separate endpoint to fetch results:
    GET /api/v2/dags/{dag_id}/dagRuns/{run_id}/result?timeout=5s

Tradeoffs:

  • Single-endpoint (POST with blocking) is simpler for clients
  • Split-endpoint (POST + GET) is more RESTful and better for long polling/retries
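To make the split-endpoint tradeoff concrete, here is a minimal client-side sketch of the retry loop it implies: trigger with POST, then call the result endpoint repeatedly until the run is terminal or the client gives up. The payload shape (a dict with a "state" key) and the helper name wait_for_result are assumptions for illustration only; nothing here is an agreed response format. The HTTP call is passed in as a callable so the loop itself stays independent of any client library.

```python
import time
from typing import Callable, Optional

# DAG run states treated as final; "queued" and "running" are not.
TERMINAL_STATES = {"success", "failed"}


def wait_for_result(
    fetch_result: Callable[[], dict],
    timeout: float = 5.0,
    poll_interval: float = 0.5,
    sleep: Callable[[float], None] = time.sleep,
    clock: Callable[[], float] = time.monotonic,
) -> Optional[dict]:
    """Call fetch_result until it reports a terminal state or timeout elapses.

    fetch_result is a hypothetical wrapper around
    GET /api/v2/dags/{dag_id}/dagRuns/{run_id}/result and is assumed to
    return a dict containing a "state" key.
    Returns the terminal payload, or None if the timeout was reached first.
    """
    deadline = clock() + timeout
    while True:
        payload = fetch_result()
        if payload.get("state") in TERMINAL_STATES:
            return payload
        if clock() >= deadline:
            return None
        sleep(poll_interval)
```

With the single-endpoint option this loop would move to the server side, and the client would make one blocking POST instead.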

Phase 2 (Airflow 3.2.0 or later): DSL for defining result task (optional)

Options for specifying the result task:

  • Decorator or helper:
    @result_task or @task.result
  • DAG-level method:
    dag.result_from("fetch_answer")
  • DAG-level argument:
    dag(result_task="fetch_answer")
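As a rough illustration of the DAG-level argument option, the toy model below shows the kind of definition-time check it would allow: the named result task has to exist in the DAG. ToyDag, add_task, and validate are hypothetical stand-ins written for this sketch; they are not Airflow classes or methods, and the real design may look quite different.

```python
from dataclasses import dataclass, field
from typing import Optional, Set


@dataclass
class ToyDag:
    """Stand-in for a DAG definition. NOT Airflow's DAG class."""

    dag_id: str
    result_task: Optional[str] = None  # hypothetical DAG-level argument
    task_ids: Set[str] = field(default_factory=set)

    def add_task(self, task_id: str) -> None:
        """Register a task id with this toy DAG."""
        self.task_ids.add(task_id)

    def validate(self) -> None:
        """Raise if result_task names a task that is not in the DAG."""
        if self.result_task is not None and self.result_task not in self.task_ids:
            raise ValueError(
                f"result_task {self.result_task!r} is not a task in DAG {self.dag_id!r}"
            )


dag = ToyDag("qa_pipeline", result_task="fetch_answer")
dag.add_task("load_context")
dag.add_task("fetch_answer")
dag.validate()  # passes: the result task exists
```

A decorator-based option would give the same guarantee without a string lookup, at the cost of a larger DSL surface.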

Phase 3 (Airflow 3.2.0 or later): Early failure propagation (optional)

Return to the invoking API:

  • Success: As soon as the result task finishes, without waiting for teardown (if any).
  • Failure: As soon as a critical task fails (including upstream_failed), without waiting for full DAG completion.
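The two rules above could be evaluated against a snapshot of task states roughly as follows. This is a sketch under assumptions: the function name early_outcome, the idea of passing an explicit set of critical task ids, and treating the result task itself as critical are all choices made for the example, since the issue does not yet define what counts as a critical task.

```python
from typing import Dict, Optional, Set

# Task states that count as a failure for early propagation.
FAILED_STATES = {"failed", "upstream_failed"}


def early_outcome(
    task_states: Dict[str, str],
    result_task: str,
    critical_tasks: Set[str],
) -> Optional[str]:
    """Return "failed" or "success" as soon as it is decidable, else None.

    task_states maps task_id to its current state. Tasks that have not
    started yet may simply be missing from the mapping.
    """
    # Assumption: a failed result task is always a failure of the request.
    watched = critical_tasks | {result_task}
    if any(task_states.get(task_id) in FAILED_STATES for task_id in watched):
        return "failed"
    if task_states.get(result_task) == "success":
        # Teardown tasks may still be running; the caller does not wait for them.
        return "success"
    return None
```

For example, a snapshot where the result task has succeeded but a teardown task is still running resolves to "success", while a snapshot with an upstream_failed critical task resolves to "failed" even if other tasks are still queued.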

kaxil avatar Jun 13 '25 21:06 kaxil

I wonder if the sync APIs should just return once after execution finishes (simple), or use techniques such as long polling or WebSocket (flexible but may need additional setup on the client side). Either should be pretty easy to implement in FastAPI.

uranusjr avatar Jun 18 '25 07:06 uranusjr

wdyt #51920

uranusjr avatar Jun 19 '25 08:06 uranusjr

@uranusjr Yeah, fine with that approach to get it out sooner to get user feedback. Might be worth looking at making it an "experimental" endpoint, wdyt?

kaxil avatar Jun 23 '25 12:06 kaxil

Sounds good to me. How do we mark an endpoint as experimental? Just mention it in the spec?

uranusjr avatar Jun 24 '25 05:06 uranusjr

Yeah, maybe in summary and description.

@dag_run_router.get(
    "/watch",
    tags=["experimental"],
    summary="Experimental: Trigger & Watch DAG until completion",
    description="🚧 This is an experimental endpoint and may change or be removed without notice."
)
def dag_run_watch():
    ...

or

a separate prefix completely (more involved for sure)

@beta_router.get("/beta/dagRun/watch")
def dag_run_watch():
    ...

kaxil avatar Jun 24 '25 08:06 kaxil

Still more to come in Phase 2+ but we’re set for 3.1.

uranusjr avatar Jul 09 '25 06:07 uranusjr

Waiting on https://github.com/apache/airflow/issues/52141 to be completed

phanikumv avatar Nov 03 '25 07:11 phanikumv

@uranusjr is planning to start on this after completing the SDK work on DeadlineAlerts.

phanikumv avatar Dec 10 '25 06:12 phanikumv