First class Sub-flow concept
Archived from the Prefect Public Slack Community
walter_gillett: Hi - we are building bioinformatics pipelines related to infectious disease. Prefect looks interesting. I am wondering about task grouping (a.k.a. nesting or sub-dags). Each step in our pipeline reads inputs from GCS and writes outputs to GCS. Without task grouping, this will get messy. For example, suppose we have steps 1, 2, and 3, each of which reads one GCS input and writes a GCS output. That yields 9 tasks (3 GCS download, 3 compute, and 3 upload), but we would like to group them into pipeline steps because that’s the essential unit of work. Is there a way to model this in Prefect?
chris: Hi <@UQM4X5RE2>! Apologies if I’m misunderstanding the use case, but it sounds like you only need 3 Prefect Tasks? What is the benefit you hope to achieve by “grouping” tasks without them being realized as true Prefect Tasks?
walter_gillett: Hi <@ULRBLQ19A> - likely I am misunderstanding how Prefect works. Yes, I want only 3 Prefect Tasks. But if I want to use Prefect machinery to conveniently download from GCS, that's a task (prefect.tasks.google.storage.GCSDownload), same for upload, so I get 9 Tasks, yes? Conceptually there are 3 pipeline steps so I would like the workflow structure to reflect that. I am thinking of this as being like SubDAGs in Airflow (https://www.astronomer.io/guides/subdags/), where aggregating low-level details makes it possible to have a workflow with a higher level of granularity.
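For concreteness, here is a sketch of the fully expanded shape described above, for step 1 only (steps 2 and 3 would repeat the same download → compute → upload triple); the bucket/blob names and the `step1` compute task are placeholders, not part of the original discussion:

```python
from prefect import task, Flow
from prefect.tasks.google.storage import GCSDownload, GCSUpload

download = GCSDownload(bucket="my-bucket")  # placeholder bucket
upload = GCSUpload(bucket="my-bucket")

@task
def step1(data):
    return data  # stand-in for the real compute step

with Flow("pipeline") as flow:
    raw = download(blob="step1/input")        # task 1: GCS download
    result = step1(raw)                       # task 2: compute
    upload(data=result, blob="step1/output")  # task 3: GCS upload
```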
walter_gillett: I see related discussion here: https://docs.prefect.io/core/PINs/PIN-05-Combining-Tasks.html and https://github.com/PrefectHQ/prefect/issues/980 . But I'm not sure what the recommendation coming out of that is.
chris: Yea, I think I understand better what you're referring to now - thanks for that link; correct me if I'm wrong here, but the Airflow notion of a SubDAG is an API convenience in the UI for seeing task groupings, which makes sense. I don't think I see any functional difference in the way the DAG behaves between the fully expanded representation and the SubDAG representation.

In Prefect, you can certainly create multiple flows and then link them together using some combination of `flow.update` / `flow.set_dependencies` / `flow.root_tasks()` / `flow.terminal_tasks()`, but ultimately we haven't yet exposed an analogous first-class "sub Flow" concept.
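A rough sketch of that manual stitching, assuming `flow_a` and `flow_b` are existing `Flow` objects (the names and the sequencing choice are illustrative, not a prescribed recipe):

```python
from prefect import Flow

# hypothetical: flow_a and flow_b are pre-built Flow objects
combined = Flow("combined")
combined.update(flow_a)  # copy flow_a's tasks and edges into combined
combined.update(flow_b)  # copy flow_b's tasks and edges into combined

# make flow_b start only after every terminal task of flow_a has finished
for root in flow_b.root_tasks():
    combined.set_dependencies(root, upstream_tasks=list(flow_a.terminal_tasks()))
```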
walter_gillett: Thanks <@ULRBLQ19A> good to know, rolling up flows could be the answer for now. Adding a first-class subflow concept to Prefect would be helpful, but nesting adds complexity so would have to be done carefully - more is not always better. As a side note re Airflow SubDAGs from the article I linked to "Astronomer highly recommends staying away from SubDags. Airflow 1.10 has changed the default SubDag execution method to use the Sequential Executor to work around deadlocks caused by SubDags".
chris: very interesting; yea I agree this seems like a really convenient abstraction - we’ll definitely look into it! I’ll actually use our bot to archive this thread as a GitHub issue that we can use to track it
chris: <@ULVA73B9P> archive “First class Sub-flow concept”
Reopening to look into what a sub-flow API might look like in Prefect!
FYI to the world: we are having our first design discussions next week on subflows internally!
UPDATE: No specific action items or high-priority roadmapping were determined in this meeting.
@lauralorenz how do I join in on the discussion?
Would creating a task that runs other tasks be an example of this? Currently prefect throws a `ValueError` when trying to run a task within another task.
```python
from prefect import task, Flow

@task
def subtask():
    pass

@task
def main():
    subtask()

with Flow('flow') as flow:
    main()

if __name__ == '__main__':
    flow.run()
```
This will throw `ValueError: Could not infer an active Flow context.`

Edit: I do not think it is possible for the functional API to work this way. Using `subtask.run()` would work, but prefect would not include `subtask` in the flow's DAG.
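To illustrate the workaround mentioned in the edit, one could rewrite `main` from the example above like this (a sketch, not a recommendation):

```python
@task
def main():
    # .run() executes subtask's underlying function directly; Prefect sees
    # only a plain Python call, so subtask never becomes a node in the DAG.
    return subtask.run()
```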
FYI, though we didn't end up coming up with any really concrete action items or agreed-upon design when we discussed internally, we did generate this brainstorm diagram that might interest folks who have thoughts about subflows: Subflows Kickoff.pdf
TL;DR:
- page 1: what would one actually do with subflows? we came up with a) establish flow state dependencies b) configuration management c) modularize flow code (this is what OP of this issue was referring to, by our categorization) d) execute partial flows
- page 2: what is the user experience during the lifecycle of a subflow in Core, Cloud, and the UI? (for example, can you configure a subflow to 'retry' -- implying it is more like a task? Do you retry from the top of the entire flow?)
- page 3: is the subflow unit itself composed of `Task` subclasses or `Flow` subclasses (or neither)? How does that break one or the other abstraction? What do we get to reuse by picking one or the other? And then what is the API like? Some `Flow`-class-centric ideas:
class centric ideas:
```python
from my_other_file import subflow  # a Flow object

with Flow("parent") as f:
    flow_results = subflow.map([1, 2, 3])
    result = thing(flow_results)

with Flow("parent", state_handlers=some_state_handlers) as f:
    thing()

    with Flow("child", state_handlers=more_state_handlers) as sub_f:
        thing()
```
@edeediong the conversation is just happening on this issue and in the contributors channel in slack for now afaik!
@petermorrow I think that could be an example, is that how you would prefer to define your subflows -- as tasks calling tasks? If so, this would probably align more with representing subflows as a `Task` class internally (see page 3 of the pdf for our thoughts on that). You are correct that it doesn't work that way now, since a task will try to bind to a flow whenever it is called (code ref here and right below there)
Thanks for the response @lauralorenz and for sharing the miro board! Task design vs flow design pros/cons on page 3 is really interesting. I'd like to learn more about why some of these pros/cons are there. To address your question, the design I have in mind is a kind of mix where the flow object is callable like a task. Something like
```python
from prefect import Flow, Parameter

from .flows.common import a_flow, b_flow
from .tasks.specialized import special_task

with Flow('ab-flow') as ab_flow:
    a, b = Parameter('a'), Parameter('b')
    a_flow_task = a_flow(a)
    b_flow_task = b_flow(b)
    special_task(upstream_tasks=[a_flow_task, b_flow_task])
```
Where `ab_flow(a, b)` can also be called in another flow. I think this design would make it easier to modularize flows and keep tasks tiny. I feel like it would also be more explicit what code runs when, compared with calling a task from another task. I'm not sure what the pros/cons are with this approach or whether it's technically feasible. What do you think?
That pattern seems currently achievable w/ Python functions:

```python
def a_flow(a: Parameter):
    # call / create some tasks
    # e.g.,
    my_task(a)
```
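For example, such a function simply groups task calls and can be invoked inside any flow context (a sketch; `my_task` is the hypothetical task from the snippet above):

```python
from prefect import Flow, Parameter

with Flow("parent") as f:
    a = Parameter("a")
    a_flow(a)  # adds my_task, bound to a, to the surrounding flow
```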
@cicdw that's how the `Flow.__call__` method would work for `a_flow`. Having that functionality built into a flow would be nice.
Just want to throw in another direction for discussion, which is happening in some other forums. @gryBox has been going more down the line of thinking of exposing more set theory operations on arbitrary flow objects, with a lot of inspiration from networkx. There's a draft PR at #2699 with code for some of the ideas and some further background in a draft PIN on google docs that might be worth a look for people interested in this idea. In my view these are extensions to what @cicdw mentioned in the OP regarding `flow.update` and related, but more sophisticated.
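Purely as an illustration of the networkx-style composition that inspired that direction (the `DiGraph`s below stand in for flow graphs and are not Prefect objects):

```python
import networkx as nx

g1 = nx.DiGraph([("download", "compute")])
g2 = nx.DiGraph([("compute", "upload")])

combined = nx.compose(g1, g2)  # set-theoretic union of nodes and edges
print(list(combined.edges))    # [('download', 'compute'), ('compute', 'upload')]
```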
What about a decorator concept like this?
```python
import prefect
from prefect import task, Flow, Parameter
from time import sleep


## DECORATOR
def flow(*flow_args, **flow_kwargs):
    """Flow and SubFlow decorator."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            if args or kwargs:
                return func(*args, **kwargs)
            else:
                with Flow(*flow_args, **flow_kwargs) as flow_instance:
                    func()
                return flow_instance
        return wrapper
    return decorator


## EXAMPLE
@task
def get_sleep_list(start, stop, step):
    return list(range(start, stop, step))


@task
def sleeping(x):
    sleep(x)
    return x


@task
def log(lst):
    logger = prefect.context.get('logger')
    result = '\n'.join(['    Task sleeping(x={})'.format(i) for i in lst])
    logger.info('Task results!\n%s', result)


@flow('List generator')
def flow_gen(start=Parameter('start', default=3),
             stop=Parameter('stop', default=12),
             step=Parameter('step', default=3)):
    sleep_list = get_sleep_list(start, stop, step)
    return sleep_list


@flow('Sleeping')
def flow_sleep(lst=Parameter('lst', default=[2, 4, 6])):
    result = sleeping.map(lst)
    return result


def flow_main():
    with Flow('List sleeping') as flow_inst:
        start = Parameter('start', default=2)
        stop = Parameter('stop', default=8)
        step = Parameter('step', default=2)
        sleep_list = flow_gen(start, stop, step)
        result = flow_sleep(sleep_list)
        log(result)
    return flow_inst


if __name__ == '__main__':
    flow_gen().register(project_name='tests')
    flow_sleep().register(project_name='tests')
    flow_main().register(project_name='tests')
```
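Note that this decorator keys off whether the wrapped function receives arguments: a bare call like `flow_gen()` builds and returns a standalone `Flow` for registration, while a call with arguments (as in `flow_main`) simply executes the function body inside the enclosing `Flow` context, binding its tasks there.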
I built up a bit on top of @Abrosimov-a-a 's decorator concept and came up with this.
```python
import prefect
from prefect import Flow


## DECORATOR
def flow(*flow_args, **flow_kwargs):
    """Flow and SubFlow decorator."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            flow = prefect.context.get("flow", None)
            if not flow:
                with Flow(*flow_args, **flow_kwargs) as flow_instance:
                    func(*args, **kwargs)
                return flow_instance
            else:
                return func(*args, **kwargs)
        return wrapper
    return decorator
```
If the `flow` is called within another flow, it becomes a sub-flow. Otherwise, it becomes a `Flow` on its own.
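A hypothetical usage sketch of this context-based version (`some_task` is a placeholder):

```python
@flow("inner")
def inner():
    some_task()

@flow("outer")
def outer():
    inner()  # "flow" is already set in prefect.context here,
             # so inner's tasks are added to outer's flow as a sub-flow

outer_flow = outer()  # no active flow context -> builds and returns a Flow
```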
Prefect 2.0 will have a first class concept of subflows that is ready for anyone willing to test pre-release software:
- some intro docs: https://orion-docs.prefect.io/concepts/flows/#subflows
- to test the latest, `pip install -U "prefect>2.0a1"`, and please note that this should be done in a separate environment from your standard `prefect` installation
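Based on the linked docs, a minimal Prefect 2 subflow looks like this: calling one `@flow`-decorated function from inside another creates a subflow run.

```python
from prefect import flow, task

@task
def say(message):
    print(message)

@flow
def child(message: str):
    say(message)

@flow
def parent():
    child("hello from a subflow")  # runs as a subflow of parent

if __name__ == "__main__":
    parent()
```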
Completed in 2.0