"airflow tasks render/state" cli commands do not work for mapped task instances

Open: kosteev opened this issue 3 years ago

Apache Airflow version

Other Airflow 2 version

What happened

Running the following CLI command:

    airflow tasks render test-dynamic-mapping consumer scheduled__2022-09-18T15:14:15.107780+00:00 --map-index

fails with the following exception:

    Traceback (most recent call last):
      File "/opt/python3.8/bin/airflow", line 8, in <module>
        sys.exit(main())
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/__main__.py", line 38, in main
        args.func(args)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 51, in command
        return func(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py", line 101, in wrapper
        return f(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py", line 337, in _wrapper
        f(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 576, in task_render
        for attr in task.__class__.template_fields:
    TypeError: 'member_descriptor' object is not iterable
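Python raises a TypeError like the one above when code iterates over a class-level slot descriptor instead of the value stored on an instance. The following is a minimal sketch. It assumes, as the 'member_descriptor' in the message suggests, that the mapped operator class declares template_fields as a slot. PlainOperator and SlottedOperator are hypothetical stand-ins, not Airflow classes.

```python
# Hypothetical stand-ins (not Airflow classes) showing why
# task.__class__.template_fields breaks when the attribute is a slot.


class PlainOperator:
    # Ordinary class attribute: reading it from the class returns the tuple.
    template_fields = ("bash_command", "env")


class SlottedOperator:
    # Declared as a slot: the class only holds a descriptor object,
    # and the actual value lives on each instance.
    __slots__ = ("template_fields",)

    def __init__(self, template_fields):
        self.template_fields = template_fields


plain = PlainOperator()
slotted = SlottedOperator(("bash_command", "env"))

print(list(plain.__class__.template_fields))  # ['bash_command', 'env']
print(type(slotted.__class__.template_fields).__name__)  # member_descriptor

try:
    # Same pattern as the failing line in task_render.
    for attr in slotted.__class__.template_fields:
        pass
except TypeError as exc:
    print(exc)  # 'member_descriptor' object is not iterable

# Reading the attribute from the instance works for both kinds of class.
for task in (plain, slotted):
    print(list(task.template_fields))
```

Reading template_fields from the instance works in both cases. This is why fetching it from the task itself, rather than from task.__class__, avoids the error for mapped tasks.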

Running the following CLI command:

    airflow tasks state test-dynamic-mapping consumer scheduled__2022-09-18T15:14:15.107780+00:00 --map-index

fails with the following exception:

    Traceback (most recent call last):
      File "/opt/python3.8/bin/airflow", line 8, in <module>
        sys.exit(main())
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/__main__.py", line 38, in main
        args.func(args)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/cli_parser.py", line 51, in command
        return func(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py", line 101, in wrapper
        return f(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/cli.py", line 337, in _wrapper
        f(*args, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/cli/commands/task_command.py", line 422, in task_state
        print(ti.current_state())
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/utils/session.py", line 71, in wrapper
        return func(*args, session=session, **kwargs)
      File "/opt/python3.8/lib/python3.8/site-packages/airflow/models/taskinstance.py", line 849, in current_state
        session.query(TaskInstance.state)
      File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 2879, in scalar
        ret = self.one()
      File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/orm/query.py", line 2856, in one
        return self._iter().one()
      File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/result.py", line 1190, in one
        return self._only_one_row(
      File "/opt/python3.8/lib/python3.8/site-packages/sqlalchemy/engine/result.py", line 613, in _only_one_row
        raise exc.MultipleResultsFound(
    sqlalchemy.exc.MultipleResultsFound: Multiple rows were found when exactly one was required

What you think should happen instead

Both commands should execute successfully for the given mapped task instance.

How to reproduce

No response

Operating System

Linux

Versions of Apache Airflow Providers

No response

Deployment

Composer

Deployment details

No response

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

kosteev, Sep 21 '22 13:09

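  1. For mapped tasks, render may need to read template_fields from the task itself instead of from task.__class__. On the mapped operator class the attribute is a member_descriptor, as the traceback shows, not a list of field names. It should also print the attributes of the rendered task (ti.task) so that the rendered values are displayed for mapped tasks.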

  2. current_state() does not take map_index into account. For a mapped task the query therefore matches one row per mapped instance and fails with MultipleResultsFound. It should also filter on TaskInstance.map_index.
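Point 2 can be illustrated with a small self-contained sketch. It uses an in-memory SQLite table as a hypothetical, heavily simplified stand-in for the task_instance table: it has only the columns the query uses, and the sample states are made up. Filtering on dag_id, task_id and run_id alone matches every mapped instance, while adding map_index narrows the result to a single row.

```python
import sqlite3

# Hypothetical, heavily simplified stand-in for Airflow's task_instance table;
# the real table has many more columns.
conn = sqlite3.connect(":memory:")
conn.execute(
    "CREATE TABLE task_instance "
    "(dag_id TEXT, task_id TEXT, run_id TEXT, map_index INTEGER, state TEXT)"
)

RUN_ID = "scheduled__2022-09-18T15:14:15.107780+00:00"

# Made-up sample data: one mapped task expanded into three instances.
conn.executemany(
    "INSERT INTO task_instance VALUES (?, ?, ?, ?, ?)",
    [
        ("test-dynamic-mapping", "consumer", RUN_ID, 0, "success"),
        ("test-dynamic-mapping", "consumer", RUN_ID, 1, "failed"),
        ("test-dynamic-mapping", "consumer", RUN_ID, 2, "running"),
    ],
)


def matching_states(dag_id, task_id, run_id, map_index=None):
    """Return every matching state; map_index=None skips that filter (old behaviour)."""
    sql = "SELECT state FROM task_instance WHERE dag_id = ? AND task_id = ? AND run_id = ?"
    params = [dag_id, task_id, run_id]
    if map_index is not None:
        # The extra condition proposed for current_state().
        sql += " AND map_index = ?"
        params.append(map_index)
    return [state for (state,) in conn.execute(sql, params)]


# Without the map_index filter, all three mapped instances match.
print(matching_states("test-dynamic-mapping", "consumer", RUN_ID))
# With the filter, exactly one row matches.
print(matching_states("test-dynamic-mapping", "consumer", RUN_ID, map_index=1))
```

In Airflow the query ends with .scalar(). As the traceback shows, that calls one(), which raises MultipleResultsFound as soon as more than one row matches.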

https://github.com/apache/airflow/blob/520893a9a13a6ec5611ed4dd76039974eb4c069c/airflow/models/taskinstance.py#L805-L822

Something like the following is my first attempt:

diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 9caa8bb4bd..2e164400c1 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -586,13 +586,20 @@ def task_render(args):
         task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="memory"
     )
     ti.render_templates()
-    for attr in task.__class__.template_fields:
+    rendered_task = ti.task
+
+    if task.is_mapped:
+        template_fields = task.template_fields
+    else:
+        template_fields = task.__class__.template_fields
+
+    for attr in template_fields:
         print(
             textwrap.dedent(
                 f"""        # ----------------------------------------------------------
         # property: {attr}
         # ----------------------------------------------------------
-        {getattr(task, attr)}
+        {getattr(rendered_task, attr)}
         """
             )
         )
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 52562e72ad..582edac081 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -817,6 +817,7 @@ class TaskInstance(Base, LoggingMixin):
                 TaskInstance.dag_id == self.dag_id,
                 TaskInstance.task_id == self.task_id,
                 TaskInstance.run_id == self.run_id,
+                TaskInstance.map_index == self.map_index
             )
             .scalar()
         )

tirkarthi, Sep 21 '22 16:09

Hi Karthikeyan. That looks good. Would you mind preparing a PR with the changes?

kosteev, Sep 21 '22 19:09

Hi @tirkarthi, how are you getting along with this?

ephraimbuddy, Dec 19 '22 10:12

@ephraimbuddy Apologies for the late reply; unfortunately I got held up with work. I remember your patch in one of the PR comments. I hope you or someone else can pick this up.

https://github.com/apache/airflow/pull/26750#discussion_r1011799699

tirkarthi, Dec 30 '22 13:12

Thanks @ephraimbuddy

tirkarthi, Jan 07 '23 05:01