
`dag.test(use_executor=True)` doesn't provide the `/execution` API, but all executors rely on it

Open multimeric opened this issue 3 weeks ago • 13 comments

Apache Airflow version

main (development)

If "Other Airflow 2/3 version" selected, which one?

No response

What happened?

All executors, even ones as simple as the LocalExecutor, send API requests to /execution. I'm not exactly sure which component typically provides this endpoint, maybe the API server. However, dag.test(use_executor=True) doesn't run that component, so you end up with a lot of '[Errno 61] Connection refused' errors from trying to call <Request('PATCH', 'http://localhost:8789/execution/task-instances/019aebfb-c04b-77bd-bf3c-4feb705d81ea/run')>.

As a result, I'm not able to run dag.test(use_executor=True). I've had no luck starting an API server separately either. More specifically, if you start the API server before you run dag.test(), the above error goes away, but you instead get the following:

[2025-12-05T01:03:18.586481Z] {taskinstance.py:1543} ERROR - Executor LocalExecutor(parallelism=4) reported that the task instance <TaskInstance: example.extract manual__2025-12-05T01:03:08.304291+00:00 [queued]> finished with state failed, but the task instance's state attribute is queued. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally

I think running a real API server for a test is overkill anyway.

What do you think should happen instead?

No response

How to reproduce

Try to run any workflow with dag.test(use_executor=True)
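For reference, a minimal sketch of such a workflow (DAG and task names are made up; the imports assume the Airflow 3 task SDK, on Airflow 2 they would come from airflow / airflow.decorators instead):

```python
import datetime

# Assumption: Airflow 3 task SDK imports; on Airflow 2 use
# `from airflow import DAG` and `from airflow.decorators import task`.
from airflow.sdk import DAG, task

with DAG(dag_id="example", start_date=datetime.datetime(2025, 1, 1), schedule=None) as dag:

    @task
    def extract():
        return 42

    extract()

if __name__ == "__main__":
    # Without anything serving the /execution API, the supervisor's
    # PATCH .../execution/task-instances/<id>/run call fails with
    # "[Errno 61] Connection refused".
    dag.test(use_executor=True)
```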

Operating System

macOS 15.6

Versions of Apache Airflow Providers

No response

Deployment

Virtualenv installation

Deployment details

No response

Anything else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

multimeric avatar Dec 05 '25 01:12 multimeric

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot] avatar Dec 05 '25 01:12 boring-cyborg[bot]

I think (@amoghrajesh @ashb ?) that's part of the task-isolation work. I am not even sure we want to test DAGs with use_executor, because that would indeed mean some test api_server running locally and having the supervisor talk to it. Maybe it could be an in-memory API with no real HTTP server?
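For illustration only, the "in-memory API" idea could mean routing the supervisor's HTTP client through an in-process ASGI transport instead of a socket; the endpoint below is hypothetical and only stands in for the real /execution routes:

```python
# Hypothetical sketch of "in-memory API, no real HTTP server": the app is served
# through httpx's in-process ASGI transport, so no port is ever opened.
import asyncio

import httpx
from fastapi import FastAPI

app = FastAPI()


@app.patch("/execution/task-instances/{ti_id}/run")  # made-up stand-in endpoint
async def run_ti(ti_id: str) -> dict:
    return {"ti_id": ti_id, "state": "running"}


async def main() -> None:
    transport = httpx.ASGITransport(app=app)  # in-memory, no listening socket
    async with httpx.AsyncClient(transport=transport, base_url="http://in-memory") as client:
        resp = await client.patch("/execution/task-instances/abc/run")
        print(resp.status_code, resp.json())


asyncio.run(main())
```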

potiuk avatar Dec 07 '25 16:12 potiuk

I think the main use case here is for running system tests with custom executors like here: https://github.com/apache/airflow/pull/48699

It is the best way to test a combination in its entirety, and that's probably why we have it. Maybe a warning or a pre-check that the API server is running would be good.

amoghrajesh avatar Dec 08 '25 07:12 amoghrajesh

The reason I want to use an executor in my tests is that I rely on the task retry mechanism, which I believe is implemented by the scheduler, so dag.test(use_executor=False) won't correctly perform retries.
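For context, the retry behaviour in question is just the standard task-level retry settings, roughly like this (illustrative task, Airflow 3 task SDK import assumed):

```python
import datetime

from airflow.sdk import task  # assumption: Airflow 3 task SDK; adjust for Airflow 2

# Illustrative flaky task: with use_executor=False, the scheduler-driven retry
# handling is what would not kick in.
@task(retries=2, retry_delay=datetime.timedelta(seconds=10))
def flaky_extract():
    raise RuntimeError("transient failure that should be retried")
```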

multimeric avatar Dec 08 '25 07:12 multimeric

The reason I want to use an executor in my tests is that I rely on the task retry mechanism, which I believe is implemented by the scheduler, so dag.test(use_executor=False) won't correctly perform retries.

Maybe a good idea to implement retries with use_executor = False ?

potiuk avatar Dec 08 '25 23:12 potiuk

And then probably we can remove "use_executor" entirely?

potiuk avatar Dec 08 '25 23:12 potiuk

I'm experiencing the same issue, as I also heavily rely on dag.test(use_executor=True) (via Airflow System Tests).

I'm using the Airflow Pytest Plugin (which invokes dag.test(use_executor=True)) on pipelines testing the custom executor implemented in the Nomad Airflow Provider project. There is no integration with breeze, just plain pytest calls. I run the tests against an Airflow API server started up "manually" before the test run. (*)

Investigating the issue lately, my impression was that there is a race condition across the following events:

  1. The LocalExecutor executes workloads via the supervisor.
  2. At the end of task execution, the supervisor sends a call to the Airflow API /execution/task-instances/ endpoint; this call is responsible for updating the database with the final state.
  3. The supervisor then returns the result to the Executor, which registers it in its Event Buffer.
  4. NOTE: steps 1)-3) are part of the Executor's heartbeat()/sync() calls.
  5. In the particularly intense dag.test() polling workflow, SchedulerJobRunner.process_executor_events() is invoked right afterwards, rapidly processing the contents of the Executor's Event Buffer.
  6. However, at this point the API call from step 2) has not yet finished, so the supervisor's attempt to record the state in the database is not yet done.

This results in an inconsistent state between the Executor's Event Buffer and the database, reflected by errors such as:

Executor (dag.test(use_executor=True)) side:

 2025-12-09 15:05:04 [info     ] Task finished                  [supervisor] duration=10.164723910973407 exit_code=0 final_state=success
[2025-12-09T15:05:05.738+0100] {scheduler_job_runner.py:835} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='test-nomad-job-operator-localexecutor', task_id='nomad_job_localexecutor', run_id='manual__2025-12-09T14:04:54.434758+00:00', try_number=1, map_index=-1)
[2025-12-09T15:05:05.746+0100] {scheduler_job_runner.py:879} INFO - TaskInstance Finished: dag_id=test-nomad-job-operator-localexecutor, task_id=nomad_job_localexecutor, run_id=manual__2025-12-09T14:04:54.434758+00:00, map_index=-1, run_start_date=2025-12-09 14:04:54.605744+00:00, run_end_date=None, run_duration=None, state=running, executor=LocalExecutor(parallelism=32), executor_state=success, try_number=1, max_tries=0, pool=default_pool, queue=default, priority_weight=1, operator=NomadJobOperator, queued_dttm=None, scheduled_dttm=2025-12-09 14:04:54.449045+00:00,queued_by_job_id=None, pid=252308
[2025-12-09T15:05:05.753+0100] {scheduler_job_runner.py:952} ERROR - DAG 'test-nomad-job-operator-localexecutor' for task instance <TaskInstance: test-nomad-job-operator-localexecutor.nomad_job_localexecutor manual__2025-12-09T14:04:54.434758+00:00 [running]> not found in serialized_dag table
[2025-12-09T15:05:05.753+0100] {taskinstance.py:1888} ERROR - Executor LocalExecutor(parallelism=32) reported that the task instance <TaskInstance: test-nomad-job-operator-localexecutor.nomad_job_localexecutor manual__2025-12-09T14:04:54.434758+00:00 [running]> finished with state success, but the task instance's state attribute is running. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
[2025-12-09T15:05:05.758+0100] {taskinstance.py:2011} INFO - Marking task as FAILED. dag_id=test-nomad-job-operator-localexecutor, task_id=nomad_job_localexecutor, run_id=manual__2025-12-09T14:04:54.434758+00:00, logical_date=20251209T140454, start_date=20251209T140454, end_date=20251209T140505      

API server side (all good):

INFO:     127.0.0.1:54378 - "PUT /execution/task-instances/019b036d-ed2d-7948-a320-7ff515217e41/heartbeat HTTP/1.1" 204 No Content
INFO:     127.0.0.1:54378 - "POST /execution/xcoms/test-nomad-job-operator-localexecutor/manual__2025-12-09T14%3A04%3A54.434758%2B00%3A00/nomad_job_localexecutor/return_value HTTP/1.1" 201 Created
2025-12-09 15:05:04 [debug    ] Updating task instance state   new_state=success ti_id=019b036d-ed2d-7948-a320-7ff515217e41
2025-12-09 15:05:04 [debug    ] Retrieved current task instance state max_tries=0 previous_state=running ti_id=019b036d-ed2d-7948-a320-7ff515217e41 try_number=1
2025-12-09 15:05:04 [info     ] Task instance state updated    new_state=success rows_affected=1 ti_id=019b036d-ed2d-7948-a320-7ff515217e41
INFO:     127.0.0.1:54378 - "PATCH /execution/task-instances/019b036d-ed2d-7948-a320-7ff515217e41/state HTTP/1.1" 204 No Content

The errors above were from Airflow 3.0.6, but I've been observing similar behavior on 3.1.3.
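To make the failure mode concrete, the check that produces the error above roughly amounts to the comparison below (heavily simplified and hypothetical, not the actual SchedulerJobRunner code):

```python
# Hypothetical, simplified sketch of the consistency check behind
# "finished with state success, but the task instance's state attribute is running".
RUNNING_STATES = {"queued", "running"}
TERMINAL_STATES = {"success", "failed"}


def process_executor_event(ti_state_in_db: str, executor_reported_state: str) -> str:
    if executor_reported_state in TERMINAL_STATES and ti_state_in_db in RUNNING_STATES:
        # The executor says the task finished, but the database (or the cached
        # session view of it) still shows the task as queued/running, so the
        # scheduler assumes the state was changed externally and fails the task.
        return "failed"
    return ti_state_in_db
```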

Of course I may be mistaken. I'd be glad to get guidance on any part of the workflow I may have misunderstood.

I'm happy to offer a PR or to contribute to ongoing efforts.

(*) NOTE: Programming errors on the LocalExecutor pipeline have recently been identified; a fix is coming very soon.

juditnovak avatar Dec 09 '25 14:12 juditnovak

Nice deep dive. Sounds plausible. A proof of concept quick fix/PR would be great :)

potiuk avatar Dec 09 '25 14:12 potiuk

My analysis was only almost correct 😅 The cause was very different, though the symptoms are highly similar.

It was not a slow DB update racing against rapid Event Buffer processing. Instead, it was stale cached information on the dag's DB session object.
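In generic terms (plain SQLAlchemy, not Airflow code), the effect is the identity-map caching shown below: an attribute on an object already loaded into a session keeps its cached value until the session is expired or refreshed, even if another connection has committed a newer state in the meantime.

```python
# Generic SQLAlchemy illustration of the staleness effect; the TI model here is a
# toy stand-in, not Airflow's TaskInstance.
from sqlalchemy import create_engine, update
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class TI(Base):
    __tablename__ = "ti"
    id: Mapped[int] = mapped_column(primary_key=True)
    state: Mapped[str]


engine = create_engine("sqlite:///stale_demo.db")
Base.metadata.create_all(engine)

with Session(engine) as setup:
    setup.merge(TI(id=1, state="running"))
    setup.commit()

with Session(engine) as session:
    ti = session.get(TI, 1)
    print(ti.state)  # "running", now cached in the session's identity map

    # Another component (think: the API server handling the PATCH .../state call)
    # commits the terminal state on a different connection.
    with engine.begin() as conn:
        conn.execute(update(TI).where(TI.id == 1).values(state="success"))

    print(ti.state)      # still "running": the cached attribute value is returned
    session.refresh(ti)  # re-SELECT; session.expire_all() would also work
    print(ti.state)      # "success"
```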

The issue is reproduced with extended logging: https://github.com/juditnovak/airflow/pull/2

The proposed fix is demonstrated in a separate (so far only illustrative) draft: https://github.com/juditnovak/airflow/pull/1.

juditnovak avatar Dec 10 '25 03:12 juditnovak

If we are to keep the use_executor flag, then what are your thoughts on handling the API server? Should it start automatically with dag.test(), or should the user be required to start one themselves?

multimeric avatar Dec 10 '25 03:12 multimeric

I'm personally in favor of starting it myself (a sketch of what that can look like in a pytest setup follows the list below).

  1. While it may add a small overhead, it also leaves more flexibility on the developer's side
    • custom config, etc.
  2. It reduces extra complexity in dag.test()
    • potential future bugs, etc.
  3. It reduces further divergence in the dag.test() logic depending on whether an executor is used or not
  4. Currently dag.test() maps very well to the main scheduler execution loop (airflow.jobs.scheduler_job_runner.SchedulerJobRunner._run_scheduler_loop()). This has multiple advantages, and I personally believe it's worth keeping as an objective:
    • it ensures that the testing workflow is as similar to the real execution as possible
      • reducing/avoiding false positives, inconsistency, etc.
    • it makes debugging easy (real execution vs. test pipelines)
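As a sketch of "starting it myself", the setup can be as small as a pytest fixture that spawns the API server and waits for its port (the command, port, and timings below are assumptions for illustration, not a blessed setup; the execution API base URL in the Airflow config also needs to point at this address):

```python
# Sketch of starting the API server yourself around dag.test(use_executor=True).
# Assumptions: Airflow 3's `airflow api-server` CLI and port 8080.
import socket
import subprocess
import time

import pytest


def _wait_for_port(host: str, port: int, timeout: float = 60.0) -> None:
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        try:
            with socket.create_connection((host, port), timeout=1):
                return
        except OSError:
            time.sleep(0.5)
    raise RuntimeError(f"API server did not come up on {host}:{port}")


@pytest.fixture(scope="session")
def api_server():
    proc = subprocess.Popen(["airflow", "api-server", "--port", "8080"])
    try:
        _wait_for_port("127.0.0.1", 8080)
        yield proc
    finally:
        proc.terminate()
        proc.wait(timeout=30)


def test_my_dag(api_server):
    from my_dags.example import dag  # hypothetical module holding the DAG

    dag.test(use_executor=True)
```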

I'm using the current arrangement effectively for automated tests. I find the Airflow System Tests to be a great tool, handy and powerful. Airflow is highly modular and extensible, and the test framework supports that really well, allowing for "real life" executions targeting multiple extensible core Airflow concepts.

  1. Testing a custom executor (in my case: executing Airflow workloads over HashiCorp's Nomad workload manager):

    • Corresponding pipeline script
    • where the tox call corresponds to this pytest call, using the Airflow System Tests plugin. The tests above involve both standard Airflow operators/decorators and custom ones.
  2. Testing custom operators/decorators (in my case, Airflow tasks on HashiCorp's Nomad workload manager) against the LocalExecutor (pipeline, tox reference). NOTE: A bug was detected on this pipeline; a fix is on the way.

juditnovak avatar Dec 10 '25 10:12 juditnovak

Indeed, I think it would be great to just document the different scenarios in which dag.test() can be used. Explaining some of the test scenarios and use cases, and how to run them, with some examples and simple step-by-step guidelines would be a fantastic contribution.

potiuk avatar Dec 10 '25 18:12 potiuk

And if it comes from someone who actually finds it useful and can describe it in a way that's easy to grasp for similar people, that's doubly powerful.

potiuk avatar Dec 10 '25 18:12 potiuk

I'm glad to volunteer to extend the Airflow System Tests and dag.test() documentation, in case such an effort would be welcome.

(While the basics were very well covered, I recall that I missed a few details in the docs as I was discovering the test suite.) I'd be happy to cover the gap with more details, examples, and use cases, if that could be helpful.

juditnovak avatar Dec 11 '25 09:12 juditnovak

I'm glad to volunteer to extend the Airflow System Tests and dag.test() documentation, in case such an effort would be welcome.

(While the basics were very well covered, I recall that I missed a few details in the docs as I was discovering the test suite.) I'd be happy to cover the gap with more details, examples, and use cases, if that could be helpful.

Feel absolutely free! That is one of the best contributions you can make!

potiuk avatar Dec 11 '25 14:12 potiuk

I added a pretty raw draft: https://github.com/apache/airflow/pull/59390

It's meant to give an idea rather than be "production-ready". Please let me know your feedback.

juditnovak avatar Dec 13 '25 16:12 juditnovak