Inlets not handled properly with @task functions

Open ninowalker opened this issue 1 year ago • 3 comments

Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.0

What happened?

A task with an outlet should convey its outlets as inlets to a downstream @task-decorated function when that function is instantiated with inlets=AUTO. Instead, the function receives the unresolved value "auto" (the value of airflow.lineage.AUTO) rather than the upstream outlets.

Consider a minimal test case:

from airflow import DAG
from airflow.decorators import task
from airflow.lineage import AUTO
from datetime import datetime, timedelta

from airflow.operators.empty import EmptyOperator

DAG_ID = "test_lineage"

with DAG(
    dag_id=DAG_ID,
    schedule=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
) as dag:
    f = "filename"
    op1 = EmptyOperator(
        task_id="leave1",
        outlets=[f],
    )

    @task
    def test_inlets(inlets):
        assert inlets == [f], f"{inlets} != {[f]}"
        return True

    op1 >> test_inlets(inlets=AUTO)

What you think should happen instead?

inlets should resolve to the outlets of the upstream task (here ['filename']) instead of being passed through unresolved.

Instead, the assertion fails:

AssertionError: ['auto'] != ['filename']
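The expected behavior can be modeled in plain Python. This is a minimal sketch, not Airflow's lineage code: `resolve_inlets` is a hypothetical helper, and it only handles the AUTO marker, ignoring the other ways Airflow interprets inlets. It shows the value the assertion in the minimal test case expects `inlets` to hold.

```python
# Hypothetical, simplified model of AUTO inlet resolution; not Airflow's implementation.
AUTO = "auto"  # the string value this issue reports for airflow.lineage.AUTO


def resolve_inlets(inlets, upstream_outlets):
    """Replace the AUTO marker with the outlets of all upstream tasks.

    inlets: the list declared on the downstream task, e.g. [AUTO]
    upstream_outlets: mapping of upstream task_id -> that task's outlets
    """
    # Keep explicitly declared inlets as-is (this sketch ignores Airflow's other inlet rules).
    resolved = [item for item in inlets if item != AUTO]
    if AUTO in inlets:
        for outlets in upstream_outlets.values():
            resolved.extend(outlets)
    return resolved


# The DAG in this issue: "leave1" declares outlets=["filename"].
print(resolve_inlets([AUTO], {"leave1": ["filename"]}))  # ['filename']
```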

How to reproduce

  1. Create the DAG, given the test code above.
  2. Run the DAG.
  3. Be sad :(

Operating System

OSX

Versions of Apache Airflow Providers

apache-airflow-providers-amazon==8.13.0
apache-airflow-providers-common-sql==1.9.0
apache-airflow-providers-discord==3.5.0
apache-airflow-providers-ftp @ file:///home/conda/feedstock_root/build_artifacts/apache-airflow-providers-ftp_1702441975571/work
apache-airflow-providers-google==10.12.0
apache-airflow-providers-http @ file:///home/conda/feedstock_root/build_artifacts/apache-airflow-providers-http_1706649884002/work
apache-airflow-providers-imap @ file:///home/conda/feedstock_root/build_artifacts/apache-airflow-providers-imap_1702443030927/work
apache-airflow-providers-papermill==3.6.0
apache-airflow-providers-postgres==5.9.0
apache-airflow-providers-sqlite==3.6.0

Deployment

Other

Deployment details

Running standalone.

Anything else?

No response

Are you willing to submit PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

ninowalker, Feb 15 '24 21:02

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue please do so, no need to wait for approval.

boring-cyborg[bot], Feb 15 '24 21:02

For additional context, here is what the Python debugger shows inside execute_callable of the PythonOperator (which wraps the function in question):

  1. It calls self.python_callable(*self.op_args, **self.op_kwargs).
  2. self.op_kwargs still holds the unresolved value (inlets=['auto']).
  3. However, self.inlets does contain the correct, resolved values.
  4. So the problem is that PythonOperator passes only op_kwargs to the callable and never substitutes its resolved inlets.

Is this by design? It seems not.

> /Users/rfc/miniforge3/envs/openwtf/lib/python3.11/site-packages/airflow/operators/python.py(216)execute_callable()
-> return self.python_callable(*self.op_args, **self.op_kwargs)
(Pdb) p self.op_kwargs
{'inlets': ['auto'], 'execution_date': '2024-02-15T22:14:46.598812+00:00'}
(Pdb) p self.inlets
['filename']
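The mismatch in the pdb session can be reproduced without Airflow. In this minimal sketch `OperatorSketch` is a hypothetical stand-in that keeps only the attributes printed above (`python_callable`, `op_args`, `op_kwargs`, `inlets`); it is not the real PythonOperator, and `show_inlets` is an illustrative callable.

```python
from typing import Any, Callable


class OperatorSketch:
    """Hypothetical stand-in for PythonOperator, reduced to the attributes seen in pdb."""

    def __init__(self, python_callable: Callable[..., Any], op_kwargs: dict, inlets: list):
        self.python_callable = python_callable
        self.op_args: list = []
        self.op_kwargs = op_kwargs
        self.inlets = inlets  # resolved lineage, e.g. ['filename']

    def execute_callable(self) -> Any:
        # Same call as the pdb line: only op_args/op_kwargs reach the callable,
        # so self.inlets is never consulted.
        return self.python_callable(*self.op_args, **self.op_kwargs)


def show_inlets(inlets, execution_date=None):
    return inlets


op = OperatorSketch(
    show_inlets,
    op_kwargs={"inlets": ["auto"], "execution_date": "2024-02-15T22:14:46.598812+00:00"},
    inlets=["filename"],
)
print(op.execute_callable())  # ['auto'], while op.inlets == ['filename']
```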

Possible patch

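from typing import Any

from airflow.models.baseoperator import BaseOperator


class PythonOperator(BaseOperator):
    ...
    def execute_callable(self) -> Any:
        """
        Call the python callable with the given arguments.

        :return: the return value of the call.
        """
        # Copy op_kwargs so the operator's stored kwargs are not mutated.
        kwargs = {**self.op_kwargs}
        # Replace the unresolved "inlets" kwarg (e.g. ['auto']) with the resolved
        # lineage, but only when the callable was given an "inlets" argument;
        # callables without that parameter would otherwise fail with a TypeError.
        if self.inlets and "inlets" in kwargs:
            kwargs["inlets"] = self.inlets
        return self.python_callable(*self.op_args, **kwargs)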
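A standalone sketch of the patched call path, again using a hypothetical stand-in class rather than the real PythonOperator. It substitutes `self.inlets` only when an `inlets` keyword was passed, so callables without that parameter keep working; an unconditional override would make `ignores_inlets()` fail with a TypeError for the unexpected keyword. `wants_inlets` and `ignores_inlets` are illustrative callables.

```python
from typing import Any, Callable


class PatchedOperatorSketch:
    """Hypothetical stand-in for PythonOperator with the proposed execute_callable change."""

    def __init__(self, python_callable: Callable[..., Any], op_kwargs: dict, inlets: list):
        self.python_callable = python_callable
        self.op_args: list = []
        self.op_kwargs = op_kwargs
        self.inlets = inlets

    def execute_callable(self) -> Any:
        kwargs = {**self.op_kwargs}  # copy so the stored op_kwargs stay unchanged
        # Substitute resolved lineage only where the caller passed an "inlets" kwarg.
        if self.inlets and "inlets" in kwargs:
            kwargs["inlets"] = self.inlets
        return self.python_callable(*self.op_args, **kwargs)


def wants_inlets(inlets):
    return inlets


def ignores_inlets():
    return "ok"


print(PatchedOperatorSketch(wants_inlets, {"inlets": ["auto"]}, ["filename"]).execute_callable())  # ['filename']
print(PatchedOperatorSketch(ignores_inlets, {}, ["filename"]).execute_callable())  # ok
```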

ninowalker, Feb 15 '24 22:02

This works fine for me:

from airflow import DAG
from airflow.decorators import task
from airflow.lineage import AUTO
from datetime import datetime, timedelta

from airflow.operators.empty import EmptyOperator

DAG_ID = "test_lineage"

with DAG(
    dag_id=DAG_ID,
    schedule=None,
    start_date=datetime(2021, 1, 1),
    dagrun_timeout=timedelta(minutes=5),
    catchup=False,
) as dag:
    f = "filename"
    op1 = EmptyOperator(
        task_id="leave1",
        outlets=[f],
    )

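    @task(inlets=AUTO)
    def test_inlets(**kwargs):
        # Read the resolved lineage from the task object in the context,
        # not from a function argument.
        task_ = kwargs["task"]
        assert task_.inlets == [f], f"{task_.inlets} != {[f]}"

    # Instantiate the task and wire it downstream of op1 so AUTO can resolve.
    op1 >> test_inlets()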

I'm surprised that anyone uses the experimental lineage feature.
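The workaround reads lineage from the task context instead of from a function parameter. A plain-Python sketch of that access pattern, where a `SimpleNamespace` stands in for the operator Airflow places under `context["task"]` (the stand-in and the `read_inlets` name are assumptions of this sketch, not Airflow code):

```python
from types import SimpleNamespace


def read_inlets(**kwargs):
    # Same access pattern as the DAG above: the resolved list lives on the task object.
    task_ = kwargs["task"]
    return task_.inlets


# Hypothetical context: in Airflow, context["task"] is the running operator.
context = {"task": SimpleNamespace(inlets=["filename"])}
print(read_inlets(**context))  # ['filename']
```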

Taragolis, Feb 19 '24 10:02

This issue has been automatically marked as stale because it has been open for 14 days with no response from the author. It will be closed in the next 7 days if no further activity occurs from the issue author.

github-actions[bot], Mar 05 '24 00:03

This issue has been closed because it has not received a response from the issue author.

github-actions[bot], Mar 12 '24 00:03