Add user impersonation (`run_as_user`) support for task execution
closes: https://github.com/apache/airflow/issues/50423
Why?
Airflow 2 had support for user impersonation: https://airflow.apache.org/docs/apache-airflow/stable/security/workload.html. Quoting from docs:
Airflow has the ability to impersonate a unix user while running task instances based on the task’s run_as_user parameter, which takes a user’s name.
The intention is to de-elevate the task process: the worker that launches it may run as root, and the task can be configured to impersonate a user with fewer privileges, giving more control over what the task can do and a more secure task run.
Quoting a use case reported by one of the Airflow users:
I suspect there might be something wrong with user impersonation (run_as_user) in 3.0.0+ with docker (tested on 3.0.1rc1). My setup uses an overlay filesystem so I can fake-interact with a shared network drive. When I connect to the docker cli, I can do sudo -u data_user -i and create/delete files without problem. However, the celery worker encounters a permission error when running a task that writes to the network drive as that very same data_user. Does anyone have any idea what might be the reason? Anything I can try to pinpoint the issue? In 2.10.5 my setup worked fine. The difference in configurations comes down to changes to the "official" docker-compose from 2.x to 3.x (on top of which my own additions, which didn't change, are included).
https://apache-airflow.slack.com/archives/CCQB40SQJ/p1746728794387939
Approach
We implement user impersonation by re-executing the task process with sudo as the specified user (run_as_user). The key is preserving the communication channel between the task and supervisor across this re-execution.
To do this we use os.execvp, which replaces the current process image with a new one: we run the task runner process again, but as the specified user.
Process Flow
- Initial process starts as the Airflow user as usual
- If `run_as_user` is set, we (a sketch of this flow follows the list):
  - Store the startup details in the environment
  - Make the socket inheritable so communication between the supervisor and the re-exec'd process can continue
  - Re-execute with sudo as the specified target user
- Re-executed process:
  - Detects it has been re-executed via an environment variable
  - Retrieves the startup details from the environment
  - Continues task execution as the target user
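A minimal sketch of this flow. The function and argument names (`maybe_reexec_as`, `startup_msg_json`, `sock_fd`) are illustrative only, not the actual names used in this PR; the real branch lives in the task runner's startup path:

```python
import os
import sys


def maybe_reexec_as(run_as_user, startup_msg_json, sock_fd):
    """Illustrative sketch: branch between the first run and the re-exec'd run."""
    if os.environ.get("_AIRFLOW__REEXECUTED_PROCESS") == "1":
        # Re-executed run: the startup message is already waiting in the environment.
        return os.environ["_AIRFLOW__STARTUP_MSG"]
    if run_as_user:
        # First run (e.g. as root): stash the context, keep the supervisor socket
        # alive across exec, then replace this process with one owned by the target user.
        os.environ["_AIRFLOW__REEXECUTED_PROCESS"] = "1"
        os.environ["_AIRFLOW__STARTUP_MSG"] = startup_msg_json
        os.set_inheritable(sock_fd, True)
        cmd = ["sudo", "-E", "-H", "-u", run_as_user, sys.executable, *sys.argv]
        os.execvp(cmd[0], cmd)  # does not return on success
    return startup_msg_json
```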
Things to note
Startup Details Storage
- Store the startup details in an environment variable so the re-exec'd process can pick them up without waiting for `StartupDetails` from the supervisor -- which won't come anyway. This also prevents sending a duplicate `/run` call to the API server, keeping the operation idempotent.
os.environ["_AIRFLOW__REEXECUTED_PROCESS"] = "1"
os.environ["_AIRFLOW__STARTUP_MSG"] = msg.model_dump_json()
This ensures the re-executed process has all necessary context without needing to re-parse the DAG or re-establish connections.
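On the retrieval side, a minimal sketch; the import path for `StartupDetails` and the helper name are assumptions -- the point is that `model_dump_json()` on the first run pairs with `model_validate_json()` in the re-executed process:

```python
import os

from airflow.sdk.execution_time.comms import StartupDetails  # assumed import path


def rebuild_startup_details_from_env() -> StartupDetails:
    # Hypothetical helper: parse the stashed startup message back into the model.
    raw = os.environ["_AIRFLOW__STARTUP_MSG"]
    return StartupDetails.model_validate_json(raw)
```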
Socket Inheritance
```python
os.set_inheritable(SUPERVISOR_COMMS.request_socket.fileno(), True)
```
This is important because:
- The socket is created by the supervisor
- It needs to be accessible to the re-executed process
- File descriptors (and their numbers) are preserved across exec, as the small illustration after this list shows
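A small standalone illustration (generic Python, not the Airflow code): since PEP 446, file descriptors created by Python are non-inheritable by default, so without the `os.set_inheritable` call the socket would be closed in the process image started by exec:

```python
import os
import socket

parent_sock, child_sock = socket.socketpair()
print(os.get_inheritable(child_sock.fileno()))  # False: would not survive exec
os.set_inheritable(child_sock.fileno(), True)
print(os.get_inheritable(child_sock.fileno()))  # True: stays open across os.execvp
```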
Why os.execvp?
We use os.execvp instead of subprocess or os.fork because:
- It replaces the current process with the new one
- Preserves file descriptors by default, including the ones created by the supervisor, so we do not have to rework or disturb those connections
- Maintains the same process id, which is important for supervisor tracking
- Allows passing the environment through with sudo's `-E` flag (and setting HOME with `-H`); a tiny demonstration of the exec behaviour follows this list.
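A tiny standalone demonstration (not the Airflow code) of the exec behaviour relied on above: the running program is replaced in place, the pid stays the same, and the call does not return on success:

```python
import os
import sys

print("pid:", os.getpid(), "argv:", sys.argv[1:])
if "--reexeced" not in sys.argv:
    # Replace this very script with a second run of itself; the pid printed
    # by the second run is the same as the first.
    os.execvp(sys.executable, [sys.executable, *sys.argv, "--reexeced"])
    print("never reached")  # exec only returns if it fails
```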
Communication Channel
Introduced a helper to access `SUPERVISOR_COMMS` via a utility function. When we re-exec, other modules were failing to access the `SUPERVISOR_COMMS` defined at module level in task_runner (I am not sure of the exact reason; it seemed related to the module initialisation sequence), and this approach does no harm. The new way to access it is the `get_supervisor_comms` helper, which returns the existing instance if available, or initialises one otherwise.
The communication channel is now initialized lazily via `get_supervisor_comms()`:
```python
def get_supervisor_comms() -> CommsDecoder[ToTask, ToSupervisor]:
    global SUPERVISOR_COMMS
    if SUPERVISOR_COMMS is None:
        SUPERVISOR_COMMS = CommsDecoder[ToTask, ToSupervisor](input=sys.stdin)
    return SUPERVISOR_COMMS
```
This ensures:
- Single instance of communication channel
- Proper initialization in both the initial and re-executed processes (a usage sketch follows)
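A hypothetical caller in another SDK module (the import path is assumed from the description above): fetch the channel lazily through the helper instead of importing the module-level `SUPERVISOR_COMMS` directly, which may still be uninitialised in the re-executed process:

```python
from airflow.sdk.execution_time.task_runner import get_supervisor_comms  # assumed path

comms = get_supervisor_comms()  # existing instance, or a new one on first use
```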
Testing
The intention is to run Airflow as "root" and switch to a less privileged user, "airflowuser". We intentionally use a user that cannot list certain files, such as /root/airflow/airflow.cfg.
Setup for testing
- Run airflow with celery executor
- Create a "airflowuser":
sudo useradd -m -s /bin/bash airflowuser - Switch to "airflowuser" and ensure that the privilieges are lesser, for eg:
```
root@1b92a329d570:/opt/airflow# sudo -u airflowuser -i
airflowuser@1b92a329d570:~$ namei -l /root/airflow/airflow.cfg
f: /root/airflow/airflow.cfg
drwxr-xr-x root root /
drwx------ root root root
airflow - Permission denied
```
- Run celery worker with root now:
```
root@1b92a329d570:/opt/airflow# airflow celery worker
2025-05-29 07:41:56.804540 [info ] starting stale bundle cleanup process [airflow.providers.celery.cli.celery_command]
[2025-05-29 07:41:56 +0000] [418] [INFO] Starting gunicorn 23.0.0
[2025-05-29 07:41:56 +0000] [418] [INFO] Listening at: http://[::]:8793 (418)
[2025-05-29 07:41:56 +0000] [418] [INFO] Using worker: sync
[2025-05-29 07:41:56 +0000] [420] [INFO] Booting worker with pid: 420
[2025-05-29 07:41:56 +0000] [421] [INFO] Booting worker with pid: 421
2025-05-29 07:41:58.159935 [warning ] You're running the worker with superuser privileges: this is
absolutely not recommended!
Please specify a different user using the --uid option.
User information: uid=0 euid=0 gid=0 egid=0
[py.warnings] category=SecurityWarning filename=/usr/local/lib/python3.9/site-packages/celery/platforms.py lineno=84
```
General Testing:
Test 1: Running a simple task as airflowuser that pushes an XCom:
DAG:
```python
from airflow import DAG
from datetime import datetime
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:
    # check_access = BashOperator(
    #     task_id="ls_root_cfg",
    #     bash_command="ls -l /root/airflow/airflow.cfg",
    #     run_as_user="airflowuser"
    # )
    check_access = BashOperator(
        task_id="say_hi",
        bash_command="echo $HOME",
        run_as_user="airflowuser"
    )
```
Test 2: Running a simple task that tries to get a connection and set a variable with run_as_user
DAG:
```python
from __future__ import annotations

from airflow.models.baseoperator import BaseOperator
from airflow import DAG
from airflow.sdk.definitions.connection import Connection


class CustomOperator(BaseOperator):
    def execute(self, context):
        gc = Connection.get("athena_default")
        print("The conn is", gc)
        # print("Conn uri is", gc.get_uri())


with DAG("example_get_connection_from_defn", schedule=None, catchup=False) as dag:
    CustomOperator(task_id="print_conn", run_as_user="airflowuser")
```
Testing run_as_user functionality
Test 1: run_as_user trying to access /root/airflow/airflow.cfg
DAG Used:
```python
from airflow import DAG
from datetime import datetime
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:
    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
        run_as_user="airflowuser",
    )
```
Errors out, logs:
Test 2: Do not provide run_as_user; override with the config default instead ("airflowuser" itself)
Set env in worker:
export AIRFLOW__CORE__DEFAULT_IMPERSONATION="airflowuser"
DAG Used:
```python
from airflow import DAG
from datetime import datetime
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:
    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
    )
```
Same error as before:
Test 3: Provide run_as_user both on the task and in the config, to check which one is picked up
In the worker, create a new user "randomuser" and keep the config default set to "airflowuser":
sudo useradd -m -s /bin/bash randomuser
export AIRFLOW__CORE__DEFAULT_IMPERSONATION="airflowuser"
DAG used:
```python
from airflow import DAG
from datetime import datetime
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:
    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
        run_as_user="randomuser"
    )
```
"randomuser" is picked up:
Test 4: User not present
DAG:
```python
from airflow import DAG
from datetime import datetime
from airflow.providers.standard.operators.bash import BashOperator

with DAG(
    dag_id="check_cfg_file_access",
    schedule=None,
    catchup=False,
) as dag:
    check_access = BashOperator(
        task_id="ls_root_cfg",
        bash_command="ls -l /root/airflow/airflow.cfg",
        run_as_user="not_a_user"
    )
```