airflow icon indicating copy to clipboard operation
airflow copied to clipboard

Dynamic task mapping zip() iterates unexpected number of times

Open BasPH opened this issue 3 years ago • 7 comments
trafficstars

Apache Airflow version

2.4.0

What happened

When running zip() with different-length lists, I get an unexpected result:

from datetime import datetime

from airflow import DAG
from airflow.decorators import task

with DAG(
    dag_id="demo_dynamic_task_mapping_zip",
    start_date=datetime(2022, 1, 1),
    schedule=None,
):

    @task
    def push_letters():
        return ["a", "b", "c"]

    @task
    def push_numbers():
        return [1, 2, 3, 4]

    @task
    def pull(value):
        print(value)

    pull.expand(value=push_letters().zip(push_numbers()))

Iterates over [("a", 1), ("b", 2), ("c", 3), ("a", 1)], so it iterates for the length of the longest collection, but restarts iterating elements when reaching the length of the shortest collection.

I would expect it to behave like Python's builtin zip and iterate for the length of the shortest collection, so 3x in the example above, i.e. [("a", 1), ("b", 2), ("c", 3)].

Additionally, I went digging in the source code and found the fillvalue argument which works as expected:

pull.expand(value=push_letters().zip(push_numbers(), fillvalue="foo"))

Iterates over [("a", 1), ("b", 2), ("c", 3), ("foo", 4)].

However, with fillvalue not set, I would expect it to iterate only for the length of the shortest collection.

What you think should happen instead

I expect zip() to iterate over the number of elements of the shortest collection (without fillvalue set).

How to reproduce

See above.

Operating System

MacOS

Versions of Apache Airflow Providers

No response

Deployment

Other

Deployment details

OSS Airflow

Anything else

No response

Are you willing to submit PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

BasPH avatar Sep 19 '22 18:09 BasPH

As per my debugging I guess the NOTSET object being set is different across task runs in the scheduler is different causing conditionals to return False. The exact case reported is present as test case but I guess since it's run with pytest as single process this bug is exposed only when using the scheduler.

https://github.com/apache/airflow/blob/4c33f6bcf527448283a738ef11478b75ba339422/airflow/models/xcom_arg.py#L455 https://github.com/apache/airflow/blob/4c33f6bcf527448283a738ef11478b75ba339422/tests/models/test_xcom_arg.py#L185-L225

tirkarthi avatar Sep 20 '22 15:09 tirkarthi

Hello, I am new here, but I have been wanting to contribute to this project. I do not want to create a random pull request, so I am showing my changes here. I did run the pre-commit and unit test shown above. I was not able to run the DAG above in Docker, however on macOS. My thought on this is that there may be different instances of NOTSET and, therefore, comparison is not working. It compares against the object, not a value. In order to solve this, perhaps testing against the class ArgNotSet would be more reliable. I would create a PR, but I would like to test against the failed use case and do not want to violate the contribution decorum.

class _ZipResult(Sequence):
    def __init__(self, values: Sequence[Sequence | dict], *, fillvalue: Any = NOTSET) -> None:
        self.values = values
        self.fillvalue = fillvalue
        # use the generator here, rather than in __len__ to improve efficiency
        lengths = (len(v) for v in self.values)
        self.length = min(lengths) if isinstance(self.fillvalue, ArgNotSet) else max(lengths)

    @staticmethod
    def _get_or_fill(container: Sequence | dict, index: Any, fillvalue: Any) -> Any:
        try:
            return container[index]
        except (IndexError, KeyError):
            return fillvalue

    def __getitem__(self, index: Any) -> Any:
        if index >= len(self):
            raise IndexError(index)
        return tuple(self._get_or_fill(value, index, self.fillvalue) for value in self.values)

    def __len__(self) -> int:
        return self.length

rjmcginness avatar Sep 21 '22 06:09 rjmcginness

@rjmcginness You patch would break fillvalue and needs some additional work.

@tirkarthi I would expect the resulting value to contain NOTSET instances instead of wrapping over to the beginning, if the NOTSET identity is the cause. (I may be missing something.) The issue not being observed in tests does seem to indicate there’s something related to serialisation though.

uranusjr avatar Sep 22 '22 05:09 uranusjr

@uranusjr Thank you. I was thinking that fillvalue would remain the same. It still receives an instance of NOTSET or a value. My code changes the test against the class, rather than against any instance of ArgNotSet. I did test this against the use case successfully. It seems strange that there would be multiple instances of NOTSET. Serialization/deserialization makes sense, as this may lead to creation of new instances. What was your thought on breaking fillvalue?

rjmcginness avatar Sep 22 '22 06:09 rjmcginness

Ah, I missed the self.length part you changed in __init__. That could be the solution (if the identity is indeed the issue), although I’d prefer the logic to live in __len__ instead of __init__. But if serialisation is the issue (and again, I am not yet entirely convinced that’s the case), the pre-commit checks and unit tests would not be enough to verify the solution. The easiest way to check would be for someone to actually run a DAG against this patch.

uranusjr avatar Sep 22 '22 06:09 uranusjr

I did. I asked on slack to find where the DAG file should go. I reproduced the error in v2.4.0. Then I ran it with my new code and received the expected output. I committed the changes back to my forked repository, but have not made a pull request.

rjmcginness avatar Sep 22 '22 11:09 rjmcginness

A pull request would be a good idea then. Maybe it’d be easier to figure out what exactly triggered the error if the full diff is presented.

uranusjr avatar Sep 22 '22 11:09 uranusjr

@uranusjr Ok. Thank you. I have to look how to do the rebasing, then I will request the pull. I appreciate your help.

rjmcginness avatar Sep 22 '22 12:09 rjmcginness