airflow
Metastore not available from external_python task
Hi, I have decided to create a new issue; however, I think it relates to #48554.
I get the following error in v3.0.1 when trying to retrieve variables from the metastore within an external Python environment: ImportError: cannot import name 'SUPERVISOR_COMMS' from 'airflow.sdk.execution_time.task_runner'.
Retrieving the same variable within the same DAG, but using an ordinary task, works just fine. I'm therefore led to believe that this is caused by accessing the metastore from an external environment. The variable shows up when executing the airflow variables list command from a terminal with the Python environment activated. In my opinion, this suggests that the virtual environment is properly set up.
Retrieving variables from the metastore within external Python environments worked fine for me in v2.10.5 (however, using from airflow.models import Variable instead of from airflow.sdk import Variable).
I cannot retrieve connections stored in the metastore from an external environment either. However, I do not know if this is caused by the same root problem.
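One way to narrow this down is to check, from the external environment's interpreter, whether the module named in the ImportError exposes the name at all. The snippet below is only a diagnostic sketch: exposes_name is a hypothetical helper (not part of Airflow), and the module and attribute names are copied from the error message above. It rests on the assumption that the name may only be set at runtime inside a running task process, so a False result outside a task would not by itself prove a bug.

```python
import importlib


def exposes_name(module_name: str, attr: str) -> bool:
    """Return True if `module_name` imports and has a top-level `attr`.

    Hypothetical diagnostic helper, not part of Airflow.
    """
    try:
        module = importlib.import_module(module_name)
    except ImportError:
        # The module itself cannot be imported in this interpreter.
        return False
    return hasattr(module, attr)


if __name__ == "__main__":
    # Names taken from the ImportError message reported in this issue.
    print(
        exposes_name(
            "airflow.sdk.execution_time.task_runner",
            "SUPERVISOR_COMMS",
        )
    )
```

Running it once with the scheduler's interpreter and once with the virtual environment's interpreter would show whether the two environments differ in this respect.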
My original post can be found below.
I seem to get this error in 3.0.1 as well when using Variable.get("my_variable") inside an external_python task. The variable can be retrieved without problems from an ordinary task. The variable is also listed when executing the airflow variables list command from a terminal with the Python environment activated. In my opinion, the latter suggests that the virtual environment is properly set up. This setup worked fine in Airflow 2.10.5 (however, using from airflow.models import Variable instead of from airflow.sdk import Variable). I don't know if the cause of this issue is the same, but it sounds similar to me.
Originally posted by @fromm1990 in #48554
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
@fromm1990 Could you provide a reproducible example? Did you just create a Python operator and try to import it there, or did you perhaps use a virtualenv operator instead? I think I can fix the issue; I just need to know exactly what the issue is.
Hi @Nataneljpwd, thanks for reaching out. The setup below reproduces the error consistently for me.
Apache airflow version 3.0.2
Added the variable my_test_variable using the web UI (Admin -> Variables -> Add Variable)
Here is my pyproject.toml file for the virtual environment:
[project]
name = "test_venv"
version = "0.1.0"
description = "Add your description here"
readme = "README.md"
requires-python = ">=3.12"
dependencies = [
    "apache-airflow[postgres]==3.0.2",
]
Here is my DAG:
from airflow.decorators import dag, task

VENV_PATH = "/opt/airflow/projects/test_proj/.venv/bin/python"


@task.external_python(python=VENV_PATH)
def external_python_task():
    import logging

    from airflow.sdk import Variable

    logger = logging.getLogger(__name__)
    test_variable = Variable.get("my_test_variable")
    logger.info(test_variable)


@dag(dag_id="metastore_test_dag")
def metastore_test_dag():
    external_python_task()


metastore_test_dag()
Please let me know if you need more details about the setup.
I have been able to reproduce the issue. I think it might be due to the different ways the import is done. I will keep this discussion updated on my progress so that anyone can follow along.
I think I have found a way to fix it. I will implement and test it.