fix: sqa deprecations for airflow task cmd
related: #28723
Fix SQLAlchemy 2.0 deprecations in the Airflow core task command.
SQLAlchemy 2.0 changes the behavior when an object is merged into a Session along a backref cascade. Up to and including SQLAlchemy 1.4, given a bidirectional relationship between TaskInstance and DagRun, if a DagRun object is already in a Session, a TaskInstance associated with it is put into that Session as well. This behavior is deprecated in 1.4 and removed in SQLAlchemy 2.0.
To maintain the current behavior and fix the warning, we need to ensure that both objects are either outside the session or inside it when they are associated with each other. See the SQLAlchemy 2.0 migration notes on the deprecated cascade_backrefs behavior for more information.
Reported in core
- [x] airflow/cli/commands/task_command.py:202
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst or {issue_number}.significant.rst, in newsfragments.
Is there any way to test this?
Yeah it would be great to have a test for it.
A good test should test the current behavior in SQLAlchemy 1.4 against 2.0.
I guess one could install SQLAlchemy 2.0 locally and run the tests against it. Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and runs all tests?
Is there any way to test this?
Is there any way to test this?
Yeah it would be great to have a test for it.
A good test should test the current behavior in SQLAlchemy 1.4 against 2.0.
I guess one could install SQLAlchemy 2.0 locally and run the tests against it. Perhaps we could also add a new CI workflow that upgrades to SQLAlchemy 2.0 and runs all tests?
I tried to make all tests run with SQLAlchemy 2.0, but it takes a lot of effort to adjust and fix everything (i.e., to make Airflow core compatible with SQLAlchemy 2.0).
Instead, I created a small test setup and verified that the fix indeed preserves the current behavior.
SQLAlchemy 1.4
from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
from sqlalchemy.orm import Session, declarative_base, relationship
# Minimal standalone model pair mirroring the TaskInstance <-> DagRun
# bidirectional relationship, used to reproduce the backref-cascade change.
Base = declarative_base()


class TaskInstance(Base):
    """Simplified task instance keyed by (task_id, dag_id, run_id, map_index).

    ``(dag_id, run_id)`` is a composite foreign key into ``dag_run``. The
    ``dag_run`` relationship is the many-to-one side of a pair whose
    ``back_populates`` counterpart is ``DagRun.task_instances``.
    """

    __tablename__ = "task_instance"
    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    # The database fills in -1 when no map_index is supplied.
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
    __table_args__ = (
        # Composite FK to the (dag_id, run_id) unique key of dag_run.
        # ON DELETE CASCADE is declared in the DDL (note: SQLite only enforces
        # it when foreign-key support is enabled on the connection).
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )
    # Assigning this attribute also appends the instance to
    # DagRun.task_instances (back_populates). Whether that assignment pulls
    # the TaskInstance into the DagRun's Session is what differs between
    # SQLAlchemy 1.4 and 2.0.
    dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
class DagRun(Base):
    """Simplified DAG run: the parent (one-to-many) side of the TaskInstance relationship."""

    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)
    # "save-update" cascades TaskInstances added directly to this collection
    # into the DagRun's Session. In SQLAlchemy 2.0 this no longer happens when
    # the collection is populated indirectly through the backref (i.e. by
    # assigning TaskInstance.dag_run). "delete-orphan" deletes a TaskInstance
    # once it is removed from this collection.
    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )
    __table_args__ = (
        # (dag_id, run_id) must be unique so it can be the target of
        # task_instance_dag_run_fkey.
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )
# In-memory SQLite database; future=True opts in to 2.0-style engine usage on 1.4.
engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

# Seed a single DagRun and commit it so it can be reloaded in a fresh session.
with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()

# Simulate current behavior
# The loaded DagRun is in the session. A TaskInstance is then attached to it
# through TaskInstance.dag_run without calling session.add(ti); run_id is not
# passed and comes from dag_run via the relationship. Under 1.4 the backref
# cascade puts the TaskInstance into the session (emitting a
# RemovedIn20Warning), so the commit persists it.
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)
    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)
    session.commit()

