Dynamic task mapping zip() iterates an unexpected number of times
Apache Airflow version
2.4.0
What happened
When running zip() with different-length lists, I get an unexpected result:
from datetime import datetime
from airflow import DAG
from airflow.decorators import task
with DAG(
    dag_id="demo_dynamic_task_mapping_zip",
    start_date=datetime(2022, 1, 1),
    schedule=None,
):
    @task
    def push_letters():
        return ["a", "b", "c"]
    @task
    def push_numbers():
        return [1, 2, 3, 4]
    @task
    def pull(value):
        print(value)
    pull.expand(value=push_letters().zip(push_numbers()))
This iterates over [("a", 1), ("b", 2), ("c", 3), ("a", 1)]: it runs for the length of the longest collection, but wraps back to the first elements once it reaches the length of the shortest collection.
I would expect it to behave like Python's builtin zip and iterate for the length of the shortest collection, i.e. three times in the example above: [("a", 1), ("b", 2), ("c", 3)].
Additionally, I dug into the source code and found the fillvalue argument, which works as expected:
pull.expand(value=push_letters().zip(push_numbers(), fillvalue="foo"))
This iterates over [("a", 1), ("b", 2), ("c", 3), ("foo", 4)].
However, with fillvalue not set, I would expect it to iterate only for the length of the shortest collection.
What you think should happen instead
I expect zip() to iterate over the number of elements of the shortest collection (without fillvalue set).
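For comparison, here is how Python's builtin zip() and itertools.zip_longest() treat the same two lists. The expectation above matches the first, and the fillvalue behavior matches the second. This only illustrates plain Python semantics; it does not exercise Airflow's XComArg.zip() itself.

```python
from itertools import zip_longest

letters = ["a", "b", "c"]
numbers = [1, 2, 3, 4]

# Builtin zip() stops as soon as the shortest input is exhausted.
print(list(zip(letters, numbers)))
# [('a', 1), ('b', 2), ('c', 3)]

# zip_longest() runs to the end of the longest input and pads the shorter ones.
print(list(zip_longest(letters, numbers, fillvalue="foo")))
# [('a', 1), ('b', 2), ('c', 3), ('foo', 4)]
```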
How to reproduce
See above.
Operating System
macOS
Versions of Apache Airflow Providers
No response
Deployment
Other
Deployment details
OSS Airflow
Anything else
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
From my debugging, I suspect the NOTSET object differs between task runs in the scheduler, which makes the identity checks against it return False. The exact reported case is already covered by a test, but since the tests run under pytest in a single process, I suspect this bug only shows up when the scheduler is involved.
https://github.com/apache/airflow/blob/4c33f6bcf527448283a738ef11478b75ba339422/airflow/models/xcom_arg.py#L455 https://github.com/apache/airflow/blob/4c33f6bcf527448283a738ef11478b75ba339422/tests/models/test_xcom_arg.py#L185-L225
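A minimal sketch of the suspected failure mode, assuming the sentinel goes through a serialization round trip somewhere between the DAG file and the scheduler (this is not confirmed in the thread). pickle stands in for whatever serialization Airflow actually applies, and ArgNotSet/NOTSET here are local re-declarations, not imports from Airflow.

```python
import pickle


class ArgNotSet:
    """Hypothetical stand-in for Airflow's sentinel type."""


NOTSET = ArgNotSet()

# Simulate the sentinel crossing a process boundary via serialization.
restored = pickle.loads(pickle.dumps(NOTSET))

print(restored is NOTSET)               # False: a new object was created
print(isinstance(restored, ArgNotSet))  # True: the type survives the round trip
```

An identity check such as `fillvalue is NOTSET` fails on the restored copy, while a type check still recognizes it as "not set".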
Hello, I am new here, but I have been wanting to contribute to this project. I do not want to open a random pull request, so I am sharing my changes here. I ran the pre-commit checks and the unit test linked above. However, I was not able to run the DAG above in Docker on macOS. My thought is that there may be different instances of NOTSET, so the comparison fails: it checks object identity, not value. To solve this, testing against the class ArgNotSet with isinstance() may be more reliable. I would create a PR, but I would like to test against the failing use case first, and I do not want to violate the contribution etiquette.
# Imports needed by this excerpt
from __future__ import annotations
from typing import Any, Sequence
from airflow.utils.types import NOTSET, ArgNotSet
class _ZipResult(Sequence):
    def __init__(self, values: Sequence[Sequence | dict], *, fillvalue: Any = NOTSET) -> None:
        self.values = values
        self.fillvalue = fillvalue
        # use the generator here, rather than in __len__ to improve efficiency
        lengths = (len(v) for v in self.values)
        self.length = min(lengths) if isinstance(self.fillvalue, ArgNotSet) else max(lengths)
    @staticmethod
    def _get_or_fill(container: Sequence | dict, index: Any, fillvalue: Any) -> Any:
        try:
            return container[index]
        except (IndexError, KeyError):
            return fillvalue
    def __getitem__(self, index: Any) -> Any:
        if index >= len(self):
            raise IndexError(index)
        return tuple(self._get_or_fill(value, index, self.fillvalue) for value in self.values)
    def __len__(self) -> int:
        return self.length
@rjmcginness Your patch would break fillvalue and needs some additional work.
@tirkarthi If NOTSET identity were the cause, I would expect the resulting value to contain NOTSET instances instead of wrapping around to the beginning. (I may be missing something.) The issue not being observed in tests does seem to indicate there’s something related to serialisation, though.
@uranusjr Thank you. I was thinking that fillvalue would stay the same: it still receives either an instance of NOTSET or a value. My change tests against the class rather than against one specific instance of ArgNotSet. I did test this against the use case successfully. It seems strange that there would be multiple instances of NOTSET, but serialization/deserialization makes sense, since it may create new instances. Why did you think it would break fillvalue?
Ah, I missed the self.length part you changed in __init__. That could be the solution (if the identity is indeed the issue), although I’d prefer the logic to live in __len__ instead of __init__. But if serialisation is the issue (and again, I am not yet entirely convinced that’s the case), the pre-commit checks and unit tests would not be enough to verify the solution. The easiest way to check would be for someone to actually run a DAG against this patch.
I did. I asked on Slack where the DAG file should go. I reproduced the error in v2.4.0, then ran the DAG with my new code and got the expected output. I committed the changes to my fork, but have not opened a pull request yet.
A pull request would be a good idea then. Maybe it’d be easier to figure out what exactly triggered the error if the full diff is presented.
@uranusjr OK, thank you. I have to look up how to rebase, then I will open a pull request. I appreciate your help.