[BUG] map_task downloads all pickled outputs from previous map_task
Describe the bug
Consequent map_tasks download all outputs of entire previous map task, instead of specific outputs of a previous sub-task. This leads to a the subsequent map task running for an hour instead of 5 minutes.
Expected behavior
if map_task B's inputs are map_task A's outputs, then: the i'th sub_task of map task B, should download the outputs of the i'th sub_task of map task A
Additional context to reproduce
When running map_task A, where each sub_task returns a type Dict[str,Dict[str,np.ndarray]] then map_task B receives the output of A, where each sub task of B downloads a list of Dict[str,Dict[str,np.ndarray]] instead of a single object.
Screenshots
map_task of task "save_as_parquet_to_s3" takes more than an hour, instead of 5 minutes
Are you sure this issue hasn't been raised already?
- [X] Yes
Have you read the Code of Conduct?
- [X] Yes
Thank you for opening your first issue here! 🛠
Hey there, @tomtou-bspace! Great to see you back diving into the intricacies of workflow orchestration. Hope you're ready for another round of troubleshooting and insights. Let's get to it!
Based on the information you've provided, it seems like the issue you're encountering is due to the way outputs are being passed between map_task A and map_task B. Here are some suggestions on how you might resolve this issue:
-
Ensure Task Isolation: Make sure that each sub-task in
map_task Bis designed to only access its corresponding output frommap_task A. This might involve adjusting how outputs are passed between tasks to ensure they are not bundled together in a way that requires downloading all outputs when only one is needed. -
Use ArrayNode for Enhanced Mapping Support: The Flyte documentation mentions
ArrayNodeas an enhancement over regular map tasks, offering wider mapping support and improved subtask execution handling, including cache management and subtask failure handling. By usingArrayNodefrom the experimental module, you might gain better control over how outputs are managed and accessed, potentially resolving the issue of unnecessary data downloads.Here is a sample code snippet:
from flytekit import task, workflow from flytekit.experimental import map_task @task def your_task(a: int) -> int: ... @workflow def your_workflow(xs: list[int]) -> list[int]: return map_task(your_task)(a=xs) -
Review Task Output and Input Handling: Re-examine how outputs from
map_task Aare structured and howmap_task Baccesses these outputs. It might be necessary to refactor the tasks to ensure that each sub-task inmap_task Bexplicitly accesses only its corresponding output frommap_task A, possibly by indexing or other means of direct reference. -
Optimize Data Storage and Access: If the workflow involves storing outputs of
map_task Ain a shared location (e.g., cloud storage), consider structuring the storage so that each output is stored in a separate, directly accessible file or object. This way, sub-tasks inmap_task Bcan directly access only the data they need without downloading everything. -
Leverage Task Metadata and Resource Overrides: Although not directly related to the issue of data downloading, optimizing the execution of map tasks through the use of
TaskMetadatafor caching andwith_overridesfor resource allocation (as shown in the provided examples) can contribute to overall workflow efficiency. This might indirectly alleviate the impact of increased runtime due to unnecessary data downloads.
For more information, you can refer to the Flyte documentation on map tasks.
Regarding the structure and type of the output from map_task A and how it is passed to map_task B, the output from a task execution is encapsulated in the TaskExecutionClosure message. The TaskExecutionClosure contains a field named OutputResult, which can be one of three types: OutputUri (string), OutputData (*core.LiteralMap), or Error (*core.ExecutionError). The specific type that is used depends on the outcome of the task execution. If the task execution was successful and produced output data, the OutputData field would be used. If the task execution resulted in an error, the Error field would be used. The OutputUri field is deprecated and not used in new implementations. You can find more details in the FlyteIDL protobuf definitions.
I hope these suggestions help you resolve the issue. If you need further assistance, please provide more details about your implementation, and I'll be glad to help.
Sources
- docs/user_guide/advanced_composition/map_tasks.md
- flyteidl/gen/pb-go/flyteidl/admin/task_execution.pb.go
About Dosu
This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.
Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.
If you want to continue the conversation, start your reply with @dosu-bot.
@tomtou-bspace , this is a known limitation of the current implementation of map tasks. Unfortunately, we're not going to revisit this decision in the short / medium term as the team is focused on other projects right now.