airflow
Inlets not handled properly with @task functions
Apache Airflow version
Other Airflow 2 version (please specify below)
If "Other Airflow 2 version" selected, which one?
2.8.0
What happened?
A task with outlets should convey its outlets as inlets to a downstream @task-decorated function when that function is instantiated with inlets=AUTO. Instead, the string "auto" (the value of airflow.lineage.AUTO) is passed as the value, rather than the resolved outlets.
Consider a minimal test case:
from airflow import DAG
from airflow.decorators import task
from airflow.lineage import AUTO
from datetime import datetime, timedelta
from airflow.operators.empty import EmptyOperator

DAG_ID = "test_lineage"

with DAG(
    dag_id=DAG_ID,
    schedule=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
) as dag:
    f = "filename"
    op1 = EmptyOperator(
        task_id="leave1",
        outlets=[f],
    )

    @task
    def test_inlets(inlets):
        assert inlets == [f], f"{inlets} != {[f]}"
        return True

    op1 >> test_inlets(inlets=AUTO)
What you think should happen instead?
inlets should resolve to the outlets of the upstream task rather than remaining unresolved.
What actually happens:
AssertionError: ['auto'] != ['filename']
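To make the expected behaviour concrete, here is a minimal plain-Python sketch of what "resolving" AUTO means in this report: the sentinel is replaced by the outlets of the upstream task. The helper `resolve_inlets` is hypothetical and written only for illustration; it is not Airflow's implementation, and the `AUTO` constant below simply mirrors the "auto" value quoted above.

```python
AUTO = "auto"  # mirrors the value of airflow.lineage.AUTO quoted in this report


def resolve_inlets(declared_inlets, upstream_outlets):
    """Replace every AUTO sentinel with the upstream outlets.

    Hypothetical helper for illustration only; not part of Airflow.
    """
    resolved = []
    for item in declared_inlets:
        if item == AUTO:
            # Expand the sentinel into the concrete upstream outlets.
            resolved.extend(upstream_outlets)
        else:
            # Explicitly declared inlets are kept unchanged.
            resolved.append(item)
    return resolved


# Outlets of the upstream task "leave1" in the test case above.
upstream_outlets = ["filename"]
print(resolve_inlets([AUTO], upstream_outlets))
```

Under this model, the function in the test case would receive ['filename'] instead of ['auto'].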
How to reproduce
- Create the DAG, given the test code above.
- Run the DAG.
- Be sad :(
Operating System
OSX
Versions of Apache Airflow Providers
apache-airflow-providers-amazon==8.13.0
apache-airflow-providers-common-sql==1.9.0
apache-airflow-providers-discord==3.5.0
apache-airflow-providers-ftp @ file:///home/conda/feedstock_root/build_artifacts/apache-airflow-providers-ftp_1702441975571/work
apache-airflow-providers-google==10.12.0
apache-airflow-providers-http @ file:///home/conda/feedstock_root/build_artifacts/apache-airflow-providers-http_1706649884002/work
apache-airflow-providers-imap @ file:///home/conda/feedstock_root/build_artifacts/apache-airflow-providers-imap_1702443030927/work
apache-airflow-providers-papermill==3.6.0
apache-airflow-providers-postgres==5.9.0
apache-airflow-providers-sqlite==3.6.0
Deployment
Other
Deployment details
Running standalone.
Anything else?
No response
Are you willing to submit PR?
- [X] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise PR to address this issue please do so, no need to wait for approval.
For additional context, with the python debugger inside execute_callable of the PythonOperator (which wraps the function in question):
- It's calling self.python_callable(*self.op_args, **self.op_kwargs).
- self.op_kwargs are unresolved.
- However, self.inlets does contain the correct values.
- Thus, the problem is with the PythonOperator not supporting inlets.
Is this by design? Seems not.
> /Users/rfc/miniforge3/envs/openwtf/lib/python3.11/site-packages/airflow/operators/python.py(216)execute_callable()
-> return self.python_callable(*self.op_args, **self.op_kwargs)
(Pdb) p self.op_kwargs
{'inlets': ['auto'], 'execution_date': '2024-02-15T22:14:46.598812+00:00'}
(Pdb) p self.inlets
['filename']
Possible patch
class PythonOperator(BaseOperator):
    ...

    def execute_callable(self) -> Any:
        """
        Call the python callable with the given arguments.

        :return: the return value of the call.
        """
        kwargs = {**self.op_kwargs}
        if self.inlets:
            kwargs["inlets"] = self.inlets
        return self.python_callable(*self.op_args, **kwargs)
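The effect of this patch can be tried without Airflow using a toy model. The sketch below is an assumption-laden stand-in: `ToyPythonOperator` and `PatchedToyPythonOperator` are hypothetical classes that only mimic the attributes seen in the debugger session (op_args, op_kwargs, inlets); they are not Airflow's classes.

```python
class ToyPythonOperator:
    """Toy stand-in for PythonOperator (hypothetical, not Airflow's class)."""

    def __init__(self, python_callable, op_args=(), op_kwargs=None, inlets=None):
        self.python_callable = python_callable
        self.op_args = op_args
        self.op_kwargs = op_kwargs or {}
        self.inlets = inlets or []

    def execute_callable(self):
        # Unpatched behaviour: op_kwargs are passed through as-is.
        return self.python_callable(*self.op_args, **self.op_kwargs)


class PatchedToyPythonOperator(ToyPythonOperator):
    def execute_callable(self):
        # Patched behaviour: resolved inlets override the "inlets" kwarg.
        kwargs = {**self.op_kwargs}
        if self.inlets:
            kwargs["inlets"] = self.inlets
        return self.python_callable(*self.op_args, **kwargs)


def show_inlets(inlets, **_):
    return inlets


# State taken from the debugger session: unresolved kwarg, resolved attribute.
state = dict(
    python_callable=show_inlets,
    op_kwargs={"inlets": ["auto"]},
    inlets=["filename"],
)
unpatched = ToyPythonOperator(**state).execute_callable()
patched = PatchedToyPythonOperator(**state).execute_callable()
print(unpatched)
print(patched)
```

One design point worth checking before adopting the patch: in this toy model, a callable that does not accept an `inlets` parameter (and has no `**kwargs`) raises a TypeError once `self.inlets` is non-empty, because the keyword is injected unconditionally. Whether the real operator would need a signature check here is an open question.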
This works fine for me:
from airflow import DAG
from airflow.decorators import task
from airflow.lineage import AUTO
from datetime import datetime, timedelta
from airflow.operators.empty import EmptyOperator

DAG_ID = "test_lineage"

with DAG(
    dag_id=DAG_ID,
    schedule=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
) as dag:
    f = "filename"
    op1 = EmptyOperator(
        task_id="leave1",
        outlets=[f],
    )

    # Declare inlets on the decorator, then read them from the task in the context.
    @task(inlets=AUTO)
    def test_inlets(**kwargs):
        task_ = kwargs["task"]
        assert task_.inlets == [f], f"{task_.inlets} != {[f]}"

    # Place the task downstream of op1 so AUTO can see its outlets.
    op1 >> test_inlets()
I'm surprised that someone uses the experimental Lineage feature.
This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in next 7 days if no further activity occurs from the issue author.
This issue has been closed because it has not received response from the issue author.