airflow
airflow copied to clipboard
"airflow tasks render/state" cli commands do not work for mapped task instances
Apache Airflow version
Other Airflow 2 version
What happened
Running following cli command: """ airflow tasks render test-dynamic-mapping consumer scheduled__2022-09-18T15:14:15.107780+00:00 --map-index """
fails with exception:
Traceback (most recent call last):
File "/opt/python3.8/bin/airflow", line 8, in
Running following cli command: """ airflow tasks state test-dynamic-mapping consumer scheduled__2022-09-18T15:14:15.107780+00:00 --map-index """
fails with exception:
Traceback (most recent call last):
File "/opt/python3.8/bin/airflow", line 8, in
What you think should happen instead
Command successfully executed
How to reproduce
No response
Operating System
Linux
Versions of Apache Airflow Providers
No response
Deployment
Composer
Deployment details
No response
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
-
For mapped tasks render might need to retrieve
template_fieldsfrom task instead oftask.__class__and rendered task needs to be used so that the values are displayed for mapped tasks. -
current_statedoesn't take map_index into account yielding many results for a mapped task. It should also filter byTaskInstance.map_index
https://github.com/apache/airflow/blob/520893a9a13a6ec5611ed4dd76039974eb4c069c/airflow/models/taskinstance.py#L805-L822
Something like below is my first attempt :
diff --git a/airflow/cli/commands/task_command.py b/airflow/cli/commands/task_command.py
index 9caa8bb4bd..2e164400c1 100644
--- a/airflow/cli/commands/task_command.py
+++ b/airflow/cli/commands/task_command.py
@@ -586,13 +586,20 @@ def task_render(args):
task, args.map_index, exec_date_or_run_id=args.execution_date_or_run_id, create_if_necessary="memory"
)
ti.render_templates()
- for attr in task.__class__.template_fields:
+ rendered_task = ti.task
+
+ if task.is_mapped:
+ template_fields = task.template_fields
+ else:
+ template_fields = task.__class__.template_fields
+
+ for attr in template_fields:
print(
textwrap.dedent(
f""" # ----------------------------------------------------------
# property: {attr}
# ----------------------------------------------------------
- {getattr(task, attr)}
+ {getattr(rendered_task, attr)}
"""
)
)
diff --git a/airflow/models/taskinstance.py b/airflow/models/taskinstance.py
index 52562e72ad..582edac081 100644
--- a/airflow/models/taskinstance.py
+++ b/airflow/models/taskinstance.py
@@ -817,6 +817,7 @@ class TaskInstance(Base, LoggingMixin):
TaskInstance.dag_id == self.dag_id,
TaskInstance.task_id == self.task_id,
TaskInstance.run_id == self.run_id,
+ TaskInstance.map_index == self.map_index
)
.scalar()
)
Hi Karthikeyan. That looks good. Do you mind to prepare PR with the changes?
Hi @tirkarthi , how are you getting along with this?
@ephraimbuddy Apologies for late reply. Unfortunately I got held up with work. I remember your patch in one of the PR comments. I hope you or someone can pick this up.
https://github.com/apache/airflow/pull/26750#discussion_r1011799699
Thanks @ephraimbuddy