Make _run_raw_task AIP-44 compatible
Implement core functionality in _run_raw_task for AIP-44 / database isolation.
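For reviewers new to the pattern, the shape of the change is roughly this (a minimal sketch with simplified signatures, not the actual code; the real versions live in airflow/models/taskinstance.py and airflow/serialization/pydantic/taskinstance.py):

```python
# Minimal sketch of the pattern (simplified signatures, not the actual code).
# The task itself still executes locally on the worker; only the DB-touching
# steps are routed through the internal API when isolation mode is enabled.
from airflow.api_internal.internal_api_call import internal_api_call
from airflow.utils.session import NEW_SESSION, provide_session


@internal_api_call
@provide_session
def _update_rtif(ti, rendered_fields, session=NEW_SESSION):
    # DB write: in isolation mode the decorator serializes the arguments and
    # sends the call to the internal API server instead of using a local session.
    ...


def _run_raw_task(ti, mark_success=False, test_mode=False):
    # Runs locally for both a TaskInstance and a TaskInstancePydantic;
    # anything that needs the DB goes through helpers like _update_rtif.
    ...


class TaskInstancePydantic:
    def _run_raw_task(self, mark_success=False, test_mode=False):
        # The Pydantic counterpart carries no ORM session; it delegates to
        # the shared module-level function above.
        return _run_raw_task(ti=self, mark_success=mark_success, test_mode=test_mode)
```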
This will be followed by other PRs from https://github.com/apache/airflow/pull/37851.
Generally it's what I would expect - let's get the tests to pass, merge the dependent PR, and then we can review that one in detail.
checks passed @potiuk
I am mostly out till Friday and this one needs quite a bit more review - if it can wait.
no worries, enjoy
I reviewed it quite a bit more thoroughly. I think it's the right direction, but we should - I think - complete it (i.e. turn handle_reschedule into an internal_api call, merge the committing changes to defer task, and avoid keeping a duplicated _run_raw_task for DB/DB-isolation mode, unless there is a good reason to keep it for now, but I can't think of any).
converting to draft while i work through a few more aspects of this.
ok @potiuk @vincbeck @uranusjr -- i think this is ready for another look.
This "base" PR is the first 4 commits from the mothership PR (https://github.com/apache/airflow/pull/37851)
After this main PR there will be 5 other PRs (each of the remaining commits in #37851 will be a distinct PR) that will follow to complete the job, by which I mean being able to run a task entirely without db connectivity, including mapped tasks, xcom, and async.
Using the mothership branch I have done live testing to ensure that these commits work properly for the other changes which follow, and which can be reviewed and merged separately.
I'll also mention one more thing -- in this PR there are 4 distinct commits, each covering a different part of this, and it may be helpful to review them one at a time.
@dstandish - I am currently at PyCon US, and I have a busy plan of attending talks and meeting people here :) -> I plan to take a very serious look at the AIP-44-related changes right after I come back (next week Wednesday).
If it helps let me know then I could make a full test as I am failing exactly on this point might be easy. Otherwise I assume some more pytests are missing?
can you clarify what you mean here?
Uuups, too many typos. I wanted to say: (1) If it helps in any way, beyond the other reviewers, to support testing, let me know - I just did a reading of the code w/o execution. (2) I saw mainly functional code changes but no pytests. I was wondering if the PR is ready to be merged (intended as a refactoring not affecting test results) or if tests will be added before merge.
Thanks - yeah, you mean help with practical testing, like trying things out. I've done a lot of it, so I feel OK about it; not necessary unless you want to. If you do, I recommend checking out the "mothership" PR branch - I think I referenced it somewhere. There you can run this with helm in full isolation mode with a dedicated rpc server.
Re unit tests: I think with many of these AIP-44 changes the intent is a no-behavior-change refactor, so I think that's why we haven't historically added many tests. But I can add some if you have specific areas of concern.
Re the naming suggestions you made above, I stuck with the pattern that was established before me, which was just to keep the same name as the method. I don't personally think it is likely to create confusion, since they are namespaced by the module anyway. We could make this more explicit in the rpc module by importing modules rather than functions, but maybe I'd leave that for another PR. Note also that in all cases the functions are private, which makes the choice much less consequential - they are not user facing and changeable at any time. Incidentally, I feel like we should maybe find a way so we don't have to explicitly add those imports in the rpc module; it seems it shouldn't be necessary given that we already decorate the func. But again, that's a different PR.
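For illustration, the module-import idea would look something like this (hypothetical import paths, just to show the shape):

```python
# Today (roughly): the rpc module imports private functions directly, so the
# bare names lose their namespace at the registration site.
from airflow.models.taskinstance import _handle_failure, _update_rtif

METHODS = [_handle_failure, _update_rtif]

# More explicit alternative: import the modules and reference through them.
from airflow.models import taskinstance

METHODS = [taskinstance._handle_failure, taskinstance._update_rtif]
```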
Okay, did some testing with a breeze setup: manually "massaged" the ENV to enable AIP-44, started the internal API server, took away the DB connection, and set the internal API endpoints for the worker. Using CeleryExecutor I got the following results - based on the "mothership" PR - don't know if this helps in review.
Based on example DAGs:
1. Worker had problems parsing SubdagOperator; was not able to copy the text from tmux, but the error stack failed in a DB select of the Pool object. Missing in the internal API, or will SubDAG just not be supported?
2. DAG example_python_operator - worked / executed w/o error
3. DAG example_python_decorator - worked / executed w/o error
4. DAG example_branch_operator - failed in task branching - logs below - seems there is still some DB access during branching
5. DAG example_dynamic_task_mapping - failed in task sum_it - logs below - seems the serialized values come back as the wrong type?
6. DAG tutorial - failed in task templated - logs below - seems to be an issue with templating
7. DAG Params Trigger UI - failed in Select languages (as well as others) - logs below - seems to be problems with serialized parameters
Logs for 4:
(...)
[2024-05-20, 18:03:32 CEST] {baseoperator.py:404} WARNING - BranchPythonOperator.execute cannot be called outside TaskInstance!
[2024-05-20, 18:03:32 CEST] {python.py:240} INFO - Done. Returned value was: branch_a
[2024-05-20, 18:03:32 CEST] {branch.py:36} INFO - Branch into branch_a
[2024-05-20, 18:03:32 CEST] {skipmixin.py:178} INFO - Following branch branch_a
[2024-05-20, 18:03:32 CEST] {taskinstance.py:742} ▼ Post task execution logs
[2024-05-20, 18:03:32 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 108 for task branching (DagRunPydantic.get_task_instance() missing 1 required positional argument: 'session'; 7490)
Traceback (most recent call last):
File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
ret = args.func(args, dag=self.dag)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
return _run_raw_task(args, ti)
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
return ti._run_raw_task(
^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
return _run_raw_task(
^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
TaskInstance._execute_task_with_callbacks(
File "/opt/airflow/airflow/models/taskinstance.py", line 2984, in _execute_task_with_callbacks
result = self._execute_task(context, task_orig)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 244, in _execute_task
return _execute_task(task_instance=self, context=context, task_orig=task_orig)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/taskinstance.py", line 766, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/taskinstance.py", line 729, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/operators/python.py", line 273, in execute
return self.do_branch(context, super().execute(context))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/operators/branch.py", line 37, in do_branch
self.skip_all_except(context["ti"], branches_to_execute)
File "/opt/airflow/airflow/models/skipmixin.py", line 241, in skip_all_except
if (downstream_ti := dag_run.get_task_instance(t.task_id, map_index=ti.map_index))
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
TypeError: DagRunPydantic.get_task_instance() missing 1 required positional argument: 'session'
[2024-05-20, 18:03:32 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
Logs for 5:
(...)
[2024-05-20, 17:42:51 CEST] {taskinstance.py:742} ▼ Post task execution logs
[2024-05-20, 17:42:51 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 82 for task sum_it (unsupported operand type(s) for +: 'int' and 'str'; 4346)
Traceback (most recent call last):
File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
ret = args.func(args, dag=self.dag)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
return _run_raw_task(args, ti)
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
return ti._run_raw_task(
^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
return _run_raw_task(
^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
TaskInstance._execute_task_with_callbacks(
File "/opt/airflow/airflow/models/taskinstance.py", line 2984, in _execute_task_with_callbacks
result = self._execute_task(context, task_orig)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 244, in _execute_task
return _execute_task(task_instance=self, context=context, task_orig=task_orig)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/taskinstance.py", line 766, in _execute_task
result = _execute_callable(context=context, **execute_callable_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/taskinstance.py", line 729, in _execute_callable
return ExecutionCallableRunner(
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/decorators/base.py", line 265, in execute
return_value = super().execute(context)
^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/baseoperator.py", line 405, in wrapper
return func(self, *args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/operators/python.py", line 238, in execute
return_value = self.execute_callable()
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/operators/python.py", line 256, in execute_callable
return runner.run(*self.op_args, **self.op_kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/operator_helpers.py", line 250, in run
return self.func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/example_dags/example_dynamic_task_mapping.py", line 35, in sum_it
total = sum(values)
^^^^^^^^^^^
TypeError: unsupported operand type(s) for +: 'int' and 'str'
[2024-05-20, 17:42:52 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
Logs for 6:
(...)
[2024-05-20, 17:28:53 CEST] {standard_task_runner.py:93} INFO - Job 33: Subtask templated
[2024-05-20, 17:28:53 CEST] {task_command.py:462} INFO - Running task_id='templated' dag_id='tutorial' run_id='manual__2024-05-20T15:23:39.841642+00:00' map_index=-1 start_date=datetime.datetime(2024, 5, 20, 15, 28, 53, 331391, tzinfo=TzInfo(UTC)) end_date=None execution_date=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)) duration=0.372874 state='running' try_number=2 max_tries=1 hostname='65a1e8ad69ac' unixname='root' job_id=33 pool='default_pool' pool_slots=1 queue='default' priority_weight=1 operator='BashOperator' custom_operator_name=None queued_dttm=datetime.datetime(2024, 5, 20, 15, 28, 50, 223477, tzinfo=TzInfo(UTC)) queued_by_job_id=23 pid=None executor=None executor_config={} updated_at=datetime.datetime(2024, 5, 20, 15, 28, 53, 351237, tzinfo=TzInfo(UTC)) rendered_map_index=None external_executor_id='d5da9e9f-715a-48d6-a34c-0eeeb5f3d74f' trigger_id=None trigger_timeout=None next_method=None next_kwargs=None run_as_user=None task=<Task(BashOperator): templated> test_mode=False dag_run=DagRunPydantic(id=9, dag_id='tutorial', queued_at=datetime.datetime(2024, 5, 20, 15, 23, 39, 884965, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 5, 20, 15, 23, 40, 785817, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-05-20T15:23:39.841642+00:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={}, data_interval_start=datetime.datetime(2024, 5, 19, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 5, 20, 15, 23, 39, 841642, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 5, 20, 15, 28, 53, 73827, tzinfo=TzInfo(UTC)), dag_hash='4e598e3c2b0dd862a24b86c57cc9387b', updated_at=datetime.datetime(2024, 5, 20, 15, 28, 53, 79029, tzinfo=TzInfo(UTC)), dag=None, consumed_dataset_events=[], log_template_id=2) dag_model=DagModelPydantic(dag_id='tutorial', root_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 5, 20, 15, 28, 43, 870484, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/opt/airflow/airflow/example_dags/tutorial.py', processor_subdir='/files/dags', owners='airflow', description='A simple tutorial DAG', default_view='grid', schedule_interval=datetime.timedelta(days=1), timetable_description='', tags=[DagTagPydantic(name='example', dag_id='tutorial')], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_concurrency_limits=False, has_import_errors=False) raw=True is_trigger_log_context=False on host 65a1e8ad69ac
[2024-05-20, 17:28:53 CEST] {warnings.py:110} WARNING - /usr/local/lib/python3.12/site-packages/pydantic/main.py:347: UserWarning: Pydantic serializer warnings:
Expected `int` but got `str` - serialized value may not be as expected
return self.__pydantic_serializer__.to_python(
[2024-05-20, 17:28:53 CEST] {abstractoperator.py:741} ERROR - Exception rendering Jinja template for task 'templated', field 'bash_command'. Template: '\n{% for i in range(5) %}\n echo "{{ ds }}"\n echo "{{ macros.ds_add(ds, 7)}}"\n{% endfor %}\n'
Traceback (most recent call last):
File "/opt/airflow/airflow/models/abstractoperator.py", line 733, in _do_render_template_fields
rendered_content = self.render_template(
^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/template/templater.py", line 169, in render_template
return self._render(template, context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/abstractoperator.py", line 691, in _render
return super()._render(template, context, dag=dag)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/template/templater.py", line 126, in _render
return render_template_to_string(template, context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/helpers.py", line 289, in render_template_to_string
return render_template(template, cast(MutableMapping[str, Any], context), native=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/helpers.py", line 284, in render_template
return "".join(nodes)
^^^^^^^^^^^^^^
File "<template>", line 21, in root
File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 392, in call
if not __self.is_safe_callable(__obj):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 276, in is_safe_callable
getattr(obj, "unsafe_callable", False) or getattr(obj, "alters_data", False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 864, in __getattr__
return self._fail_with_undefined_error()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 857, in _fail_with_undefined_error
raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'str object' has no attribute 'ds_add'
[2024-05-20, 17:28:53 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 33 for task templated ('str object' has no attribute 'ds_add'; 2098)
Traceback (most recent call last):
File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
ret = args.func(args, dag=self.dag)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
return _run_raw_task(args, ti)
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
return ti._run_raw_task(
^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
return _run_raw_task(
^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
TaskInstance._execute_task_with_callbacks(
File "/opt/airflow/airflow/models/taskinstance.py", line 2940, in _execute_task_with_callbacks
task_orig = self.render_templates(context=context, jinja_env=jinja_env)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 161, in render_templates
return TaskInstance.render_templates(self=self, context=context, jinja_env=jinja_env) # type: ignore[arg-type]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/taskinstance.py", line 3354, in render_templates
original_task.render_template_fields(context, jinja_env)
File "/opt/airflow/airflow/models/baseoperator.py", line 1375, in render_template_fields
self._do_render_template_fields(self, self.template_fields, context, jinja_env, set())
File "/opt/airflow/airflow/models/abstractoperator.py", line 733, in _do_render_template_fields
rendered_content = self.render_template(
^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/template/templater.py", line 169, in render_template
return self._render(template, context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/abstractoperator.py", line 691, in _render
return super()._render(template, context, dag=dag)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/template/templater.py", line 126, in _render
return render_template_to_string(template, context)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/helpers.py", line 289, in render_template_to_string
return render_template(template, cast(MutableMapping[str, Any], context), native=False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/helpers.py", line 284, in render_template
return "".join(nodes)
^^^^^^^^^^^^^^
File "<template>", line 21, in root
File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 392, in call
if not __self.is_safe_callable(__obj):
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/jinja2/sandbox.py", line 276, in is_safe_callable
getattr(obj, "unsafe_callable", False) or getattr(obj, "alters_data", False)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 864, in __getattr__
return self._fail_with_undefined_error()
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/jinja2/runtime.py", line 857, in _fail_with_undefined_error
raise self._undefined_exception(self._undefined_message)
jinja2.exceptions.UndefinedError: 'str object' has no attribute 'ds_add'
[2024-05-20, 17:28:53 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
Logs for 7:
(...)
[2024-05-20, 17:21:43 CEST] {task_command.py:462} INFO - Running task_id='select_languages' dag_id='example_params_trigger_ui' run_id='manual__2024-05-20T17:21:36+02:00' map_index=-1 start_date=datetime.datetime(2024, 5, 20, 15, 21, 43, 236734, tzinfo=TzInfo(UTC)) end_date=None execution_date=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)) duration=None state='running' try_number=1 max_tries=0 hostname='65a1e8ad69ac' unixname='root' job_id=24 pool='default_pool' pool_slots=1 queue='default' priority_weight=5 operator='_BranchPythonDecoratedOperator' custom_operator_name='@task.branch' queued_dttm=datetime.datetime(2024, 5, 20, 15, 21, 39, 246806, tzinfo=TzInfo(UTC)) queued_by_job_id=23 pid=None executor=None executor_config={} updated_at=datetime.datetime(2024, 5, 20, 15, 21, 43, 255403, tzinfo=TzInfo(UTC)) rendered_map_index=None external_executor_id='d04a166e-5f47-44cc-884c-3178e7f0e7bd' trigger_id=None trigger_timeout=None next_method=None next_kwargs=None run_as_user=None task=<Task(_BranchPythonDecoratedOperator): select_languages> test_mode=False dag_run=DagRunPydantic(id=8, dag_id='example_params_trigger_ui', queued_at=datetime.datetime(2024, 5, 20, 15, 21, 38, 752539, tzinfo=TzInfo(UTC)), execution_date=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), start_date=datetime.datetime(2024, 5, 20, 15, 21, 39, 180598, tzinfo=TzInfo(UTC)), end_date=None, state='running', run_id='manual__2024-05-20T17:21:36+02:00', creating_job_id=None, external_trigger=True, run_type='manual', conf={'names': ['Linda', 'Martha', 'Thomas'], 'english': True, 'german': True, 'french': True}, data_interval_start=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), data_interval_end=datetime.datetime(2024, 5, 20, 15, 21, 36, tzinfo=TzInfo(UTC)), last_scheduling_decision=datetime.datetime(2024, 5, 20, 15, 21, 42, 720624, tzinfo=TzInfo(UTC)), dag_hash='ac1f36ca0393ba5bbc744485395bc36c', updated_at=datetime.datetime(2024, 5, 20, 15, 21, 42, 737459, tzinfo=TzInfo(UTC)), dag=None, consumed_dataset_events=[], log_template_id=2) dag_model=DagModelPydantic(dag_id='example_params_trigger_ui', root_dag_id=None, is_paused_at_creation=True, is_paused=False, is_subdag=False, is_active=True, last_parsed_time=datetime.datetime(2024, 5, 20, 15, 21, 27, 370307, tzinfo=TzInfo(UTC)), last_pickled=None, last_expired=None, scheduler_lock=None, pickle_id=None, fileloc='/opt/airflow/airflow/example_dags/example_params_trigger_ui.py', processor_subdir='/files/dags', owners='airflow', description='Example DAG demonstrating the usage DAG params to model a trigger UI with a user form', default_view='grid', schedule_interval=None, timetable_description='Never, external triggers only', tags=[DagTagPydantic(name='params', dag_id='example_params_trigger_ui'), DagTagPydantic(name='example', dag_id='example_params_trigger_ui')], dag_owner_links=[], parent_dag=None, max_active_tasks=16, max_active_runs=16, max_consecutive_failed_dag_runs=0, has_task_concurrency_limits=False, has_import_errors=False) raw=True is_trigger_log_context=False on host 65a1e8ad69ac
[2024-05-20, 17:21:43 CEST] {warnings.py:110} WARNING - /usr/local/lib/python3.12/site-packages/pydantic/main.py:347: UserWarning: Pydantic serializer warnings:
Expected `int` but got `str` - serialized value may not be as expected
return self.__pydantic_serializer__.to_python(
[2024-05-20, 17:21:43 CEST] {standard_task_runner.py:112} ERROR - Failed to execute job 24 for task select_languages (Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>; 1015)
Traceback (most recent call last):
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 62, in serialize_operator
return BaseSerialization.serialize(x, use_pydantic_models=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 557, in serialize
return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1015, in serialize_operator
return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1066, in _serialize_node
serialize_op["params"] = cls._serialize_params_dict(op.params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 852, in _serialize_params_dict
raise ValueError(
ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow/airflow/models/taskinstance.py", line 278, in _run_raw_task
TaskInstance._execute_task_with_callbacks(
File "/opt/airflow/airflow/models/taskinstance.py", line 2948, in _execute_task_with_callbacks
_update_rtif(ti=self, rendered_fields=rendered_fields)
File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 149, in wrapper
args_dict = BaseSerialization.serialize(arguments_dict, use_pydantic_models=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 529, in serialize
str(k): cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 647, in serialize
mod = _pydantic_model_dump(pyd_mod, var)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 643, in _pydantic_model_dump
return model_cls.model_validate(var).model_dump(mode="json") # type: ignore[attr-defined]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
return self.__pydantic_serializer__.to_python(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
During handling of the above exception, another exception occurred:
Traceback (most recent call last):
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 62, in serialize_operator
return BaseSerialization.serialize(x, use_pydantic_models=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 557, in serialize
return cls._encode(SerializedBaseOperator.serialize_operator(var), type_=DAT.OP)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1015, in serialize_operator
return cls._serialize_node(op, include_deps=op.deps is not BaseOperator.deps)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 1066, in _serialize_node
serialize_op["params"] = cls._serialize_params_dict(op.params)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 852, in _serialize_params_dict
raise ValueError(
ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
The above exception was the direct cause of the following exception:
Traceback (most recent call last):
File "/opt/airflow/airflow/task/task_runner/standard_task_runner.py", line 105, in _start_by_fork
ret = args.func(args, dag=self.dag)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/cli_config.py", line 49, in command
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/utils/cli.py", line 115, in wrapper
return f(*args, **kwargs)
^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 477, in task_run
task_return_code = _run_task_by_selected_method(args, _dag, ti)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 254, in _run_task_by_selected_method
return _run_raw_task(args, ti)
^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/cli/commands/task_command.py", line 336, in _run_raw_task
return ti._run_raw_task(
^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 147, in _run_raw_task
return _run_raw_task(
^^^^^^^^^^^^^^
File "/opt/airflow/airflow/models/taskinstance.py", line 341, in _run_raw_task
ti.handle_failure(e, test_mode, context, session=session)
File "/opt/airflow/airflow/serialization/pydantic/taskinstance.py", line 333, in handle_failure
_handle_failure(
File "/opt/airflow/airflow/utils/session.py", line 81, in wrapper
return func(*args, **kwargs)
^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/api_internal/internal_api_call.py", line 149, in wrapper
args_dict = BaseSerialization.serialize(arguments_dict, use_pydantic_models=True)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 529, in serialize
str(k): cls.serialize(v, strict=strict, use_pydantic_models=use_pydantic_models)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 647, in serialize
mod = _pydantic_model_dump(pyd_mod, var)
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/opt/airflow/airflow/serialization/serialized_objects.py", line 643, in _pydantic_model_dump
return model_cls.model_validate(var).model_dump(mode="json") # type: ignore[attr-defined]
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
File "/usr/local/lib/python3.12/site-packages/pydantic/main.py", line 347, in model_dump
return self.__pydantic_serializer__.to_python(
^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
pydantic_core._pydantic_core.PydanticSerializationError: Error calling function `serialize_operator`: ValueError: Params to a DAG or a Task can be only of type airflow.models.param.Param, but param 'names' is <class 'list'>
[2024-05-20, 17:21:43 CEST] {local_task_job_runner.py:240} INFO - Task exited with return code 1
thanks @jscheffl -- which branch were you on? there are more changes re mapping on the "mothership" PR (https://github.com/apache/airflow/pull/37851) that are excluded from this PR just for easier review -- that's the branch you should be on for end-to-end testing. i cherry pick from that PR branch to make the smaller PRs.
i've been doing little bits at a time, and this PR is bigger than i'd like, but it's still not the whole enchilada. this individual PR is still an incremental addition.
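fwiw, the branching failure is the usual gap between the ORM model and its pydantic counterpart -- DagRunPydantic.get_task_instance requires session explicitly where DagRun.get_task_instance defaults it. a fix would roughly have this shape (hedged sketch; assumed signature, not an actual diff from the branch):

```python
# Hedged sketch (assumed signature, not an actual diff): give the Pydantic
# counterpart the same session default the ORM method has, so callers like
# skip_all_except that don't pass a session keep working.
from airflow.utils.session import NEW_SESSION, provide_session


class DagRunPydantic:
    @provide_session
    def get_task_instance(self, task_id, map_index=-1, *, session=NEW_SESSION):
        # ...delegate to an internal-API-aware lookup here rather than
        # querying the database directly...
        ...
```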
Fully acknowledged. As I was trying to get started on a PoC for AIP-69, I struggled with the same things, and AIP-44 clearly solves all the basic problems that I ran into as well. This rework seems to be quite complex. Good that you have split up the "mothership" for incremental review.
As proposed by you, I tested on the "mothership" PR #37851 at git hash c1b78c035bf6e8250a12e1a7a0f83f78fcbeaa4e ("run-a-full-task-with-internal-api"), with no rebase/merge with main before the tests.
So as you cherry-pick into smaller PRs, keeping the complexity manageable... if all is green it might still be good to merge the pieces, and I assume that to make it fully work a few other API calls will need to be reworked.
> Missing in the internal API, or will SubDAG just not be supported?

etc.
ah yeah those scenarios are good to add to my testing dag. maybe you could share your dag? and maybe i could add them as integration tests with xfail for the API. like i said, not everything is implemented in this branch (or in the other branch for that matter) but many things are, and it's all a work in progress. pretty soon i am going to have to roll off onto other things and will ticket out the remaining known AIP-44 issues for the next person to take over. for example, we have not done anything re the triggerer yet. so just to be extra clear: this PR is not the final AIP-44 PR. this PR is just "get it working for the simplest cases".
> So as you cherry-pick into smaller PRs, keeping the complexity manageable... if all is green it might still be good to merge the pieces, and I assume that to make it fully work a few other API calls will need to be reworked.
Yeah that's sorta how i've been thinking about it. and i think that ultimately we should probably compile a dag with the known important scenarios and set it up as an integration test.
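something like this, maybe (hypothetical test; the fixture and names are made up):

```python
# Hypothetical shape of such a test (names made up): run each scenario DAG
# under DB isolation and xfail the ones known not to work yet.
import pytest

SCENARIOS = [
    pytest.param("example_python_operator", id="python"),
    pytest.param(
        "example_branch_operator",
        id="branch",
        marks=pytest.mark.xfail(reason="branching still needs DB access"),
    ),
    pytest.param(
        "tutorial",
        id="templating",
        marks=pytest.mark.xfail(reason="macros lost in context serialization"),
    ),
]


@pytest.mark.parametrize("dag_id", SCENARIOS)
def test_dag_runs_in_db_isolation(dag_id, run_dag_in_isolation):
    # run_dag_in_isolation would be a fixture that triggers the DAG on a
    # worker with no DB connection and returns the terminal run state.
    assert run_dag_in_isolation(dag_id) == "success"
```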
also for reference @jscheffl here are the PRs that came before this one 😅
https://github.com/apache/airflow/pulls?q=is%3Apr+project%3Aapache%2F169+is%3Aclosed
@potiuk i'll just go ahead and merge this now so i can close out all the PRs waiting on it. if you later want to look i'm happy to take your suggestions and make changes afterwards. thanks