
Dag processor cannot run in daemon mode

Open yzhsieh opened this issue 8 months ago • 12 comments

Apache Airflow version

3.0.0

If "Other Airflow 2 version" selected, which one?

No response

What happened?

When starting the dag-processor with the command airflow dag-processor --daemon, it ends with the following error:

OSError: [Errno 22] Invalid argument

Logs:

2025-05-01 00:50:40,229 INFO - Starting the Dag Processor Job
2025-05-01 00:50:40,229 INFO - Processing files using up to 2 processes at a time 
2025-05-01 00:50:40,230 INFO - Process each file at most once every 30 seconds
2025-05-01 00:50:40,231 INFO - DAG bundles loaded: dags-folder, example_dags
2025-05-01 00:50:40,235 INFO - DAG bundles loaded: dags-folder, example_dags
2025-05-01 00:50:40,235 INFO - Checking for new files in bundle dags-folder every 300 seconds
2025-05-01 00:50:40,236 INFO - Checking for new files in bundle example_dags every 300 seconds
2025-05-01 00:50:40,241 INFO - Refreshing bundle dags-folder
2025-05-01 00:50:40,246 INFO - Searching for files in dags-folder at /home/yzhsieh/airflow/dags
2025-05-01 00:50:40,246 INFO - Found 0 files for bundle dags-folder
2025-05-01 00:50:40,258 INFO - Refreshing bundle example_dags
2025-05-01 00:50:40,263 INFO - Searching for files in example_dags at /home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/example_dags
2025-05-01 00:50:40,273 INFO - Found 58 files for bundle example_dags
2025-05-01 00:50:40,296 ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
  File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 262, in run
    return self._run_parsing_loop()
  File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 347, in _run_parsing_loop
    self._start_new_processes()
  File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 894, in _start_new_processes
    processor = self._create_process(file)
  File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 876, in _create_process
    return DagFileProcessorProcess.start(
  File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/dag_processing/processor.py", line 245, in start
    proc: Self = super().start(target=target, **kwargs)
  File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 487, in start
    proc._register_pipe_readers(
  File "/home/yzhsieh/tmp/airflow_venv/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 506, in _register_pipe_readers
    self.selector.register(
  File "/usr/lib/python3.10/selectors.py", line 360, in register
    self._selector.register(key.fd, poller_events)
OSError: [Errno 22] Invalid argument

What you think should happen instead?

No response

How to reproduce

  1. Create a Python venv and install Airflow via PyPI
  2. Use airflow standalone to set up a simple Airflow instance
  3. Run airflow dag-processor --daemon; the error then appears in airflow-dag-processor.log

In step 3, if I run airflow dag-processor instead, it works without any error

Operating System

Ubuntu 22.04.5 LTS

Versions of Apache Airflow Providers

(No extra providers installed)

Deployment

Virtualenv installation

Deployment details

Python 3.10.12

Anything else?

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

yzhsieh avatar Apr 30 '25 16:04 yzhsieh

I also encountered the same problem; the log (originally attached as a picture) reads:

2025-05-01 20:49:51,117 INFO - Starting the Dag Processor Job
2025-05-01 20:49:51,117 INFO - Processing files using up to 2 processes at a time
2025-05-01 20:49:51,118 INFO - Process each file at most once every 30 seconds
2025-05-01 20:49:51,120 INFO - DAG bundles loaded: dags-folder
2025-05-01 20:49:51,124 INFO - DAG bundles loaded: dags-folder
2025-05-01 20:49:51,124 INFO - Checking for new files in bundle dags-folder every 300 seconds
2025-05-01 20:49:51,128 INFO - Refreshing bundle dags-folder
2025-05-01 20:49:51,130 INFO - Searching for files in dags-folder at /root/airflow/dags
2025-05-01 20:49:51,133 INFO - Found 15 files for bundle dags-folder
2025-05-01 20:49:51,152 ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
  File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 262, in run
    return self._run_parsing_loop()
  File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 347, in _run_parsing_loop
    self._start_new_processes()
  File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 894, in _start_new_processes
    processor = self._create_process(file)
  File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 876, in _create_process
    return DagFileProcessorProcess.start(
  File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/dag_processing/processor.py", line 245, in start
    proc: Self = super().start(target=target, **kwargs)
  File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/sdk/execution_time/supervisor.py", line 487, in start
    proc._register_pipe_readers(
  File "/root/miniconda3/envs/airflow/lib/python3.9/site-packages/airflow/sdk/execution_time/supervisor.py", line 506, in _register_pipe_readers
    self.selector.register(
  File "/root/miniconda3/envs/airflow/lib/python3.9/selectors.py", line 360, in register
    self._selector.register(key.fd, poller_events)
OSError: [Errno 22] Invalid argument

yaobaishijie avatar May 01 '25 13:05 yaobaishijie

I tried this and am getting the error below

2025-05-02 11:07:55,432 ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
  File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 262, in run
    return self._run_parsing_loop()
  File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 347, in _run_parsing_loop
    self._start_new_processes()
  File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 894, in _start_new_processes
    processor = self._create_process(file)
  File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/manager.py", line 876, in _create_process
    return DagFileProcessorProcess.start(
  File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/dag_processing/processor.py", line 245, in start
    proc: Self = super().start(target=target, **kwargs)
  File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 487, in start
    proc._register_pipe_readers(
  File "/Users/rahulvats/.pyenv/versions/3.10.12/envs/airflow-python310-env/lib/python3.10/site-packages/airflow/sdk/execution_time/supervisor.py", line 506, in _register_pipe_readers
    self.selector.register(
  File "/Users/rahulvats/.pyenv/versions/3.10.12/lib/python3.10/selectors.py", line 523, in register
    self._selector.control([kev], 0, 0)
OSError: [Errno 9] Bad file descriptor

vatsrahul1001 avatar May 02 '25 05:05 vatsrahul1001

I have the same issue; I cannot run airflow dag-processor in daemon mode.

joyceguouk avatar May 06 '25 21:05 joyceguouk

@yzhsieh are you still seeing this issue? I think it is due to macOS having some issues with forking + multithreading.

amoghrajesh avatar May 12 '25 12:05 amoghrajesh

When in daemon mode, it seems not to be passing the argument, hence the error:

2025-05-11 09:26:50,356 ERROR - Exception when executing DagProcessorJob
Traceback (most recent call last):
  File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/jobs/dag_processor_job_runner.py", line 61, in _execute
    self.processor.run()
  File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 262, in run
    return self._run_parsing_loop()
  File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 347, in _run_parsing_loop
    self._start_new_processes()
  File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 894, in _start_new_processes
    processor = self._create_process(file)
  File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/manager.py", line 876, in _create_process
    return DagFileProcessorProcess.start(
  File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/dag_processing/processor.py", line 245, in start
    proc: Self = super().start(target=target, **kwargs)
  File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/sdk/execution_time/supervisor.py", line 487, in start
    proc._register_pipe_readers(
  File "/home/ec2-user/airflow/lib/python3.9/site-packages/airflow/sdk/execution_time/supervisor.py", line 506, in _register_pipe_readers
    self.selector.register(
  File "/usr/lib64/python3.9/selectors.py", line 360, in register
    self._selector.register(key.fd, poller_events)
OSError: [Errno 22] Invalid argument

You can use this for now,

nohup airflow dag-processor > nohup.out &

ravi-simtel avatar May 12 '25 13:05 ravi-simtel

@uranusjr would you like to take a look into this?

phanikumv avatar May 27 '25 10:05 phanikumv

Quoting vatsrahul1001's traceback above (ending in OSError: [Errno 9] Bad file descriptor):

I'm now able to repro this one

Lee-W avatar May 28 '25 07:05 Lee-W

I suspect it might relate to using daemon with https://github.com/apache/airflow/blob/f360bc8024b730062f463da310c2af27656b62b6/task-sdk/src/airflow/sdk/execution_time/supervisor.py#L455, but I'm somewhat stuck. It would be much appreciated if @uranusjr had some suggestions on this one. Thanks!

Lee-W avatar May 28 '25 11:05 Lee-W

Cc @uranusjr @kaxil if you have any suggestions on this

phanikumv avatar May 30 '25 09:05 phanikumv

Probably need to go back to https://github.com/apache/airflow/pull/34945 to see whether there are some clues

Lee-W avatar Jun 02 '25 06:06 Lee-W

We'll wait for the merging of https://github.com/apache/airflow/pull/51699 and see how it works

Lee-W avatar Jun 16 '25 09:06 Lee-W

Not fixed by #51699. If I were to guess, it's likely something to do with daemon mode closing file descriptors.

ashb avatar Jun 17 '25 14:06 ashb
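
For context on that guess: Airflow's --daemon flag is built on the python-daemon library, whose DaemonContext closes every open file descriptor on entry unless it is listed in files_preserve. A minimal sketch of that behaviour (illustrative only, not Airflow's actual code):

import selectors

import daemon  # python-daemon, which the --daemon flag is built on

sel = selectors.DefaultSelector()  # opens an epoll/kqueue fd in the parent

with daemon.DaemonContext():  # double-forks, detaches, closes inherited fds
    # The selector's internal fd was opened before daemonization and has now
    # been closed (or its number reused), so a later sel.register(...) raises
    # OSError, matching the tracebacks in this thread.
    pass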

Yes, I have also encountered this problem: dag-processor errors out on daemon startup. Do we have to wait for the next version update?

LengYue12389 avatar Jun 18 '25 03:06 LengYue12389

We haven’t been able to find the direct cause yet.

uranusjr avatar Jun 19 '25 08:06 uranusjr

We haven’t been able to find the direct cause yet.

I won't keep looking for the cause; I'll just wait for the next version

LengYue12389 avatar Jun 19 '25 09:06 LengYue12389

Just curious, why is this prioritized as low? Is it because people are expected to use:

nohup airflow dag-processor > nohup.out &

as a workaround?

darrenbarnes-crx avatar Oct 23 '25 19:10 darrenbarnes-crx

+1 I also have the same issue

Is it because you're trying to re-open stdout in _reopen_std_io_handles() after a double fork?

Also, comment here looks incorrect?

    # Close the remaining parent-end of the sockets we've passed to the child via fork. We still have the
    # other end of the pair open

Note that for me, the api-server also exits but without a visible error

lp-jump avatar Nov 13 '25 14:11 lp-jump

_reopen_std_io_handles is re-opening them to the sockets already opened in the parent. It is not trying to connect to the "real" tty-connected std handles at that point.

ashb avatar Nov 13 '25 14:11 ashb

Just curious, why is this prioritized as low? Is it because people are expected to use:

nohup airflow dag-processor > nohup.out &

as a workaround?

Something like that. And it's low because in the grand scheme of things not many people use daemon mode (running in a container, or a systemd service, both of which still have stdout and err connected, is much more common).

I'm happy to help review or brainstorm a PR to fix this if someone wants to tackle it.

ashb avatar Nov 13 '25 14:11 ashb

Yep. Daemon mode is a pretty "old-school" way of running things, and the workaround is easy. And indeed, usually if someone really wants a daemon mode and understands what it is, they are likely a good candidate to implement and test a fix.

As everything in open-source, we are happy for contributions. Not everything (or even very little) has to be done by maintainers.

potiuk avatar Nov 13 '25 17:11 potiuk

Think I found the issue. DagFileProcessorManager, and its selector, are created before forking to daemon mode. But the default selector (epoll for me) is using a socket internally that is probably non-inheritable. A simple self.selector = selectors.DefaultSelector() at the start of DagFileProcessorManager.run() seems to fix it. Will try to get a PR in, if that seems an acceptable fix?

lp-jump avatar Nov 14 '25 15:11 lp-jump
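
A stdlib-only sketch of that failure mode (this simulates the fd loss by hand and touches the selector's private _selector attribute purely for illustration; it is not Airflow code):

import os
import selectors
import socket

sel = selectors.DefaultSelector()             # epoll-backed on Linux
os.close(sel._selector.fileno())              # fd closed behind the selector's back
reused = os.open("/dev/null", os.O_RDONLY)    # lowest free fd number gets reused

r, w = socket.socketpair()
sel.register(r, selectors.EVENT_READ)
# Linux, fd reused by a non-epoll file: OSError: [Errno 22] Invalid argument
# macOS/kqueue, or fd simply closed:    OSError: [Errno 9] Bad file descriptor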

Sounds like it, yeah @lp-jump

ashb avatar Nov 14 '25 17:11 ashb
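
For reference, a hedged sketch of the shape of that fix (class and method names are taken from the tracebacks above; the change that actually lands in a PR may differ):

import selectors

class DagFileProcessorManager:
    def _run_parsing_loop(self):
        ...  # existing parsing loop, unchanged (manager.py in the tracebacks)

    def run(self):
        # Create the selector here, after any daemonization fork has already
        # happened, so its internal epoll/kqueue fd belongs to this process
        # rather than being a stale handle created in the parent.
        self.selector = selectors.DefaultSelector()
        return self._run_parsing_loop()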