
Start porting DAG definition code to the Task SDK

ashb opened this issue 1 year ago • 3 comments

By "definition code" we mean anything needed at definition/parse time, leaving anything to do with scheduling-time decisions in Airflow's core.

In this PR I have also tried to limit the port to definition code for simple DAGs, leaving anything to do with mapped tasks or execution time in core for now, although a few things "leaked" across.
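To make the split concrete, here is a minimal sketch of what "definition code" covers: the objects a DAG file builds at parse time (a DAG container, tasks and dependency edges), with no scheduling logic at all. The classes `SketchDAG` and `SketchTask` are hypothetical illustrations, not the Task SDK's actual classes or behaviour.

```python
from __future__ import annotations


class SketchDAG:
    """Hypothetical parse-time container: knows which tasks exist, not when they run."""

    current: SketchDAG | None = None  # DAG opened by the enclosing `with` block

    def __init__(self, dag_id: str) -> None:
        self.dag_id = dag_id
        self.tasks: dict[str, SketchTask] = {}

    def __enter__(self) -> SketchDAG:
        SketchDAG.current = self
        return self

    def __exit__(self, *exc_info: object) -> None:
        SketchDAG.current = None


class SketchTask:
    """Hypothetical parse-time task: an id plus dependency edges, no scheduling logic."""

    def __init__(self, task_id: str) -> None:
        self.task_id = task_id
        self.downstream: set[str] = set()
        self.dag = SketchDAG.current  # attach to the enclosing DAG, if any
        if self.dag is not None:
            self.dag.tasks[task_id] = self

    def __rshift__(self, other: SketchTask) -> SketchTask:
        # A dependency is plain definition-time data: one edge in the graph.
        if self.dag is None and other.dag is None:
            raise ValueError("cannot link tasks that belong to no DAG")
        self.downstream.add(other.task_id)
        return other


with SketchDAG("example") as dag:
    extract = SketchTask("extract")
    load = SketchTask("load")
    extract >> load  # records the edge; nothing is scheduled here
```

Everything here runs when the file is parsed; deciding when `load` may actually run is the scheduling-time concern that stays in core.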

Because the goal of this PR is to go from working state to working state, some of the code in the Task SDK still imports from "core" (various types, enums or helpers). That will need to be resolved before the 3.0 release, but it is fine for now.

I'm also aware that the class hierarchy, of airflow.models.baseoperator.BaseOperator in particular (and to a lesser extent DAG), is very messy right now. We will need to think about how we want to add on the scheduling-time functions etc., as I'm not yet sold that having Core Airflow depend upon (import) the Task SDK classes is the right structure, but we can address that later.
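One way to picture the structure being questioned is a core operator class that subclasses the definition-time SDK class and layers scheduling-time behaviour on top. This is a hypothetical sketch: `SdkBaseOperator`, `CoreBaseOperator` and `ready_to_run` are made-up names used only to show the direction of the dependency, not Airflow's real classes or methods.

```python
from __future__ import annotations


class SdkBaseOperator:
    """Hypothetical authoring-time surface: the arguments a DAG author passes."""

    def __init__(self, task_id: str, retries: int = 0) -> None:
        self.task_id = task_id
        self.retries = retries


class CoreBaseOperator(SdkBaseOperator):
    """Hypothetical core class adding scheduling-time helpers to the authoring class.

    Inheriting means core must import (and so depend on) the SDK class,
    which is the dependency direction under discussion.
    """

    def ready_to_run(self, upstream_states: list[str]) -> bool:
        # Hypothetical scheduling-time check, roughly an "all upstream succeeded" rule.
        return all(state == "success" for state in upstream_states)


op = CoreBaseOperator("extract", retries=2)
```

Composition, or a separate scheduler-side type built from the serialized DAG, would avoid the inheritance; which of these fits best is the open question the comment leaves for later.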

We will also need to address the rendered docs for the Task SDK in a future PR. The goal is that "anything" exposed directly on airflow.sdk is part of the public API, but right now the rendered docs show DAG as airflow.sdk.definitions.dag.DAG, which is certainly not what we want users to see.
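A common Python technique for this, offered as an assumption about one possible fix rather than what a later PR actually did, is to re-export the class from the package's `__init__` and reset its `__module__`, so that `repr()` and documentation tools that read `__module__` (such as Sphinx autodoc) report the public path. The sketch fakes both modules with `types.ModuleType` so it runs stand-alone; the `airflow.sdk` names are only labels and nothing is imported from Airflow.

```python
import types

# Hypothetical stand-ins for the two modules; nothing is imported from Airflow.
impl = types.ModuleType("airflow.sdk.definitions.dag")
public = types.ModuleType("airflow.sdk")


class DAG:
    """Stand-in for the real DAG class."""


DAG.__module__ = impl.__name__  # pretend the class was defined in the implementation module
impl.DAG = DAG
print(impl.DAG)  # <class 'airflow.sdk.definitions.dag.DAG'>

# What the package __init__ would do: re-export and claim the class for the public path.
public.DAG = impl.DAG
public.DAG.__module__ = public.__name__
public.__all__ = ["DAG"]
print(public.DAG)  # <class 'airflow.sdk.DAG'>
```

Because `public.DAG` and `impl.DAG` are the same object, the implementation module's reference now reports the public path too, which is the intended effect.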

closes https://github.com/apache/airflow/issues/43011

ashb, Oct 16 '24 12:10

This is very much at the "boring ground work stage". Once we've got this covered we can add some of the execution changes, which are more on the "execution interface" side, where it gets fun.

ashb, Oct 16 '24 12:10

@kaxil @jscheffl I've just pushed the latest changes to this. It's probably not worth looking at again until I've finished it and have a decent idea that the tests pass locally, but in the last commit I removed the old code I ported over, so it should be slightly clearer where I've simply moved code.

ashb, Oct 17 '24 20:10

Current mypy status:

Found 58 errors in 41 files (checked 1135 source files)

Test-wise, at least tests/models/test_baseoperator.py and task_sdk/tests/ are passing.

ashb, Oct 17 '24 21:10

Okay, we should be getting there now.

Mypy should be happy, and most of tests/models should pass:

FAILED tests/models/test_mappedoperator.py::test_task_mapping_default_args - AssertionError: assert equals failed
  'airflow'  'test'
FAILED tests/models/test_mappedoperator.py::test_task_mapping_override_default_args - AssertionError: assert equals failed
  DateTime(2016, 1, 1, 0, 0, 0, tzinfo=Timezone('UTC'))               DateTime(2024, 10, 28, 22, 38, 58, 228687, tzinfo=Timezone('UTC'))
FAILED tests/models/test_mappedoperator.py::test_mapped_task_applies_default_args_classic - assert equals failed
  None                              datetime.timedelta(seconds=1800)
FAILED tests/models/test_renderedtifields.py::TestRenderedTaskInstanceFields::test_pandas_dataframes_works_with_the_string_compare - ModuleNotFoundError: No module named 'pandas'
FAILED tests/models/test_taskinstance.py::TestTaskInstance::test_set_dag - RuntimeError: Operator <Task(EmptyOperator): op_1> has not been assigned to a DAG yet
FAILED tests/models/test_taskinstance.py::TestTaskInstance::test_infer_dag - ValueError: Tried to create relationships between tasks that don't have DAGs yet. Set the DAG for at least one task and try again: [<Task(EmptyOperator): test_op_1>, <Task(EmptyOperator): test_op_2>]
FAILED tests/models/test_taskinstance.py::TestTaskInstance::test_handle_failure - AttributeError: 'NoneType' object has no attribute 'args'
FAILED tests/models/test_taskinstance.py::test_lazy_xcom_access_does_not_pickle_session - RuntimeError: unknown backend None

8 failed, 1258 passed, 3 skipped, 1 warning in 58.45s from running pytest tests/serialization/test_dag_serialization.py tests/models/

ashb, Oct 28 '24 22:10

(Adding the legacy api label)

ashb, Oct 29 '24 16:10

Taking this out of draft now as it is almost there: there are a few small errors to chase down and correct, but otherwise it is where we want it to be.

(Well, that's not true: the change is about 10x the size I'd like, but this was about as small as I could think to make it.)

ashb, Oct 29 '24 18:10

A note about the future direction of this: right now the scheduler still depends upon the Task SDK, but that is mostly to keep the change as small as we can.

Once something like this is accepted and merged, we will look at making the serialized DAG etc. not depend on the Task SDK, as the scheduler should be able to operate on a much simpler interface than the full DAG authoring interface. We'll need to explore what makes sense, and also how we usefully share enums (such as trigger rules), as it would be nice if the scheduler did not need to depend on the Task SDK.
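A hypothetical sketch of that direction: the scheduler codes against a small `typing.Protocol` describing only what it reads, and trigger rules live in a plain `str`-based `Enum` that both sides could import from a small shared module. `SchedulerTaskView`, `SerializedTask` and `upstream_satisfied` are illustrative names, and the trigger-rule evaluation is deliberately simplified; Airflow's real trigger-rule set and semantics are larger than the three members shown.

```python
from __future__ import annotations

import enum
from dataclasses import dataclass
from typing import Protocol


class TriggerRule(str, enum.Enum):
    """Shared enum: plain string values, so a serialized DAG can store them as-is."""

    ALL_SUCCESS = "all_success"
    ALL_DONE = "all_done"
    ONE_SUCCESS = "one_success"


class SchedulerTaskView(Protocol):
    """The narrow interface a scheduler needs; no authoring-time API at all."""

    task_id: str
    trigger_rule: TriggerRule


@dataclass
class SerializedTask:
    """Hypothetical deserialized task: satisfies the Protocol structurally, no SDK import."""

    task_id: str
    trigger_rule: TriggerRule


DONE_STATES = {"success", "failed", "skipped"}


def upstream_satisfied(task: SchedulerTaskView, upstream_states: list[str]) -> bool:
    # Simplified trigger-rule evaluation over the states of upstream tasks.
    if task.trigger_rule is TriggerRule.ALL_SUCCESS:
        return all(s == "success" for s in upstream_states)
    if task.trigger_rule is TriggerRule.ALL_DONE:
        return all(s in DONE_STATES for s in upstream_states)
    return any(s == "success" for s in upstream_states)  # ONE_SUCCESS


# Rebuilding the enum from its stored string value, as a deserializer might.
task = SerializedTask("load", TriggerRule("all_done"))
```

Because the Protocol is structural, anything with `task_id` and `trigger_rule` attributes type-checks as a `SchedulerTaskView`, so the scheduler never has to import the authoring classes.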

ashb, Oct 30 '24 11:10

There are lots more things to move over to the Task SDK before AIP-72 is complete, which we will track in the project board. But since this PR is the big base, and is slow to test because of unavoidably wide-reaching changes, we will merge it and follow up with separate changes for the rest (things like mapped operators, DagParam etc.).

ashb, Oct 30 '24 17:10

Merging this. One of the failures, a k8s test that took longer than an hour, looks like a transient one:

Progress: airflow-python-3.9-v1.28.13   Error during running tests for Python 3.9, Kubernetes v1.28.13
Progress: airflow-python-3.10-v1.29.8   Image: "ghcr.io/apache/airflow/main/prod/python3.10-kubernetes" with ID "sha256:1547fde10197845f0938fd7d0fc5977273a4871e016c8102ce9c8eb33010d064" not [...]

If for some reason it fails on main too, we will debug it.

kaxil, Oct 30 '24 18:10