`airflow dags reserialize` says "'_TaskDecorator' object has no attribute 'uri'"

Open MatrixManAtYrService opened this issue 3 years ago • 0 comments

Apache Airflow version

main (development)

What happened

Here are two DAGs:

from airflow import Dataset, DAG
from datetime import datetime

qux = Dataset("qux")

with DAG(
    dag_id="a1",
    start_date=datetime(1970, 1, 1),
    schedule_interval=None,
) as a1:

    @a1.task(outlets=[qux])
    def qux():
        pass

    qux()


with DAG(
    dag_id="a2",
    start_date=datetime(1970, 1, 1),
    # schedule_on=[qux],  # breaks if you enable this
) as a2:

    @a2.task
    def no_outlets():
        pass

    no_outlets()

They serialize fine as long as the schedule_on entry in the second DAG stays commented out, but serialization fails like so once you enable it:

❯ airflow dags reserialize
...
/home/matt/src/airflow/airflow/models/dagbag.py:604 DeprecationWarning: DAG.full_filepath is deprecated in favour of fileloc
[2022-08-05 14:36:50,070] {dagbag.py:604} ERROR - Failed to write serialized DAG: /home/matt/today/8.5/dags/dataset_network_A.py
Traceback (most recent call last):
  File "/home/matt/src/airflow/airflow/serialization/serialized_objects.py", line 1066, in serialize_dag
    dag_deps.update(DependencyDetector().detect_dag_dependencies(dag))
  File "/home/matt/src/airflow/airflow/serialization/serialized_objects.py", line 593, in detect_dag_dependencies
    return [
  File "/home/matt/src/airflow/airflow/serialization/serialized_objects.py", line 598, in <listcomp>
    dependency_id=x.uri,
AttributeError: '_TaskDecorator' object has no attribute 'uri'

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/home/matt/src/airflow/airflow/models/dagbag.py", line 593, in _serialize_dag_capturing_errors
    dag_was_updated = SerializedDagModel.write_dag(
  File "/home/matt/src/airflow/airflow/utils/session.py", line 68, in wrapper
    return func(*args, **kwargs)
  File "/home/matt/src/airflow/airflow/models/serialized_dag.py", line 154, in write_dag
    new_serialized_dag = cls(dag)
  File "<string>", line 4, in __init__
  File "/home/matt/today/8.5/venv/lib/python3.9/site-packages/sqlalchemy/orm/state.py", line 482, in _initialize_instance
    manager.dispatch.init_failure(self, args, kwargs)
  File "/home/matt/today/8.5/venv/lib/python3.9/site-packages/sqlalchemy/util/langhelpers.py", line 70, in __exit__
    compat.raise_(
  File "/home/matt/today/8.5/venv/lib/python3.9/site-packages/sqlalchemy/util/compat.py", line 208, in raise_
    raise exception
  File "/home/matt/today/8.5/venv/lib/python3.9/site-packages/sqlalchemy/orm/state.py", line 479, in _initialize_instance
    return manager.original_init(*mixed[1:], **kwargs)
  File "/home/matt/src/airflow/airflow/models/serialized_dag.py", line 101, in __init__
    dag_data = SerializedDAG.to_dict(dag)
  File "/home/matt/src/airflow/airflow/serialization/serialized_objects.py", line 1175, in to_dict
    json_dict = {"__version": cls.SERIALIZER_VERSION, "dag": cls.serialize_dag(var)}
  File "/home/matt/src/airflow/airflow/serialization/serialized_objects.py", line 1083, in serialize_dag
    raise SerializationError(f'Failed to serialize DAG {dag.dag_id!r}: {e}')
airflow.exceptions.SerializationError: Failed to serialize DAG 'a2': '_TaskDecorator' object has no attribute 'uri'
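
One possible reading of the traceback, offered as a guess rather than a confirmed diagnosis: in the file above, the name qux is first bound to a Dataset and then rebound by the decorated function def qux(), so by the time schedule_on=[qux] is evaluated the list may hold the decorated task object instead of the Dataset. The sketch below reproduces only that name-shadowing behaviour in plain Python. The Dataset, _TaskDecorator and task definitions here are hypothetical stand-ins written for illustration, not Airflow's real classes.

```python
class Dataset:
    """Hypothetical stand-in for airflow.Dataset; only `uri` matters here."""

    def __init__(self, uri):
        self.uri = uri


class _TaskDecorator:
    """Hypothetical stand-in for the object a task decorator returns."""

    def __init__(self, function):
        self.function = function

    def __call__(self, *args, **kwargs):
        return self.function(*args, **kwargs)


def task(outlets=None):
    """Hypothetical decorator factory; `outlets` is accepted but unused."""

    def wrap(function):
        return _TaskDecorator(function)

    return wrap


qux = Dataset("qux")


# The decorator expression is evaluated first, while qux is still the Dataset.
# After the definition completes, the name qux is rebound to the wrapper object.
@task(outlets=[qux])
def qux():
    pass


shadowed_type = type(qux).__name__
has_uri = hasattr(qux, "uri")
print(shadowed_type, has_uri)
```

If that is what is happening here, giving the Dataset and the task function different names in the DAG file would be a quick way to check.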

What you think should happen instead

No error

How to reproduce

Run `airflow dags reserialize` with the above file in your DAGs folder.

Operating System

NixOS 21.11 (linux kernel 5.0.101)

Versions of Apache Airflow Providers

n/a

Deployment

Virtualenv installation

Deployment details

No response

Anything else

It doesn't happen if I use PythonOperator directly.

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

MatrixManAtYrService, Aug 05 '22 20:08