
Tasks with 'none_failed_min_one_success' trigger_rule skipping before Dynamic Task Group is fully expanded


Apache Airflow version

Other Airflow 2 version (please specify below)

If "Other Airflow 2 version" selected, which one?

2.8.3 and 2.9.1

What happened?

If a DAG is run for the first time, a task with a 'none_failed_min_one_success' trigger rule is almost immediately skipped before its dependent upstream tasks are complete, resulting in: [screenshot]

If I clear the same DAG run, with the process_items task group fully expanded, the DAG completes as expected: [screenshot]

What you think should happen instead?

If the a_end task has a 'none_failed_min_one_success' trigger rule, it should run only when its upstream dependent tasks are complete, none of them is in a failed or upstream_failed state, and at least one upstream task has succeeded.
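
To spell out the expected semantics, here is a minimal illustration (not Airflow source) of how the rule should evaluate:

def none_failed_min_one_success(upstream_states: list[str]) -> bool:
    # Runnable only when no upstream task is failed or upstream_failed
    # and at least one upstream task succeeded.
    no_failures = not any(s in ("failed", "upstream_failed") for s in upstream_states)
    min_one_success = any(s == "success" for s in upstream_states)
    return no_failures and min_one_success

print(none_failed_min_one_success(["success", "skipped"]))  # True  -> a_end should run
print(none_failed_min_one_success(["skipped", "skipped"]))  # False -> a_end is skipped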

How to reproduce

DAG Code:

from datetime import datetime, timedelta

from airflow.decorators import task, task_group
from airflow import DAG
from airflow.exceptions import AirflowException
from airflow.operators.empty import EmptyOperator

default_args = {
    'owner': 'insight_techops',
    'depends_on_past': False,
    'start_date': datetime(2019, 1, 1),
    'provide_context': True,
    'retries': 0,
    'retry_delay': timedelta(seconds=30)
}

@task
def parse_csv_schedule():
    items_dict = {'A': '1', 'B': '2', 'C': '3', 'D': '4'}
    return items_dict

@task_group(group_id="process_items")
def process_items(items_dict: dict):
    @task
    def retrieve_item_metadata(items: dict):
        media_asset = items[0]
        print(media_asset)
        return media_asset

    @task_group(group_id="a_process")
    def a_process():
        @task.branch(retries=0)
        def a_start():
            a_handling = 'none'
            if a_handling == 'bypass':
                return "process_items.a_process.a_bypass"
            else:
                return "process_items.a_process.a_end"
            
        a_start = a_start()
        a_bypass = EmptyOperator(task_id='a_bypass')
        a_end = EmptyOperator(task_id='a_end', trigger_rule='none_failed_min_one_success')
        a_start >> a_bypass >> a_end
        a_start >> a_end

    @task
    def mark_item_as_done():
        try:
            print(f"Marking item as Done")
        except Exception as error:
            raise AirflowException(error)
    
    item_dict = retrieve_item_metadata(items=items_dict)
    mark_item_as_done = mark_item_as_done()
    a_process = a_process()
    item_dict >> a_process >> mark_item_as_done

with DAG(dag_id='af055_Debugger', default_args=default_args, max_active_runs=1, schedule_interval=None, tags=['sales']):
    end = EmptyOperator(task_id='end')
    items_dict = parse_csv_schedule()
    items_dict >> process_items.expand(items_dict=items_dict) >> end

I'm pretty sure it is related to 'none_failed_min_one_success': if I cause an upstream task within the TaskGroup to fail, the task with the trigger rule in question (and its subsequent downstream tasks) is skipped: [screenshot]

Compare this with the behavior when a_end uses the default trigger rule: [screenshot]

Operating System

macOS 13.6 and Ubuntu 22.04

Versions of Apache Airflow Providers

No response

Deployment

Official Apache Airflow Helm Chart

Deployment details

Dev: Docker Desktop, Helm, Kubernetes

Prod: EKS, Helm

Anything else?

100% reproducible

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

gavinhonl avatar May 24 '24 08:05 gavinhonl

Thanks for opening your first issue here! Be sure to follow the issue template! If you are willing to raise a PR to address this issue, please do so - no need to wait for approval.

boring-cyborg[bot] avatar May 24 '24 08:05 boring-cyborg[bot]

Here's a slightly simpler reproduction without nested task groups or branching:

from airflow.decorators import dag, task, task_group
from pendulum import datetime


@dag(
    start_date=datetime(2024, 1, 1),
    schedule="@daily",
    catchup=False,
    doc_md=__doc__,
    tags=["example"],
)
def repro():

    @task
    def init():
        return ["seize", "the", "day"]

    @task_group(group_id="tg")
    def tg(message):
        @task
        def imsleepy(message):
            return message

        @task(trigger_rule="none_failed_min_one_success")
        def imawake(message):
            print(message)

        imawake(imsleepy(message))

    tg.expand(message=init())


repro()

RNHTTR avatar May 25 '24 15:05 RNHTTR

Hi, could I take this issue?

mateuslatrova avatar May 31 '24 01:05 mateuslatrova

Please do. It would be great if you can confirm that you are able to reproduce the issue.

gavinhonl avatar May 31 '24 13:05 gavinhonl

I have just reproduced the issue using @RNHTTR 's example code.

Does anyone have an idea of which parts of the codebase I should look at first to find this bug?

In the meantime, I will try to figure it out.

mateuslatrova avatar Jun 01 '24 14:06 mateuslatrova


Some candidate starting points:

  • Review the unit tests for none_failed_min_one_success to see the expected behavior.
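
For whoever picks this up, a hedged sketch of what a regression test might look like (the dag_maker fixture exists in Airflow's own test suite; the test name, task names, and assertions here are assumptions, not existing Airflow test code):

def test_none_failed_min_one_success_waits_for_expansion(dag_maker, session):
    from airflow.decorators import task, task_group
    from airflow.utils.state import TaskInstanceState

    with dag_maker(session=session):

        @task
        def init():
            return ["a", "b", "c"]

        @task_group
        def tg(message):
            @task
            def upstream(message):
                return message

            @task(trigger_rule="none_failed_min_one_success")
            def downstream(message):
                print(message)

            downstream(upstream(message))

        tg.expand(message=init())

    dr = dag_maker.create_dagrun()
    # ... run init and let the scheduler expand tg.upstream here (elided) ...
    # Before any mapped tg.upstream instance has finished, tg.downstream
    # must remain unexecuted rather than being skipped:
    skipped = {
        ti.task_id
        for ti in dr.get_task_instances(session=session)
        if ti.state == TaskInstanceState.SKIPPED
    }
    assert "tg.downstream" not in skipped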

RNHTTR avatar Jun 01 '24 15:06 RNHTTR


Nice, thank you so much! I will take a look.

mateuslatrova avatar Jun 01 '24 16:06 mateuslatrova

Hello, I am also facing this issue; it would be great if someone could find a solution (or at least a workaround).

Best regards and thank you for developing Airflow!

le-chartreux avatar Jun 18 '24 09:06 le-chartreux

Hi @le-chartreux! I have found the cause of the issue and I am working on a solution. I will post an explanation here soon.

mateuslatrova avatar Jun 18 '24 12:06 mateuslatrova

Awesome, thank you @mateuslatrova!

le-chartreux avatar Jun 18 '24 12:06 le-chartreux

@mateuslatrova - Great to hear there's progress with this!

gavinhonl avatar Jun 18 '24 13:06 gavinhonl

Hello Airflow Community,

While waiting for a resolution to the trigger rule issue, I wanted to share a workaround that I've implemented. While it may not address every scenario, it could be beneficial for similar use cases.

Problem Context

In branching scenarios, where only one (or a subset) of several tasks is executed and its result is needed downstream, the ideal trigger rule for the downstream task that consumes the output is NONE_FAILED_MIN_ONE_SUCCESS.

For example:

branching ─┬─> task_a ─┐
           OR          ├─> task_to_get_the_result_of_the_task_that_ran
           └─> task_b ─┘

Unfortunately, this trigger rule doesn't work as expected within dynamic task groups, leading to the downstream task being erroneously skipped.

Workaround Strategy

My workaround involves using the NONE_FAILED trigger rule, which does not exhibit the skipping behavior. To emulate the desired NONE_FAILED_MIN_ONE_SUCCESS logic, I've incorporated the use of AirflowSkipException to skip the downstream task when all preceding tasks are skipped (i.e., when all outputs are None).

Below is an example of using NONE_FAILED instead of NONE_FAILED_MIN_ONE_SUCCESS:

from airflow.decorators import dag, task, task_group
from pendulum import datetime
from airflow.utils.trigger_rule import TriggerRule
from airflow.exceptions import AirflowSkipException


@dag(start_date=datetime(2024, 1, 1), schedule=None, catchup=False)
def workaround():
    directions = get_directions()
    get_message_from_direction.expand(direction=directions)


@task
def get_directions() -> list[str]:
    return ["left", "right", "bottom"]


@task_group
def get_message_from_direction(direction: str) -> str | None:
    message_from_left = left_task()
    message_from_right = right_task()
    choose_next_task_from_direction(direction) >> [message_from_left, message_from_right]
    return get_message(message_from_left, message_from_right)


@task.branch
def choose_next_task_from_direction(direction: str) -> str | None:
    this_group = "get_message_from_direction"
    if direction == "left":
        return f"{this_group}.left_task"
    if direction == "right":
        return f"{this_group}.right_task"
    # implicitly return None: no downstream branch task follows


@task
def left_task():
    return "message from left"


@task
def right_task():
    return "message from right"


# Neither NONE_FAILED_MIN_ONE_SUCCESS nor ONE_SUCCESS is used here, because
# they don't work inside dynamic task groups: see https://github.com/apache/airflow/issues/39801
# As a workaround, NONE_FAILED is used instead, which is fine because:
# - If no previous task failed and at least one of them returns a message
#   (i.e., at least one succeeded), everything works and the message is returned.
# - If at least one of the previous tasks failed, this task will not be triggered.
# - If all the previous tasks were skipped, no message is provided and this task
#   is skipped via AirflowSkipException.
@task(trigger_rule=TriggerRule.NONE_FAILED)
def get_message(message_from_left: str | None, message_from_right: str | None) -> str:
    # Note that the output of a successful task must not be falsy for the
    # 'or' below to work; otherwise use explicit checks like
    # 'if message_from_left is not None: ...'
    message = message_from_left or message_from_right
    if message:
        # at least one upstream task succeeded
        return message
    raise AirflowSkipException("Both upstream tasks were skipped.")


workaround()

The get_message task will effectively mimic the behavior of NONE_FAILED_MIN_ONE_SUCCESS:

  • Retrieve the message from the task that was executed and succeeded.
  • Be skipped if both preceding tasks were skipped.
  • Not be triggered if any preceding task failed.

I hope this workaround proves useful to others facing the same issue. Your feedback and suggestions for improvement are welcome.

Best regards, Nathan Rousseau

le-chartreux avatar Jun 18 '24 15:06 le-chartreux

The issue

I will use @RNHTTR 's example DAG to make the explanation simpler. So, we have the following tasks:

init >> tg (imsleepy >> imawake)

When the init task instance finishes successfully, both the imsleepy and imawake task instances are in the None state and not yet expanded (meaning their map_index is -1).

Then, the scheduler gets the next task instance to be run, which is imsleepy, checks that its dependencies are met, and expands it into three task instances with None state (and map_indexes 0, 1 and 2).

Next, the scheduler gets the following task instance to be run, which is imawake. At this moment, only the init task instance has finished running. So, when checking the imawake task instance's dependencies inside the _evaluate_direct_relatives method of the TriggerRuleDep class, the following code calculates how many upstream tasks it has:

https://github.com/apache/airflow/blob/1d7ede78d3440f7752087beccb67c4ebe3c4acdf/airflow/ti_deps/deps/trigger_rule_dep.py#L355-L366

When this code runs with ti being imawake's task instance, the query sets task_id_counts to 0, so upstream is also set to 0, which is wrong: it should be 1, because imawake has one upstream task, imsleepy.

Since imawake's trigger rule is NONE_FAILED_MIN_ONE_SUCCESS and upstream is set to 0, the following condition on line 403 is met (because skipped is also 0, since no upstream task was skipped), and the imawake task instance has its state changed to SKIPPED even before being expanded (the unwanted behavior).

https://github.com/apache/airflow/blob/1d7ede78d3440f7752087beccb67c4ebe3c4acdf/airflow/ti_deps/deps/trigger_rule_dep.py#L400-L404
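
To make the failure mode concrete, here is a simplified model of that condition (paraphrased for illustration; not the actual trigger_rule_dep.py source):

def evaluates_to_skipped(upstream: int, success: int, skipped: int, failed: int) -> bool:
    # NONE_FAILED_MIN_ONE_SUCCESS: skip when nothing failed and nothing
    # succeeded, i.e. every upstream task (apparently) ended up skipped.
    if failed:
        return False  # the task would become upstream_failed instead
    return success == 0 and skipped >= upstream

# imawake right after imsleepy expands, with upstream miscounted as 0:
print(evaluates_to_skipped(upstream=0, success=0, skipped=0, failed=0))  # True -> SKIPPED
# with the correct count of 1 upstream task, imawake would keep waiting:
print(evaluates_to_skipped(upstream=1, success=0, skipped=0, failed=0))  # False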

Now, why did that query return 0? Because the call to _iter_upstream_conditions returned the condition task_id == imsleepy AND map_index < 0, and at this moment there is no TaskInstance of imsleepy with map_index < 0, because it has been expanded into three task instances with map_index 0, 1, and 2.

So the ROOT CAUSE is: the database has been updated with imsleepy's expansion data (otherwise that query would not return 0), but the dependency-checking code does not take this into account. In this example, _iter_upstream_conditions calls TaskInstance.get_relevant_upstream_map_indexes, which returns -1, and that is what produces the condition mentioned above.
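
The mismatch at that moment can be illustrated like this (a toy model of the metadata-database rows, not the literal SQLAlchemy query):

# task_instance rows for imsleepy right after expansion:
rows = [
    {"task_id": "tg.imsleepy", "map_index": 0},
    {"task_id": "tg.imsleepy", "map_index": 1},
    {"task_id": "tg.imsleepy", "map_index": 2},
]

# The dependency check still filters on map_index < 0 (the value returned
# by get_relevant_upstream_map_indexes), so the count comes back as zero:
upstream = sum(1 for r in rows if r["task_id"] == "tg.imsleepy" and r["map_index"] < 0)
print(upstream)  # 0, even though imawake really has one upstream task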

Possible solution

  1. Insert a specific (not sure which) condition in the TaskInstance.get_relevant_upstream_map_indexes method to cover this edge case.

I invite all of you to review and question this explanation to make sure it is correct; suggestions for solutions are also very welcome!

mateuslatrova avatar Jun 19 '24 23:06 mateuslatrova

I'd just like to point out that this issue seems related to #34023, which I'm trying to tackle (for now, without much success). @mateuslatrova - if you're still working on this issue, I'd be happy if you could also take a look there.

shahar1 avatar Jul 08 '24 20:07 shahar1

I am observing this same behavior with both TriggerRule.ONE_SUCCESS and TriggerRule.NONE_FAILED_MIN_ONE_SUCCESS on 2.10.3.

fredthomsen avatar Dec 20 '24 17:12 fredthomsen

I observe similar behavior in 2.10.4. I filed a bug report that was unfortunately moved to 'discussions' - #46315.

The solution I found was to pass the task's output as an argument to the joining downstream task (the one with trigger rule 'none_failed_min_one_success') - see the workaround in #46315.
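
A minimal sketch of that idea, with illustrative names (see #46315 for the original workaround):

from airflow.decorators import task

@task(trigger_rule="none_failed_min_one_success")
def join(result_a=None, result_b=None):
    # Receiving the upstream results as arguments (instead of relying only
    # on branch >> join wiring) makes the mapped dependency explicit, which
    # avoided the premature skip in this case.
    return result_a if result_a is not None else result_b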

olk avatar Jan 31 '25 14:01 olk

There is nothing wrong with a discussion being a 'discussion' - the main difference is that either it is not yet clear that this is an Airflow issue and the author should try to engage others or provide more information to turn it into a clear "issue", or there are other issues covering it already.

BTW, having something filed as an "issue" also does not guarantee that it will be taken care of. It might or might not be fixed, depending on whether someone looks at it and decides to spend time on fixing it. That's how open-source development works. And if you really want a problem fixed, the fastest way is to contribute a fix yourself; the second best is to find someone to do it. Otherwise it sits there, waiting for someone to fix it. And the better diagnosed and clearer it is, the more likely someone will take a look at it.

Don't mix up "issues" here with "service tickets" - things get done here when someone does them. That's it. Issues and discussions just make it easier for those people to show up and fix things, so the easier it is for them, the better the chance they will. But it is mostly up to authors to make it easy, especially for non-critical issues.

potiuk avatar Jan 31 '25 14:01 potiuk

OK, unfortunately I have the impression that nobody takes care of 'discussions' - anyway, the issue seems to already be addressed in another bug ticket (I've cross-linked them).

olk avatar Jan 31 '25 14:01 olk

Yes. As I wrote - it's on the author to manage and lead the discussion to something "tangible", i.e. enough understanding that this is an issue. Being a discussion is a clear indication that it needs more input and discussion to be seen and recognized as an "Airflow issue". So it's up to the author (you in this case) to draw attention and get people interested.

There is no magical feature of an issue that makes it more popular. There are plenty of issues that, even when classified as issues, do not get enough traction, and they are often closed automatically because they have not caught anyone's attention. So I would not worry too much about "interest", but rather ask "did I, the author, do everything to explain the problem and draw the attention of those who could fix it, so that they actually take an interest in fixing it?" - it's mostly on the author anyway.

That's how open source development works.

potiuk avatar Jan 31 '25 14:01 potiuk

I'm stepping back from the assignment for now due to other priorities. Anyone is welcome to take this issue. I might reclaim it in the future if no one else does :)

shahar1 avatar Feb 01 '25 12:02 shahar1