Start porting DAG definition code to the Task SDK
By "definition code" we mean anything needed at definition/parse time, leaving anything to do with scheduling-time decisions in Airflow's core.
In this PR I have also tried to port only the definition code for simple DAGs, leaving anything to do with mapped tasks or execution time in core for now, though a few things "leaked" across.
Because the goal of this PR is to go from one working state to another, some of the code in the Task SDK still imports from "core" (various types, enums, and helpers). That will need to be resolved before the 3.0 release, but it is fine for now.
I'm also aware that the class hierarchy is very messy right now, in particular for airflow.models.baseoperator.BaseOperator (and, to a lesser extent, DAG). We will need to think about how we want to add the scheduling-time functions etc. I'm not yet sold that having core Airflow depend on (and import) the Task SDK classes is the right structure, but we can address that later.
We will also need to address the rendered docs for the Task SDK in a future
PR. The goal is that "anything" exposed directly on airflow.sdk is part of
the public API, but right now the rendered docs show DAG as
airflow.sdk.definitions.dag.DAG, which is certainly not what we want users to
see.
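One common way to make a re-exported class present itself under the public namespace is to override its `__module__` attribute at re-export time, since `repr()` and many documentation tools read it (NumPy, for one, uses a similar `set_module` helper internally). The sketch below is hypothetical and self-contained: the `public_in` helper and the module strings are assumptions for illustration, not what this PR does.

```python
from typing import Callable, TypeVar

T = TypeVar("T")


def public_in(module_name: str) -> Callable[[T], T]:
    """Hypothetical helper: make a class or function report `module_name`
    as its home module, so repr() and doc tools show the public path."""

    def decorator(obj: T) -> T:
        obj.__module__ = module_name  # type: ignore[attr-defined]
        return obj

    return decorator


# Stand-in for a class defined in an internal module such as
# airflow.sdk.definitions.dag (simulated here; nothing is imported).
class DAG:
    pass


DAG.__module__ = "airflow.sdk.definitions.dag"
print(repr(DAG))  # <class 'airflow.sdk.definitions.dag.DAG'>

# What a package __init__ could do when re-exporting the class.
DAG = public_in("airflow.sdk")(DAG)
print(repr(DAG))  # <class 'airflow.sdk.DAG'>
```

The trade-off is that tracebacks and `inspect.getmodule()` then point at the public path rather than the file that defines the class.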
closes https://github.com/apache/airflow/issues/43011
This is very much at the "boring groundwork" stage. Once we've got this covered we can move on to the execution changes, the "execution interface" side, which is where it gets fun.
@kaxil @jscheffl I've just pushed the latest changes to this. It's probably not worth looking at it again until I've finished it and have a decent idea that the tests pass locally, but the last commit removes the old code I've ported over, so it should be slightly clearer where I've only moved code.
Current mypy status:
Found 58 errors in 41 files (checked 1135 source files)
Test-wise, at least tests/models/test_baseoperator.py and task_sdk/tests/ are passing.
Okay, we should be getting there now.
Mypy should be happy, and most of tests/models should pass:
FAILED tests/models/test_mappedoperator.py::test_task_mapping_default_args - AssertionError: assert equals failed
'airflow' 'test'
FAILED tests/models/test_mappedoperator.py::test_task_mapping_override_default_args - AssertionError: assert equals failed
DateTime(2016, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC')) DateTime(2024, 10, 28, 22, 38, 58, 228687, tzinfo=Timezone('UTC'))
FAILED tests/models/test_mappedoperator.py::test_mapped_task_applies_default_args_classic - assert equals failed
None datetime.timedelta(seconds=1800)
FAILED tests/models/test_renderedtifields.py::TestRenderedTaskInstanceFields::test_pandas_dataframes_works_with_the_string_compare - ModuleNotFoundError: No module named 'pandas'
FAILED tests/models/test_taskinstance.py::TestTaskInstance::test_set_dag - RuntimeError: Operator <Task(EmptyOperator): op_1> has not been assigned to a DAG yet
FAILED tests/models/test_taskinstance.py::TestTaskInstance::test_infer_dag - ValueError: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(EmptyOperator): test_op_1>, <Task(EmptyOperator): test_op_2>]
FAILED tests/models/test_taskinstance.py::TestTaskInstance::test_handle_failure - AttributeError: 'NoneType' object has no attribute 'args'
FAILED tests/models/test_taskinstance.py::test_lazy_xcom_access_does_not_pickle_session - RuntimeError: unknown backend None
8 failed, 1258 passed, 3 skipped, 1 warning in 58.45s from running pytest tests/serialization/test_dag_serialization.py tests/models/
(Adding the legacy api label)
Taking this out of draft now as it is almost there, a few small errors to chase down/correct but otherwise where we want it to be.
(Well that's not true, the change is about 10x what I'd like, but this was about as small as I could think to make it)
A note about the future direction of this: right now the scheduler still depends on the Task SDK, but that is mostly to keep the change as small as we can.
Once we have something like this accepted and merged, we will look at making the Serialized DAG etc. not depend on the Task SDK, as the scheduler should be able to operate on a much simpler interface than the full DAG authoring interface. We'll need to explore what makes sense, and also how we can usefully share enums (such as trigger rules), as it would be nice if the scheduler did not need to depend on the Task SDK.
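For the enum-sharing question, one option (a sketch only, not something decided in this PR) is to keep such enums as `str`-valued enums in a small module with no other dependencies. Because each member is also a plain string, it serializes to JSON as its value, and the scheduler can rebuild the member from a serialized DAG without importing the rest of the authoring interface. The members below are an illustrative subset and the module layout is an assumption.

```python
import json
from enum import Enum


class TriggerRule(str, Enum):
    """Hypothetical shared enum: a small subset of trigger rules."""

    ALL_SUCCESS = "all_success"
    ALL_FAILED = "all_failed"
    ALL_DONE = "all_done"
    ONE_SUCCESS = "one_success"

    def __str__(self) -> str:
        # Make str() and f-strings give the plain value, not "TriggerRule.X".
        return self.value


# Authoring side: the member is written to the serialized DAG as a plain string.
serialized = json.dumps({"task_id": "t1", "trigger_rule": TriggerRule.ALL_DONE})
print(serialized)  # {"task_id": "t1", "trigger_rule": "all_done"}

# Scheduler side: only this small enum (or just the string) is needed to interpret it.
task = json.loads(serialized)
rule = TriggerRule(task["trigger_rule"])
print(rule is TriggerRule.ALL_DONE)  # True
print(rule == "all_done")  # True
```

On Python 3.11+, `enum.StrEnum` gives the same behaviour without the `__str__` override.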
There are many more things to move over to the Task SDK before AIP-72 is complete, which we will track on the project board. Since this is the big base, and slow to test due to unavoidably wide-reaching changes, we will merge this and follow up with future changes for the rest (things like mapped operators, DagParam etc).
Merging this. One of the failures, in a k8s test that took longer than an hour, looks like a transient one:
Progress: airflow-python-3.9-v1.28.13 Error during running tests for Python 3.9, Kubernetes v1.28.13
Progress: airflow-python-3.10-v1.29.8 Image: "ghcr.io/apache/airflow/main/prod/python3.10-kubernetes" with ID "sha256:1547fde10197845f0938fd7d0fc5977273a4871e016c8102ce9c8eb33010d064" not [...]
If for some reason it fails on main too, we will debug it.