Dag processor cannot run in daemon mode
Apache Airflow version
3.0.0
If "Other Airflow 2 version" selected, which one?
No response
What happened?
When starting the dag-processor with the command `airflow dag-processor --daemon`, it ends with the following error:
OSError: [Errno 22] Invalid argument
Logs:
2025-05-01 00:50:40,229 INFO - Starting the Dag Processor Job
2025-05-01 00:50:40,229 INFO - Processing files using up to 2 processes at a time
2025-05-01 00:50:40,230 INFO - Process each file at most once every 30 seconds
2025-05-01 00:50:40,231 INFO - DAG bundles loaded: dags-folder, example_dags
2025-05-01 00:50:40,235 INFO - DAG bundles loaded: dags-folder, example_dags
2025-05-01 00:50:40,235 INFO - Checking for new files in bundle dags-folder every 300 seconds
2025-05-01 00:50:40,236 INFO - Checking for new files in bundle example_dags every 300 seconds
2025-05-01 00:50:40,241 INFO - Refreshing bundle dags-folder
2025-05-01 00:50:40,246 INFO - Searching for files in dags-folder at /home/yzhsieh/airflow/dags
2025-05-01 00:50:40,246 INFO - Found 0 files for bundle dags-folder
2025-05-01 00:50:40,258 INFO - Refreshing bundle example_dags
2025-05-01 00:50:40,263 INFO - Searching for files in example_dags at /home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/example_dags
2025-05-01 00:50:40,273 INFO - Found 58 files for bundle example_dags
2025-05-01 00:50:40,296 ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
self.processor.run()
File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 262, in run
return self._run_parsing_loop()
File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 347, in _run_parsing_loop
self._start_new_processes()
File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 894, in _start_new_processes
processor = self._create_process(file)
File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 876, in _create_process
return DagFileProcessorProcess.start(
File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/processor.py", line 245, in start
proc: Self = super().start(target=target, **kwargs)
File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 487, in start
proc._register_pipe_readers(
File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 506, in _register_pipe_readers
self.selector.register(
File "/usr/lib/python3.10/selectors.py", line 360, in register
self._selector.register(key.fd, poller_events)
OSError: [Errno 22] Invalid argument
What you think should happen instead?
No response
How to reproduce
- Create a Python venv and install Airflow from PyPI
- Use `airflow standalone` to set up a simple Airflow instance
- Issue `airflow dag-processor --daemon`; the error can then be found in `airflow-dag-processor.log`
In step 3, if I use `airflow dag-processor` (without `--daemon`) instead, it works without any error.
Operating System
Ubuntu 22.04.5 LTS
Versions of Apache Airflow Providers
(No extra providers installed)
Deployment
Virtualenv installation
Deployment details
Python 3.10.12
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
I also encountered the same problem, as shown in the log below:
2025-05-01 20:49:51,117 INFO - Starting the Dag Processor Job
2025-05-01 20:49:51,117 INFO - Processing files using up to 2 processes at a time
2025-05-01 20:49:51,118 INFO - Process each file at most once every 30 seconds
2025-05-01 20:49:51,120 INFO - DAG bundles loaded: dags-folder
2025-05-01 20:49:51,124 INFO - DAG bundles loaded: dags-folder
2025-05-01 20:49:51,124 INFO - Checking for new files in bundle dags-folder every 300 seconds
2025-05-01 20:49:51,128 INFO - Refreshing bundle dags-folder
2025-05-01 20:49:51,130 INFO - Searching for files in dags-folder at /root/airflow/dags
2025-05-01 20:49:51,133 INFO - Found 15 files for bundle dags-folder
2025-05-01 20:49:51,152 ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
self.processor.run()
File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 262, in run
return self._run_parsing_loop()
File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 347, in _run_parsing_loop
self._start_new_processes()
File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 894, in _start_new_processes
processor = self._create_process(file)
File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 876, in _create_process
return DagFileProcessorProcess.start(
File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/processor.py", line 245, in start
proc: Self = super().start(target=target, **kwargs)
File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/sdk/execution_time/supervisor.py", line 487, in start
proc._register_pipe_readers(
File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/sdk/execution_time/supervisor.py", line 506, in _register_pipe_readers
self.selector.register(
File "/root/miniconda3/envs/airflow/lib/python3.9/selectors.py", line 360, in register
self._selector.register(key.fd, poller_events)
OSError: [Errno 22] Invalid argument
I tried this and am getting the error below:
2025-05-02 11:07:55,432 ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
self.processor.run()
File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 262, in run
return self._run_parsing_loop()
File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 347, in _run_parsing_loop
self._start_new_processes()
File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 894, in _start_new_processes
processor = self._create_process(file)
File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 876, in _create_process
return DagFileProcessorProcess.start(
File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/processor.py", line 245, in start
proc: Self = super().start(target=target, **kwargs)
File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 487, in start
proc._register_pipe_readers(
File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 506, in _register_pipe_readers
self.selector.register(
File "/Users/rahulvats/.pyenv/versions/3.10.12/lib/python3.10/selectors.py", line 523, in register
self._selector.control([kev], 0, 0)
OSError: [Errno 9] Bad file descriptor
I have the same issue; I cannot run `airflow dag-processor` in daemon mode.
@yzhsieh are you still seeing this issue? I think it is due to macOS having some issues with forking + multithreading.
When in daemon mode, it seems not to be passing the argument, hence the error:
2025-05-11 09:26:50,356 ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
self.processor.run()
File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 262, in run
return self._run_parsing_loop()
File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 347, in _run_parsing_loop
self._start_new_processes()
File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 894, in _start_new_processes
processor = self._create_process(file)
File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 876, in _create_process
return DagFileProcessorProcess.start(
File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/processor.py", line 245, in start
proc: Self = super().start(target=target, **kwargs)
File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/sdk/execution_time/supervisor.py", line 487, in start
proc._register_pipe_readers(
File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/sdk/execution_time/supervisor.py", line 506, in _register_pipe_readers
self.selector.register(
File "/usr/lib64/python3.9/selectors.py", line 360, in register
self._selector.register(key.fd, poller_events)
OSError: [Errno 22] Invalid argument
You can use this for now:
nohup airflow dag-processor > nohup.out &
@uranusjr would you like to take a look at this?
> I tried this and am getting the error below:
> `OSError: [Errno 9] Bad file descriptor`
I'm now able to repro this one
I suspect it might relate to how daemon mode interacts with https://github.com/apache/airflow/blob/f360bc8024b730062f463da310c2af27656b62b6/task-sdk/src/airflow/sdk/execution_time/supervisor.py#L455, but I'm somewhat stuck. It would be much appreciated if @uranusjr had some suggestions on this one. Thanks!
Cc @uranusjr @kaxil if you have any suggestions on this
We probably need to go back to https://github.com/apache/airflow/pull/34945 to see whether there are any clues.
We'll wait for https://github.com/apache/airflow/pull/51699 to be merged and see how it works.
Not fixed by #51699. If I were to guess, it's likely something to do with daemon mode closing file descriptors.
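To illustrate that guess, here is a minimal sketch of the failure mode in isolation. It assumes the `python-daemon` package (which the `--daemon` flag appears to use); this is not Airflow's code path, just the same selector-before-daemonize shape:

```python
import os
import selectors

import daemon  # pip install python-daemon

# Selector created BEFORE daemonizing, mirroring how DagFileProcessorManager
# (and its selector) are constructed before the --daemon fork.
selector = selectors.DefaultSelector()

with daemon.DaemonContext():
    # DaemonContext closes every fd it doesn't know about, including the
    # selector's internal epoll/kqueue fd; that fd number may even be reused
    # by a later open, so the next register() fails.
    r, w = os.pipe()
    try:
        selector.register(r, selectors.EVENT_READ)
    except OSError as exc:
        # Expect errno 22 (EINVAL) or 9 (EBADF) here, in line with the
        # tracebacks above. No tty in a daemon, so log to a file instead.
        with open("/tmp/selector_repro.log", "w") as f:
            f.write(repr(exc))
```

Note that `selectors.DefaultSelector` picks epoll on Linux and kqueue on macOS, which would explain why Linux reporters see `[Errno 22] Invalid argument` while the macOS traceback shows `[Errno 9] Bad file descriptor`.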
Yes, I have also encountered this problem: the dag-processor errors on daemon startup. Do we have to wait for the next version update?
We haven’t been able to find the direct cause yet.
> We haven’t been able to find the direct cause yet.
I won't look for the cause any more; I'll just wait for the next version.
Just curious, why is this prioritized as low? Is it because people are expected to use:
nohup airflow dag-processor > nohup.out &
as a workaround?
+1 I also have the same issue
Is it because you're trying to re-open stdout in _reopen_std_io_handles() after a double fork?
Also, the comment here looks incorrect?

# Close the remaining parent-end of the sockets we've passed to the child via fork.
# We still have the other end of the pair open
Note that for me, the api-server also exits, but without a visible error.
`_reopen_std_io_handles` re-opens them onto the sockets already opened in the parent; it is not trying to connect to the "real" tty-connected std handles at that point.
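In other words, roughly this shape — a simplified stand-in for the real `_reopen_std_io_handles`, with made-up names, not the actual supervisor code:

```python
import os
import socket

# The parent opens a socketpair before forking; after the fork the child
# points its std handles at its end of the pair, so anything it writes to
# stdout/stderr flows back to the supervising parent over the socket.
parent_end, child_end = socket.socketpair()

def reopen_std_io_handles(sock: socket.socket) -> None:
    """Re-point fds 1 (stdout) and 2 (stderr) at the inherited socket."""
    fd = sock.fileno()
    # dup2 atomically replaces the target fd, so later writes to
    # stdout/stderr go to the socket rather than the original tty.
    os.dup2(fd, 1)
    os.dup2(fd, 2)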
> Just curious, why is this prioritized as low? Is it because people are expected to use `nohup airflow dag-processor > nohup.out &` as a workaround?
Something like that. And it's low because, in the grand scheme of things, not many people use daemon mode (running in a container, or as a systemd service, both of which still have stdout and stderr connected, is much more common).
I'm happy to help review or brainstorm a PR to fix this if someone wants to tackle it.
Yep. Daemon mode is a pretty "old-school" way of running things, and the workaround is easy. And indeed, usually if someone really wants daemon mode and understands what it is, they are likely a good candidate to implement and test a fix.
As with everything in open source, we are happy for contributions. Not everything (indeed, very little) has to be done by maintainers.
Think I found the issue.
DagFileProcessorManager, and its selector, are created before forking to daemon mode.
But the default selector (epoll for me) uses a file descriptor internally that probably does not survive daemonization.
A simple `self.selector = selectors.DefaultSelector()` at the start of `DagFileProcessorManager.run()` seems to fix it.
Will try to get a PR in, if that seems like an acceptable fix?
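i.e. something along these lines (a simplified sketch of the idea, not the actual patch):

```python
import selectors

class DagFileProcessorManager:
    def __init__(self):
        # Today the selector (and its internal epoll/kqueue fd) is created
        # here, before `--daemon` forks and file descriptors get closed.
        self.selector = selectors.DefaultSelector()

    def run(self):
        # Proposed fix: open a fresh selector now that any daemonization has
        # already happened, so its fd is valid in this process.
        self.selector = selectors.DefaultSelector()
        return self._run_parsing_loop()

    def _run_parsing_loop(self):
        ...
```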
Sounds like it, yeah @lp-jump