`dag.test(use_executor=True)` doesn't provide the `/execution` API, but all executors rely on it
Apache Airflow version
main (development)
If "Other Airflow 2/3 version" selected, which one?
No response
What happened?
All executors, even ones as simple as the LocalExecutor, try to send API requests to /execution. I'm not exactly sure which component typically provides this, maybe the API server. However, dag.test(use_executor=True) doesn't run the component that does, so you end up with a lot of '[Errno 61] Connection refused' errors from trying to call <Request('PATCH', 'http://localhost:8789/execution/task-instances/019aebfb-c04b-77bd-bf3c-4feb705d81ea/run')>.
As a result, I'm not able to run dag.test(use_executor=True). I've had no luck starting an API server separately either. More specifically, if you start the API server before you run dag.test(), the above error goes away, but you instead get the following:
[2025-12-05T01:03:18.586481Z] {taskinstance.py:1543} ERROR - Executor LocalExecutor(parallelism=4) reported that the task instance <TaskInstance: example.extract manual__2025-12-05T01:03:08.304291+00:00 [queued]> finished with state failed, but the task instance's state attribute is queued. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
I think running a real API server for a test is overkill anyway.
What you think should happen instead?
No response
How to reproduce
Try to run any workflow with dag.test(use_executor=True)
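For illustration, a minimal DAG like the one below reproduces the connection errors when no api-server is running. This is only a sketch: the DAG/task names are arbitrary and the Airflow 3 SDK imports are an assumption for the main branch.

```python
# Hypothetical minimal reproduction; DAG and task names are arbitrary.
import pendulum

from airflow.sdk import dag, task  # Airflow 3 task SDK imports


@dag(schedule=None, start_date=pendulum.datetime(2025, 1, 1), catchup=False)
def example():
    @task
    def extract():
        return "some data"

    extract()


example_dag = example()

if __name__ == "__main__":
    # With use_executor=True, the executor's supervisor tries to reach the
    # /execution API; without an api-server listening, every attempt fails
    # with "[Errno 61] Connection refused".
    example_dag.test(use_executor=True)
```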
Operating System
macOS 15.6
Versions of Apache Airflow Providers
No response
Deployment
Virtualenv installation
Deployment details
No response
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
I think (@amoghrajesh @ashb ?) - that's part of the task-isolation work. I am not even sure if we want to test the dags with use-executor - because indeed that would mean running some test api_server locally and having the supervisor talk to it. Maybe it could be an in-memory API and no real HTTP server?
I think the main use case here is for running system tests with custom executors like here: https://github.com/apache/airflow/pull/48699
It is the best way to test a combination entirely and that's probably why we have it. Maybe a warning or a pre-check that the API server is running would be good.
The reason why I want to use an executor in my tests is because I'm relying on the task retry mechanism which I believe is implemented by the scheduler, so dag.test(use_executor=False) won't correctly perform retries.
> The reason why I want to use an executor in my tests is because I'm relying on the task retry mechanism which I believe is implemented by the scheduler, so dag.test(use_executor=False) won't correctly perform retries.
Maybe it would be a good idea to implement retries with use_executor=False?
And then probably we could remove use_executor entirely?
I'm experiencing the same issue, as I also heavily rely on dag.test(use_executor=True) (via Airflow System Tests).
I'm using the Airflow Pytest Plugin (which invokes dag.test(use_executor=True)) on pipelines testing the custom executor implemented in the Nomad Airflow Provider project. No integration with breeze, just simple pytest calls. I run the tests against an Airflow API server started up "manually" before the test run. (*)
Investigating the issue lately, my impression was a race condition across the following events (a toy sketch of the suspected ordering follows below):

1. The LocalExecutor executes workloads via the supervisor.
2. At the end of the task execution, the supervisor sends a call to the Airflow API /execution/task-instances/ endpoint. This call is responsible for updating the database with the final state.
3. Then the supervisor returns results to the Executor, who will register the results in its Event Buffer. NOTE: 1.)-3.) are part of the Executor's heartbeat()/sync() calls.
4. In the particularly intense dag.test() polling workflow, the SchedulerJobRunner.process_executor_events() call is invoked right after, rapidly processing the contents of the Executor's Event Buffer.
5. However, at this point the API call from 2.) is not yet finished. The supervisor's attempt to record the state in the database is not yet done.
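To make the suspected ordering concrete, here is a purely illustrative toy model (not Airflow code; db_state, event_buffer and the function names are made up) of how the Event Buffer could get ahead of the database:

```python
# Purely illustrative toy model of the suspected race; not Airflow code.
import threading
import time

db_state = {"ti": "running"}   # what the scheduler loop reads from the metadata DB
event_buffer = {}              # what the executor reports back to the scheduler loop


def supervisor_finish_task():
    # Stands in for the PATCH /execution/task-instances/<id>/state round trip,
    # which takes a moment before the new state is committed to the database.
    time.sleep(0.05)
    db_state["ti"] = "success"


def executor_sync():
    # The final state lands in the Event Buffer immediately, while the API call
    # updating the DB is still in flight.
    threading.Thread(target=supervisor_finish_task).start()
    event_buffer["ti"] = "success"


def process_executor_events():
    reported, current = event_buffer.get("ti"), db_state["ti"]
    if reported == "success" and current != "success":
        print(f"executor reported {reported!r}, but DB still says {current!r} -> task marked FAILED")


executor_sync()
process_executor_events()  # invoked "right after" in the tight dag.test() polling loop
```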
This results in an inconsistent state between the Executor's Event Buffer and the database, reflected by errors such as:
Executor side (dag.test(use_executor=True)):
2025-12-09 15:05:04 [info ] Task finished [supervisor] duration=10.164723910973407 exit_code=0 final_state=success
[2025-12-09T15:05:05.738+0100] {scheduler_job_runner.py:835} INFO - Received executor event with state success for task instance TaskInstanceKey(dag_id='test-nomad-job-operator-localexecutor', task_id='nomad_job_localexecutor', run_id='manual__2025-12-09T14:04:54.434758+00:00', try_number=1, map_index=-1)
[2025-12-09T15:05:05.746+0100] {scheduler_job_runner.py:879} INFO - TaskInstance Finished: dag_id=test-nomad-job-operator-localexecutor, task_id=nomad_job_localexecutor, run_id=manual__2025-12-09T14:04:54.434758+00:00, map_index=-1, run_start_date=2025-12-09 14:04:54.605744+00:00, run_end_date=None, run_duration=None, state=running, executor=LocalExecutor(parallelism=32), executor_state=success, try_number=1, max_tries=0, pool=default_pool, queue=default, priority_weight=1, operator=NomadJobOperator, queued_dttm=None, scheduled_dttm=2025-12-09 14:04:54.449045+00:00,queued_by_job_id=None, pid=252308
[2025-12-09T15:05:05.753+0100] {scheduler_job_runner.py:952} ERROR - DAG 'test-nomad-job-operator-localexecutor' for task instance <TaskInstance: test-nomad-job-operator-localexecutor.nomad_job_localexecutor manual__2025-12-09T14:04:54.434758+00:00 [running]> not found in serialized_dag table
[2025-12-09T15:05:05.753+0100] {taskinstance.py:1888} ERROR - Executor LocalExecutor(parallelism=32) reported that the task instance <TaskInstance: test-nomad-job-operator-localexecutor.nomad_job_localexecutor manual__2025-12-09T14:04:54.434758+00:00 [running]> finished with state success, but the task instance's state attribute is running. Learn more: https://airflow.apache.org/docs/apache-airflow/stable/troubleshooting.html#task-state-changed-externally
[2025-12-09T15:05:05.758+0100] {taskinstance.py:2011} INFO - Marking task as FAILED. dag_id=test-nomad-job-operator-localexecutor, task_id=nomad_job_localexecutor, run_id=manual__2025-12-09T14:04:54.434758+00:00, logical_date=20251209T140454, start_date=20251209T140454, end_date=20251209T140505
API server side (all good):
INFO: 127.0.0.1:54378 - "PUT /execution/task-instances/019b036d-ed2d-7948-a320-7ff515217e41/heartbeat HTTP/1.1" 204 No Content
INFO: 127.0.0.1:54378 - "POST /execution/xcoms/test-nomad-job-operator-localexecutor/manual__2025-12-09T14%3A04%3A54.434758%2B00%3A00/nomad_job_localexecutor/return_value HTTP/1.1" 201 Created
2025-12-09 15:05:04 [debug ] Updating task instance state new_state=success ti_id=019b036d-ed2d-7948-a320-7ff515217e41
2025-12-09 15:05:04 [debug ] Retrieved current task instance state max_tries=0 previous_state=running ti_id=019b036d-ed2d-7948-a320-7ff515217e41 try_number=1
2025-12-09 15:05:04 [info ] Task instance state updated new_state=success rows_affected=1 ti_id=019b036d-ed2d-7948-a320-7ff515217e41
INFO: 127.0.0.1:54378 - "PATCH /execution/task-instances/019b036d-ed2d-7948-a320-7ff515217e41/state HTTP/1.1" 204 No Content
The errors above were from Airflow 3.0.6, but I've been observing similar behavior on 3.1.3.
Of course I may be mistaken. I'm glad to get guidelines on any part of the workflow that may have been misunderstood.
I'm happy to offer a PR or to contribute to ongoing efforts.
(*) NOTE: Programming errors on the LocalExecutor pipeline have been recently identified, fix coming very soon.
Nice deep dive. Sounds plausible. A proof of concept quick fix/PR would be great :)
My analysis was almost correct 😅 The reason was very different, though the symptoms are highly similar.
It was not a slow DB update vs. rapid Event Buffer processing. Instead, it was cached information on the DAG's DB session object.
The issue is reproduced with extended logging: https://github.com/juditnovak/airflow/pull/2
The proposed fix is demonstrated in a separate (so far only demonstrative) draft: https://github.com/juditnovak/airflow/pull/1.
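For readers less familiar with that failure mode, here is a small, self-contained SQLAlchemy sketch (plain SQLAlchemy, not Airflow code) of the general mechanism: a session keeps serving cached attribute values until the object is expired or refreshed, so it can disagree with what another connection has already committed.

```python
# Generic SQLAlchemy illustration of stale, session-cached state; not Airflow code.
from sqlalchemy import Column, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base

Base = declarative_base()


class TI(Base):
    __tablename__ = "ti"
    id = Column(Integer, primary_key=True)
    state = Column(String)


engine = create_engine("sqlite:///stale_session_demo.db")  # throwaway file DB shared by two sessions
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

writer = Session(engine)
writer.add(TI(id=1, state="running"))
writer.commit()

reader = Session(engine)
ti = reader.get(TI, 1)
print(ti.state)                    # "running"

# Another connection (think: the /execution API handler) commits the final state.
writer.get(TI, 1).state = "success"
writer.commit()

print(reader.get(TI, 1).state)     # still "running": served from the reader's identity map
reader.refresh(ti)                 # force a re-SELECT (or expire the object / the session)
print(ti.state)                    # "success"
```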
If we are to keep the use_executor flag, then what are your thoughts on handling the API server? Should it start automatically with dag.test(), or should the user be required to start one themselves?
I'm personally in favor of starting it myself:

- While it may be a small additional overhead, it also leaves more flexibility on the developer's side
  - custom config, etc.
- Reduces extra complexity in dag.test()
  - potential future bugs, etc.
- Reduces further divergence in the dag.test() logic depending on whether an executor is used or not
- Currently dag.test() maps very well to the main scheduler execution loop (airflow.jobs.scheduler_job_runner.SchedulerJobRunner._run_scheduler_loop()). This has multiple advantages, and I personally believe that it's very good to keep it as an objective:
  - ensures that the testing workflow is as similar to the real execution as possible
  - reduces/avoids false positives, inconsistency, etc.
  - makes it easy to debug (real execution vs. test pipelines)
I'm efficiently using the current arrangement for automated tests. I find the Airflow System Tests to be a great tool, handy and powerful. Airflow is highly modular and extensible, and the test framework supports that really well, allowing for "real life" executions targeting multiple extensible core Airflow concepts.
- Testing a custom executor (in my case: executing Airflow workloads over HashiCorp's Nomad workload manager):
  - Corresponding pipeline script
  - where the tox call corresponds to this pytest call, using the Airflow System Tests plugin (a rough sketch of such a test module follows below)

  The tests above involve both Airflow standard operators/decorators and custom ones.
- Testing custom operators/decorators (in my case, Airflow tasks on HashiCorp's Nomad workload manager) against the LocalExecutor (pipeline, tox reference). NOTE: A bug was detected on this pipeline; a fix is on the way.
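For context, the system-test wiring mentioned above usually boils down to a DAG module that also exposes a pytest-collectable test via Airflow's get_test_run helper. The sketch below is only an approximation; the import path (tests_common.test_utils.system_tests here) and the DAG contents are assumptions and vary between Airflow versions and provider repositories.

```python
# Rough sketch of a system-test style DAG module; names and import paths are
# assumptions and differ between Airflow versions / provider repositories.
import pendulum

from airflow.sdk import dag, task


@dag(schedule=None, start_date=pendulum.datetime(2025, 1, 1), catchup=False)
def example_system_test():
    @task
    def workload():
        return "done"

    workload()


test_dag = example_system_test()

# Wraps the DAG into a callable that pytest can collect and execute; in the
# apache/airflow tree the helper lives under tests_common.test_utils.system_tests.
from tests_common.test_utils.system_tests import get_test_run  # noqa: E402

test_run = get_test_run(test_dag)
# Then: `pytest path/to/this_module.py` runs the DAG end to end.
```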
Indeed, I think it would be great to just document the different scenarios in which dag.test() could be used. Explaining some of the test scenarios and use cases, and how to run them, with some examples and step-by-step (simple) guidelines would be a fantastic contribution.
And if it comes from someone who actually finds it useful and can describe it in a way that's easy to grasp for similar people - that's doubly powerful.
I'm glad to volunteer to extend the Airflow System Tests and dag.test() documentation, in case it's an effort that may be welcome.
(While the basics were very well covered, I recall that I missed a few details from the docs as I was discovering the test suite.) I'd be happy to cover the gap with more details, examples and use cases, if this could be helpful.
> I'm glad to volunteer to extend the Airflow System Tests and dag.test() documentation, in case it's an effort that may be welcome. (While the basics were very well covered, I recall that I missed a few details from the docs as I was discovering the test suite.) I'd be happy to cover the gap with more details, examples and use cases, if this could be helpful.
Feel absolutely free! That is one of the best contributions you can make!
I added a pretty raw draft: https://github.com/apache/airflow/pull/59390
It's rather meant to give an idea, clearly not "production-ready". Please let me know your feedback.