# Check if task instance is in db
# A fresh session shows whether the TaskInstance was actually persisted.
with Session(engine) as session:
    all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
    print(all_tis)
SQLAlchemy 1.4 output
vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_1_4.py
<__main__.DagRun object at 0x7f68a696c0d0>
Dag run in session: True
/workspaces/sqa2backref/backpop/sqa_1_4.py:63: RemovedIn20Warning: Deprecated API features detected! These feature(s) are not compatible with SQLAlchemy 2.0. To prevent incompatible upgrades prior to updating applications, ensure requirements files are pinned to "sqlalchemy<2.0". Set environment variable SQLALCHEMY_WARN_20=1 to show all deprecation warnings. Set environment variable SQLALCHEMY_SILENCE_UBER_WARNING=1 to silence this message. (Background on SQLAlchemy 2.0 at: https://sqlalche.me/e/b8d9)
ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
TaskInstance in session: True
[<__main__.TaskInstance object at 0x7f68a6992c90>]
SQLAlchemy 2.0
from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
from sqlalchemy.orm import Session, declarative_base, relationship
# Minimal standalone model pair mirroring the TaskInstance <-> DagRun
# bidirectional relationship, used to reproduce the backref-cascade change.
Base = declarative_base()


class TaskInstance(Base):
    """Simplified task instance keyed by (task_id, dag_id, run_id, map_index).

    ``(dag_id, run_id)`` is a composite foreign key into ``dag_run``. The
    ``dag_run`` relationship is the many-to-one side of a pair whose
    ``back_populates`` counterpart is ``DagRun.task_instances``.
    """

    __tablename__ = "task_instance"
    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    # The database fills in -1 when no map_index is supplied.
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
    __table_args__ = (
        # Composite FK to the (dag_id, run_id) unique key of dag_run.
        # ON DELETE CASCADE is declared in the DDL (note: SQLite only enforces
        # it when foreign-key support is enabled on the connection).
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )
    # Assigning this attribute also appends the instance to
    # DagRun.task_instances (back_populates). Whether that assignment pulls
    # the TaskInstance into the DagRun's Session is what differs between
    # SQLAlchemy 1.4 and 2.0.
    dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
class DagRun(Base):
    """Simplified DAG run: the parent (one-to-many) side of the TaskInstance relationship."""

    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)
    # "save-update" cascades TaskInstances added directly to this collection
    # into the DagRun's Session. In SQLAlchemy 2.0 this no longer happens when
    # the collection is populated indirectly through the backref (i.e. by
    # assigning TaskInstance.dag_run). "delete-orphan" deletes a TaskInstance
    # once it is removed from this collection.
    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )
    __table_args__ = (
        # (dag_id, run_id) must be unique so it can be the target of
        # task_instance_dag_run_fkey.
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )
# In-memory SQLite database; engine setup kept identical to the 1.4 script.
engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

# Seed a single DagRun and commit it so it can be reloaded in a fresh session.
with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()

# Simulate current behavior
# Same steps as the 1.4 script: the loaded DagRun is in the session, and a
# TaskInstance is attached to it via TaskInstance.dag_run with no explicit
# session.add(ti). Under 2.0 the backref no longer cascades the TaskInstance
# into the session, so the commit does not persist it.
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)
    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)
    session.commit()

# Check if task instance is in db
# Under 2.0 this prints an empty list because the TaskInstance was never flushed.
with Session(engine) as session:
    all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
    print(all_tis)
SQLAlchemy 2.0 output
vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0.py
<__main__.DagRun object at 0x7f24bad1f850>
Dag run in session: True
TaskInstance in session: False
[]
SQLAlchemy 2.0 with fix
from sqlalchemy import Column, Integer, String, create_engine, text, ForeignKeyConstraint, UniqueConstraint, select
from sqlalchemy.orm import Session, declarative_base, relationship
# Minimal standalone model pair mirroring the TaskInstance <-> DagRun
# bidirectional relationship, used to reproduce the backref-cascade change.
Base = declarative_base()


