fix: sqa deprecations for airflow task cmd

Open dondaum opened this issue 1 year ago • 2 comments

related: #28723

Fix SQLAlchemy 2.0 deprecations in the Airflow core task command.

SQLAlchemy 2.0 changes how an object is cascaded into a Session along a backref. Up to and including SQLAlchemy 1.4, given a bidirectional relationship between TaskInstance and DagRun, associating a TaskInstance with a DagRun that is already in a Session also puts the TaskInstance into that Session. This behavior is deprecated in 1.4 and removed in SQLAlchemy 2.0.

To maintain the current behavior and fix the warning, we need to ensure that the two objects are either both outside the session or both inside it when they are associated with each other. See here for more information.
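A minimal sketch of the two safe orderings, using simplified, hypothetical models (a single integer foreign key instead of Airflow's composite one). It assumes `cascade_backrefs=False` on the one-to-many side to get the 2.0 behavior on SQLAlchemy 1.4 too; on 2.0, as far as I know, `False` is the only accepted value, so the flag is a no-op there.

```python
# Sketch: the two safe orderings under SQLAlchemy 2.0 backref semantics.
# The models are simplified and hypothetical (not Airflow's real ones).
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class DagRun(Base):
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    task_instances = relationship(
        "TaskInstance",
        back_populates="dag_run",
        cascade="save-update, merge, delete, delete-orphan",
        cascade_backrefs=False,  # assumption: emulate 2.0 behavior on 1.4
    )


class TaskInstance(Base):
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    task_id = Column(String(50), nullable=False)
    dag_run_id = Column(Integer, ForeignKey("dag_run.id"), nullable=False)
    dag_run = relationship("DagRun", back_populates="task_instances")


engine = create_engine("sqlite://")
Base.metadata.create_all(engine)


def both_outside_session() -> bool:
    """Associate two transient objects, then add only the DagRun."""
    with Session(engine) as session:
        dag_run = DagRun()
        ti = TaskInstance(task_id="ti_a", dag_run=dag_run)
        # The forward save-update cascade on DagRun.task_instances still
        # pulls ti into the session; only the backref cascade was removed.
        session.add(dag_run)
        return ti in session


def both_inside_session() -> bool:
    """DagRun is already in the session, so add the TaskInstance explicitly."""
    with Session(engine) as session:
        dag_run = DagRun()
        session.add(dag_run)
        session.flush()
        ti = TaskInstance(task_id="ti_b", dag_run=dag_run)
        cascaded = ti in session  # False: no cascade along the backref
        session.add(ti)
        return not cascaded and ti in session


print(both_outside_session(), both_inside_session())
```

The first ordering relies on the ordinary forward cascade from the parent, which SQLAlchemy 2.0 keeps; the second is the pattern the fix below uses.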

Reported in core

  • [x] airflow/cli/commands/task_command.py:202

dondaum, Apr 24 '24 21:04

Is there any way to test this?

Yeah, it would be great to have a test for it.

A good test would compare the current behavior under SQLAlchemy 1.4 against 2.0.

Locally, one could install SQLAlchemy 2.0 and run the tests against it. Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and runs all tests?
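One possible way to compare both behaviors without a full 2.0 migration, sketched under assumptions: on SQLAlchemy 1.4 the legacy backref cascade can be toggled per relationship with `cascade_backrefs`, so a single 1.4 install can exercise both the old and the 2.0 behavior, while on 2.x only the new behavior is checked. The helper `ti_cascaded_via_backref` and the simplified models are hypothetical, and the sketch assumes SQLAlchemy 1.4 or 2.x is installed.

```python
# Sketch: compare the legacy (1.4) and the 2.0 backref cascade in one process.
# Simplified, hypothetical models; only the relationship setup matters here.
import sqlalchemy
from sqlalchemy import Column, ForeignKey, Integer, create_engine
from sqlalchemy.orm import Session, declarative_base, relationship

SQLALCHEMY_MAJOR = int(sqlalchemy.__version__.split(".")[0])


def ti_cascaded_via_backref(cascade_backrefs: bool) -> bool:
    """Return True if assigning ti.dag_run pulls ti into the DagRun's session."""
    Base = declarative_base()  # fresh registry per call, so models can be redefined

    class DagRun(Base):
        __tablename__ = "dag_run"
        id = Column(Integer, primary_key=True)
        task_instances = relationship(
            "TaskInstance",
            back_populates="dag_run",
            cascade="save-update, merge, delete, delete-orphan",
            cascade_backrefs=cascade_backrefs,
        )

    class TaskInstance(Base):
        __tablename__ = "task_instance"
        id = Column(Integer, primary_key=True)
        dag_run_id = Column(Integer, ForeignKey("dag_run.id"), nullable=False)
        dag_run = relationship("DagRun", back_populates="task_instances")

    engine = create_engine("sqlite://")
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        dag_run = DagRun()
        session.add(dag_run)
        # Fires the backref: dag_run.task_instances.append(ti)
        ti = TaskInstance(dag_run=dag_run)
        return ti in session


# 2.0 behavior: runs on 1.4 and 2.x alike.
new_behavior = ti_cascaded_via_backref(cascade_backrefs=False)

# Legacy behavior only exists on 1.x; 2.x rejects cascade_backrefs=True.
legacy_behavior = (
    ti_cascaded_via_backref(cascade_backrefs=True) if SQLALCHEMY_MAJOR < 2 else None
)

print("legacy:", legacy_behavior, "2.0:", new_behavior)
```

On 1.4 the legacy call is expected to emit the same RemovedIn20Warning seen in the output below.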

dondaum, Apr 25 '24 14:04

I tried to run all tests with SQLAlchemy 2.0, but adjusting and fixing everything (i.e., making Airflow core compatible with SQLAlchemy 2.0) is a lot of effort.

Instead, I created a small test setup and verified that the fix indeed preserves the current behavior.

SQLAlchemy 1.4


from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
from sqlalchemy.orm import Session, declarative_base, relationship


Base = declarative_base()



class TaskInstance(Base):
    __tablename__ = "task_instance"

    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))

    __table_args__ = (
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)

class DagRun(Base):
    __tablename__ = "dag_run"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)
    
    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )

    __table_args__ = (
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )



engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)


with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()


# Simulate current behavior
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)


    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)

    session.commit()


# Check if task instance is in db
with Session(engine) as session:
    all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
    print(all_tis)

SQLAlchemy 1.4 output

vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_1_4.py 
<__main__.DagRun object at 0x7f68a696c0d0>
Dag run in session: True
/workspaces/sqa2backref/backpop/sqa_1_4.py:63: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings.  Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
  ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
TaskInstance in session: True
[<__main__.TaskInstance object at 0x7f68a6992c90>]

SQLAlchemy 2.0


Same script as the SQLAlchemy 1.4 version above, saved as sqa_2_0.py and run with SQLAlchemy 2.0 installed.

SQLAlchemy 2.0 output

vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0.py 
<__main__.DagRun object at 0x7f24bad1f850>
Dag run in session: True
TaskInstance in session: False
[]

SQLAlchemy 2.0 with fix

from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
from sqlalchemy.orm import Session, declarative_base, relationship


Base = declarative_base()



class TaskInstance(Base):
    __tablename__ = "task_instance"

    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))

    __table_args__ = (
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )

    dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)

class DagRun(Base):
    __tablename__ = "dag_run"

    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)
    
    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )

    __table_args__ = (
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )



engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)


with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()


# Simulate current behavior
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)


    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)

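    # <-- fix: SQLAlchemy 2.0 no longer cascades ti into the session along
    # the dag_run backref, so add it explicitly before committing.
    session.add(ti)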

    session.commit()


# Check if task instance is in db
with Session(engine) as session:
    all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
    print(all_tis)

SQLAlchemy 2.0 with fix output

vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0_fix.py
<__main__.DagRun object at 0x7efc93103e10>
Dag run in session: True
TaskInstance in session: False
[<__main__.TaskInstance object at 0x7efc937081d0>]
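The manual check above could be turned into an automated regression test. A hedged sketch with simplified, hypothetical models (`count_persisted_tis` is an illustrative name, not an Airflow helper); `cascade_backrefs=False` is assumed to force the 2.0 behavior so the test gives the same result on SQLAlchemy 1.4 and 2.0.

```python
# Sketch of a regression test for the fix, using simplified hypothetical models.
from sqlalchemy import Column, ForeignKey, Integer, String, create_engine, func, select
from sqlalchemy.orm import Session, declarative_base, relationship

Base = declarative_base()


class DagRun(Base):
    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    task_instances = relationship(
        "TaskInstance",
        back_populates="dag_run",
        cascade="save-update, merge, delete, delete-orphan",
        cascade_backrefs=False,  # assumption: force 2.0 behavior on 1.4 too
    )


class TaskInstance(Base):
    __tablename__ = "task_instance"
    id = Column(Integer, primary_key=True)
    task_id = Column(String(50), nullable=False)
    dag_run_id = Column(Integer, ForeignKey("dag_run.id"), nullable=False)
    dag_run = relationship("DagRun", back_populates="task_instances")


def count_persisted_tis(explicit_add: bool) -> int:
    """Mirror the scripts above and count TaskInstance rows after commit."""
    engine = create_engine("sqlite://")  # fresh in-memory database per call
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        session.add(DagRun(id=1))
        session.commit()
    with Session(engine) as session:
        dag_run = session.get(DagRun, 1)
        ti = TaskInstance(task_id="ti_1", dag_run=dag_run)
        if explicit_add:
            session.add(ti)  # the fix
        session.commit()
    with Session(engine) as session:
        return session.scalar(select(func.count()).select_from(TaskInstance))


def test_task_instance_needs_explicit_add():
    assert count_persisted_tis(explicit_add=False) == 0
    assert count_persisted_tis(explicit_add=True) == 1


test_task_instance_needs_explicit_add()
```

Without the explicit `session.add(ti)`, SQLAlchemy may also emit an SAWarning during flush that the TaskInstance is not in the session; the row count is the part worth asserting on.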

dondaum, Apr 26 '24 09:04