prefect
Ability to use unmapped on wait_for
First check
- [X] I added a descriptive title to this issue.
- [X] I used the GitHub search to find a similar request and didn't find it.
- [X] I searched the Prefect documentation for this feature.
Describe the current behavior
The `.map` interface of a Prefect task is useful for submitting a series of operations across an iterable of arguments for execution. The return value of `.map` is a list of `PrefectFuture`s, one for each individual `.submit` result.
If a second Prefect task has no direct data dependencies on the first, we can use its `wait_for` argument to tell Prefect not to start it until all mapped runs of the first task have completed. However, there is no way to use the `.map` interface while also specifying that each mapped run only has to wait for the corresponding future from task a.
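As a rough analogy only (this uses the standard library's `concurrent.futures`, not Prefect's scheduler), the two dependency patterns can be sketched as follows. The hypothetical `wait_for_all` mirrors what passing `wait_for=futures` to a mapped task does today, with every downstream call blocking until all upstream futures finish; the equally hypothetical `wait_for_pairwise` mirrors the requested behaviour, where downstream call i waits only on upstream future i.

```python
from concurrent.futures import Future, ThreadPoolExecutor, wait
from typing import Any, Callable, Iterable, List, Sequence

# Analogy only: plain threads standing in for Prefect task runs.
# Assumes upstream futures were submitted to the same pool *before* the
# downstream calls; ThreadPoolExecutor dequeues work in submission order,
# so every upstream call is already running (or done) by the time a
# downstream call starts blocking on it.


def wait_for_all(pool: ThreadPoolExecutor, fn: Callable[[Any], Any],
                 args: Iterable[Any], upstream: Sequence[Future]) -> List[Future]:
    """Every downstream call waits for *all* upstream futures (today's behaviour)."""
    def run(arg: Any) -> Any:
        wait(upstream)  # blocks until every upstream future has finished
        return fn(arg)
    return [pool.submit(run, arg) for arg in args]


def wait_for_pairwise(pool: ThreadPoolExecutor, fn: Callable[[Any], Any],
                      args: Iterable[Any], upstream: Sequence[Future]) -> List[Future]:
    """Downstream call i waits only for upstream future i (the requested behaviour)."""
    def run(arg: Any, dependency: Future) -> Any:
        dependency.result()  # blocks on the matching future and re-raises its error
        return fn(arg)
    return [pool.submit(run, arg, dep) for arg, dep in zip(args, upstream)]


with ThreadPoolExecutor(max_workers=4) as pool:
    upstream = [pool.submit(str.upper, w) for w in "Break this string".split()]
    downstream = wait_for_pairwise(pool, lambda n: n * 10, [1, 2, 3], upstream)
    print([f.result() for f in downstream])  # [10, 20, 30]
```

In the pairwise version a slow upstream item only delays its own downstream call, which is exactly the property wanted for zipping each measurement set as soon as its own processing is done.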
Even when task b has no data relationship to task a, task a may have side effects that task b has to be aware of, so task b must wait for task a to finish. As an example, I have a pipeline that uses the `.map` interface almost exclusively. While a data set (a nested series of tables on disk called a Measurement Set, a format HPCs hate) is being processed, I cannot touch it. But I want to zip/tar it as soon as possible afterwards to avoid file-quota issues. I can either modify the return and input types of my functions to pass this extra information around, or keep the functions as I would otherwise write them and leverage Prefect's dependency management.
To put it in code:
```python
futures = task_a.map("Break this string".split())
for some_number, future in zip([1, 2, 3], futures):
    task_b.submit(some_number, wait_for=future)
```
Although the above works, it mixes `.map` and `.submit` in a rather unattractive way. I also expect that, when the mapped inputs are sufficiently large, submitting each run individually carries a performance penalty from the extra interactions with the API.
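Until something like this exists, the zip-and-submit loop can at least be moved into a small reusable helper. The sketch below is hypothetical (`map_with_pairwise_wait` is not part of Prefect); it assumes only that a task exposes `submit(*args, wait_for=...)`, as the loop already uses, and passes each dependency as a one-element list. It tidies the call site but still submits one run per element, so it does nothing about the API-overhead concern.

```python
from typing import Any, Iterable, List, Protocol


class SubmittableTask(Protocol):
    """Anything with a ``submit(*args, wait_for=..., **kwargs)`` method (assumption)."""

    def submit(self, *args: Any, wait_for: Any = None, **kwargs: Any) -> Any: ...


def map_with_pairwise_wait(task: SubmittableTask, args: Iterable[Any],
                           upstream: Iterable[Any], **kwargs: Any) -> List[Any]:
    """Hypothetical helper: submit ``task`` once per element of ``args``,
    making call i wait only on ``upstream[i]``."""
    args, upstream = list(args), list(upstream)
    # zip() would silently drop unmatched items, so fail loudly instead.
    if len(args) != len(upstream):
        raise ValueError(
            f"got {len(args)} arguments but {len(upstream)} upstream futures"
        )
    return [
        task.submit(arg, wait_for=[dependency], **kwargs)
        for arg, dependency in zip(args, upstream)
    ]
```

Inside a flow this would read `b_futures = map_with_pairwise_wait(task_b, [1, 2, 3], futures)`. Raising on a length mismatch is a deliberate choice: a silently shorter result would hide a bug in how the upstream map was built.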
Describe the proposed behavior
Have an operation similar to `unmapped` that works for `wait_for`.
```python
futures = task_a.map("Break this string".split())
# Proposed: mapped run i of task_b waits only for futures[i]
task_b.map([1, 2, 3], wait_for=unmapped(futures))
```
This would also make it easier to build up a more complex set of `wait_for` dependencies without any extra indexing or logic inside a `for` loop.
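One way to picture the proposed semantics is as a step that expands a map-level `wait_for` into one dependency list per mapped run. The sketch below is purely hypothetical and is not how Prefect resolves `wait_for` internally. It uses its own marker name, `Elementwise`, rather than the `unmapped` name proposed here, only so it is not confused with Prefect's existing `unmapped`, which passes an argument unchanged to every mapped call.

```python
from dataclasses import dataclass
from typing import Any, List, Sequence


@dataclass(frozen=True)
class Elementwise:
    """Hypothetical marker: run i depends on futures[i] only."""
    futures: Sequence[Any]


def resolve_wait_for(n_calls: int, wait_for: Sequence[Any]) -> List[List[Any]]:
    """Expand a map-level wait_for into one dependency list per mapped run.

    - Elementwise(seq): run i depends on seq[i] only.
    - any other item:   every run depends on it (today's behaviour).
    """
    per_call: List[List[Any]] = [[] for _ in range(n_calls)]
    for item in wait_for:
        if isinstance(item, Elementwise):
            if len(item.futures) != n_calls:
                raise ValueError(
                    f"expected {n_calls} futures, got {len(item.futures)}"
                )
            for deps, fut in zip(per_call, item.futures):
                deps.append(fut)
        else:
            for deps in per_call:
                deps.append(item)
    return per_call


# Strings stand in for the futures from task_a and new_task.
print(resolve_wait_for(3, [Elementwise(["a0", "a1", "a2"]),
                           Elementwise(["b0", "b1", "b2"])]))
# [['a0', 'b0'], ['a1', 'b1'], ['a2', 'b2']]
```

Mixing both kinds is then straightforward: an `Elementwise` group pairs up by index, while a bare future in the same tuple still gates every run.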
```python
futures = task_a.map("Break this string".split())
more_futures = new_task.map("Some other string".split())
task_b.map([1, 2, 3], wait_for=(unmapped(futures), unmapped(more_futures)))
```
Additional context
I try to use the `.map` interface as much as possible in my pipelines. It makes the code much more readable and avoids unnecessary layers of indentation.
I find myself in this situation fairly often:
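```python
from glob import glob

telescope_mss = glob("*.ms")
image_data = image_ms_data.map(ms=telescope_mss)
# wait_for=image_data makes every zip_data run wait for all image_ms_data runs
zipped = zip_data.map(ms=telescope_mss, wait_for=image_data)
```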
In the above situation I cannot zip the measurement sets (these nasty radio-telescope formats that are folders within folders, with many small files and many big files that HPCs absolutely hate) until the imaging is finished. I could pass the `image_data` future into `zip_data` as an input, but that would mean either modifying the return value of `image_ms_data` (which is the image, not the data) and updating what `zip_data` expects, or adding a dummy argument just to create the dependency. This feels like a code smell to me: realistically, `zip_data` should only care about the measurement set it is asked to zip, not about what produced it.
This is just one example, but this type of use case comes up fairly often for me. As I try to convince the powers that be that we have a workable, maintainable solution, I'd love to keep my usage of Prefect as consistent and readable as possible.