class TaskInstance(Base):
    """Simplified task instance keyed by (task_id, dag_id, run_id, map_index).

    ``(dag_id, run_id)`` is a composite foreign key into ``dag_run``. The
    ``dag_run`` relationship is the many-to-one side of a pair whose
    ``back_populates`` counterpart is ``DagRun.task_instances``.
    """

    __tablename__ = "task_instance"
    task_id = Column(String(50), primary_key=True, nullable=False)
    dag_id = Column(String(50), primary_key=True, nullable=False)
    run_id = Column(String(50), primary_key=True, nullable=False)
    # The database fills in -1 when no map_index is supplied.
    map_index = Column(Integer, primary_key=True, nullable=False, server_default=text("-1"))
    __table_args__ = (
        # Composite FK to the (dag_id, run_id) unique key of dag_run.
        # ON DELETE CASCADE is declared in the DDL (note: SQLite only enforces
        # it when foreign-key support is enabled on the connection).
        ForeignKeyConstraint(
            [dag_id, run_id],
            ["dag_run.dag_id", "dag_run.run_id"],
            name="task_instance_dag_run_fkey",
            ondelete="CASCADE",
        ),
    )
    # Assigning this attribute also appends the instance to
    # DagRun.task_instances (back_populates). Whether that assignment pulls
    # the TaskInstance into the DagRun's Session is what differs between
    # SQLAlchemy 1.4 and 2.0.
    dag_run = relationship("DagRun", back_populates="task_instances", lazy="joined", innerjoin=True)
class DagRun(Base):
    """Simplified DAG run: the parent (one-to-many) side of the TaskInstance relationship."""

    __tablename__ = "dag_run"
    id = Column(Integer, primary_key=True)
    dag_id = Column(String(50), nullable=False)
    run_id = Column(String(50), nullable=False)
    # "save-update" cascades TaskInstances added directly to this collection
    # into the DagRun's Session. In SQLAlchemy 2.0 this no longer happens when
    # the collection is populated indirectly through the backref (i.e. by
    # assigning TaskInstance.dag_run). "delete-orphan" deletes a TaskInstance
    # once it is removed from this collection.
    task_instances = relationship(
        TaskInstance, back_populates="dag_run", cascade="save-update, merge, delete, delete-orphan"
    )
    __table_args__ = (
        # (dag_id, run_id) must be unique so it can be the target of
        # task_instance_dag_run_fkey.
        UniqueConstraint("dag_id", "run_id", name="dag_run_dag_id_run_id_key"),
    )
# In-memory SQLite database; engine setup kept identical to the 1.4 script.
engine = create_engine("sqlite://", echo=False, future=True)
Base.metadata.drop_all(engine)
Base.metadata.create_all(engine)

# Seed a single DagRun and commit it so it can be reloaded in a fresh session.
with Session(engine) as session:
    dag_run = DagRun(id=1, dag_id="dag_1", run_id="id_1")
    session.add(dag_run)
    session.commit()

# Same steps as the unfixed 2.0 script, plus the fix: the TaskInstance is
# added to the session explicitly instead of relying on the removed backref
# cascade. The membership check runs *before* session.add(ti), so it still
# prints False; the explicit add is what makes the commit persist it.
with Session(engine) as session:
    dag_run = session.scalars(select(DagRun).where(DagRun.id == 1)).one()
    print(dag_run)
    print("Dag run in session:", dag_run in session)
    ti = TaskInstance(task_id="ti_1", dag_id="dag_1", dag_run=dag_run)
    print("TaskInstance in session:", ti in session)
    session.add(ti)  # <-- fix: put ti in the same session as its dag_run explicitly
    session.commit()

# Check if task instance is in db
# With the fix, this query returns the TaskInstance again, matching the 1.4 result.
with Session(engine) as session:
    all_tis = session.scalars(select(TaskInstance).where(TaskInstance.dag_id == "dag_1")).all()
    print(all_tis)
SQLAlchemy 2.0 with fix output
vscode ➜ /workspaces/sqa2backref/backpop $ python sqa_2_0_fix.py
<__main__.DagRun object at 0x7efc93103e10>
Dag run in session: True
TaskInstance in session: False
[<__main__.TaskInstance object at 0x7efc937081d0>]