ImportError: cannot import name 'SUPERVISOR_COMMS' with dag.test()
Apache Airflow version
3.0.2
If "Other Airflow 2 version" selected, which one?
No response
What happened?
The exception occurred when a DAG run with `dag.test()` attempted to retrieve a variable from the API server. Similar issues have been opened before (#48554, #51062, #51316). The PRs provided as a solution (#50300, #50419) were included in 3.0.2 but did not fix the problem.
```
Exception has occurred: ImportError
cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
  File "/workspaces/airflow/test.py", line 7, in <module>
    x = Variable.get("my_variable")
        ^^^^^^^^^^^^^^^^^^^^^^^^
ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner' (/workspaces/airflow/.venv/lib/python3.12/site-packages/airflow/sdk/execution_time/task_runner.py)
```
What you think should happen instead?
The variable should have been successfully retrieved without exceptions.
How to reproduce
- Set the variable:
```bash
airflow variables set my_variable my_value
```

- Run the DAG:

```python
import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator
from airflow.sdk import Variable

x = Variable.get("my_variable")


def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)


with DAG("test_dag") as dag:
    start = EmptyOperator(task_id="start")
    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            "my_var": x
        },
    )
    end = EmptyOperator(task_id="end")

    start >> py_func >> end

if __name__ == "__main__":
    dag.test()
```
Operating System
Debian GNU/Linux 12 (bookworm)
Versions of Apache Airflow Providers
No response
Deployment
Other Docker-based deployment
Deployment details
Extended image based on apache/airflow:slim-3.0.2-python3.12
Anything else?
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
CC: @kaxil we should pick this up for 3.0.3 if it's actually an issue; haven't triaged it
Yeah, it is an issue. The problem is that the dag is already parsed before the `dag.test` code can be called, which is what would assign `task_runner.SUPERVISOR_COMMS = InProcessSupervisorComms`.
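To make the ordering concrete, here is a minimal sketch of what `python mydag.py` executes (an illustration of the failure, not the fix):

```python
# mydag.py -- executed top to bottom by `python mydag.py`
from airflow.sdk import DAG, Variable

# 1) Top-level code runs while the file is being parsed/imported.
#    SUPERVISOR_COMMS has not been assigned yet, so this raises the
#    ImportError reported above.
x = Variable.get("my_variable")

with DAG("test_dag") as dag:
    ...

if __name__ == "__main__":
    # 2) Only at this point could dag.test() install InProcessSupervisorComms,
    #    which is too late for the Variable.get call above.
    dag.test()
```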
Assuming we can detect the case where this is being imported from running `python mydag.py` (which I'm 95% sure we can, so that is the 'easy' part), how should we actually obtain a value/variable?
I.e., ignoring all practicalities or other issues, what should this do, given the increased security model in Airflow 3 of not allowing unfettered DB access?
> Assuming we can detect the case where this is being imported from running `python mydag.py` (which I'm 95% sure we can, so that is the 'easy' part), how should we actually obtain a value/variable? I.e., ignoring all practicalities or other issues, what should this do, given the increased security model in Airflow 3 of not allowing unfettered DB access?
Since we allow that in actual dag parsing mode (as it goes via DAG processor -> Supervisor comms -> Variable), we should do the same for `dag.test`.
Now regarding the security model: currently the Task Execution API is tied to Task Identity, so this becomes a no-go. But that's a breaking change. (Unless of course we want to include DAG file identity too somehow.)
@opeida The ideal way would be to not access Variables or Connections at the top level of your file (i.e., outside Task Context). In your case, you can rewrite the dag as follows:
```python
import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator


def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)


with DAG("test_dag") as dag:
    start = EmptyOperator(task_id="start")
    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            "my_var": "{{ var.value.my_variable }}"
        },
    )
    end = EmptyOperator(task_id="end")

    start >> py_func >> end

if __name__ == "__main__":
    dag.test()
```
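Alternatively (a sketch along the same lines, not an official recommendation): move the lookup into the callable itself, so it executes in Task Context where supervisor comms are available:

```python
import logging

from airflow.sdk import Variable


def my_function() -> None:
    # Runs at task execution time, inside Task Context, so the Variable
    # lookup goes through the task runner's supervisor comms instead of
    # failing at parse time.
    my_var = Variable.get("my_variable")
    logging.getLogger(__name__).info(my_var)
```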
So: detecting when we have top-level dag code, at least in the `python mydagfile.py` case:
```python
import sys


def __getattr__(name):
    # PEP 562 module-level __getattr__: only called when `name` is not
    # found in the module's globals.
    if name == "SUPERVISOR_COMMS" and "__main__" in sys.modules:
        import traceback

        # Walk the stack outwards, skipping importlib frames; the last
        # frame is the outermost caller. If that is the script passed to
        # `python`, this access came from top-level dag code.
        frames = [
            frame
            for (frame, lnum) in traceback.walk_stack(None)
            if not frame.f_code.co_filename.startswith("<frozen importlib.")
        ]
        if sys.modules["__main__"].__file__ == frames[-1].f_code.co_filename:
            raise RuntimeError("Top level API access here")
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")
```
(Put this in `airflow/sdk/execution_time/task_runner.py`. It won't help for cases in unit tests where someone imports the dag, but that could be handled in other ways.)
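For reference, this relies on PEP 562: a module-level `__getattr__` is consulted only when the attribute is missing from the module's globals, so once `SUPERVISOR_COMMS` is actually assigned the hook is bypassed. A minimal standalone demo (module name and usage comments are illustrative):

```python
# demo_mod.py (illustrative module name)
def __getattr__(name):
    # Called by Python only for attribute names NOT found in this
    # module's globals (PEP 562).
    raise AttributeError(f"module {__name__!r} has no attribute {name!r}")


# In another file:
#   import demo_mod
#   demo_mod.SUPERVISOR_COMMS             # AttributeError, raised by the hook
#   demo_mod.SUPERVISOR_COMMS = object()  # attribute now exists in globals
#   demo_mod.SUPERVISOR_COMMS             # hook is no longer consulted
```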
So now back to the main question: can we "just" make it set up an in-process API server there to try and ask the local DB in that case? Are there any security risks of doing that?
> @opeida The ideal way would be to not access Variables or Connections at the top level of your file (i.e., outside Task Context)
@kaxil are there workarounds that eliminate the use of top-level variables for dynamic DAG generation? We pull data from numerous accounts of a third-party provider. The accounts variable can change dynamically and stores sensitive information, including API keys. In all other cases we use Jinja templates within Task Context, following Airflow's best practices.
```python
provider_name = "<provider_name>"
# deserialize_json added here for illustration, assuming the variable
# stores a JSON list of account objects:
ALL_ACCOUNTS = Variable.get(f"{provider_name}_accounts", deserialize_json=True)

for account in ALL_ACCOUNTS:
    with DAG(
        f"{provider_name}.{account.get('name')}",
        default_args=default_args,
        schedule="10 * * * *",
        catchup=False,
        max_active_runs=1,
    ) as dag:
        ...  # DAG operators
```
You can just do the following in your case:
```python
import logging

from airflow import DAG
from airflow.providers.standard.operators.empty import EmptyOperator
from airflow.providers.standard.operators.python import PythonOperator

provider_name = "<provider_name>"


def my_function(my_var: str) -> None:
    logging.getLogger(__name__).info(my_var)


with DAG("test_dag") as dag:
    start = EmptyOperator(task_id="start")
    py_func = PythonOperator(
        task_id="py_func",
        python_callable=my_function,
        op_kwargs={
            "my_var": "{{ var.value." + provider_name + "_accounts }}"
        },
    )
    end = EmptyOperator(task_id="end")

    start >> py_func >> end
```
> So: detecting when we have top-level dag code, at least in the `python mydagfile.py` case: [...] (Put this in `airflow/sdk/execution_time/task_runner.py`. It won't help for cases in unit tests where someone imports the dag, but that could be handled in other ways.)

aah 💡.
> So now back to the main question: can we "just" make it set up an in-process API server there to try and ask the local DB in that case? Are there any security risks of doing that?

Unsure right now -- need to think it through.
> `py_func = PythonOperator(task_id="py_func", python_callable=my_function, op_kwargs={"my_var": "{{ var.value." + provider_name + "_accounts }}"})`
@kaxil the provided DAG only reproduces the issue and does not reflect our actual case. We need the ability to trigger a certain account, gather per-account metrics to StatsD, and use the additional flexibility that dynamic DAG generation offers.
Thank you for your reply. I look forward to testing the fix if one is provided. Feel free to ping me if you need any feedback from the user perspective.
UPD: importing `Variable` from `airflow.models` instead of `airflow.sdk` fixes the problem, but doesn't seem like a best-practice solution.
> UPD: importing `Variable` from `airflow.models` instead of `airflow.sdk` fixes the problem, but doesn't seem like a best-practice solution.
Yeah, that still works because it falls back to the direct-DB-access-based approach.
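To spell out the workaround being discussed (a sketch of the observation above, not an endorsed pattern):

```python
# Legacy import: falls back to direct DB access, so it currently works even
# at the top level of a file run via `python mydag.py`.
from airflow.models import Variable

x = Variable.get("my_variable")

# The Task SDK import, by contrast, expects supervisor comms to be set up
# and fails at parse time, as reported in this issue:
# from airflow.sdk import Variable
```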
Importing from `airflow.models` works for me to load an env-defined variable. Is `airflow.sdk` not well suited to non-runner environments yet? I have already seen various problems, e.g. `dag.folder` being wrong when running through the CLI, `sys.path` not being properly extended...
We use variables for dynamic DAG scheduling, so we can view and modify schedules via the web UI ourselves, without having to change deployment configs (OS environment variables) through the DevOps team. Example:

```python
with DAG(
    "DAG_1",
    schedule=Variable.get("DAG_1_SCHEDULE", None),
):
    ...
```
So I feel like I have to ask: will the final fixed version allow us to do the same?
> We use variables for dynamic DAG scheduling, so we can view and modify schedules via the web UI ourselves [...] So I feel like I have to ask: will the final fixed version allow us to do the same?
Yes, although the timeline is TBD.
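In the meantime, a sketch of the interim workaround already noted in this thread, applied to the scheduling case (uses the legacy model's direct DB access, so it assumes the DAG processor can reach the metadata DB):

```python
from airflow import DAG
from airflow.models import Variable  # legacy import; falls back to direct DB access

with DAG(
    "DAG_1",
    # The second positional argument is the default returned when the
    # key is missing.
    schedule=Variable.get("DAG_1_SCHEDULE", None),
):
    ...
```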