First class Sub-flow concept

Open marvin-robot opened this issue 5 years ago • 12 comments

Archived from the Prefect Public Slack Community

walter_gillett: Hi - we are building bioinformatics pipelines related to infectious disease. Prefect looks interesting. I am wondering about task grouping (a.k.a. nesting or sub-dags). Each step in our pipeline reads inputs from GCS and writes outputs to GCS. Without task grouping, this will get messy. For example, suppose we have steps 1, 2, and 3, each of which reads one GCS input and writes a GCS output. That yields 9 tasks (3 GCS download, 3 compute, and 3 upload), but we would like to group them into pipeline steps because that’s the essential unit of work. Is there a way to model this in Prefect?

chris: Hi @walter_gillett! Apologies if I’m misunderstanding the use case, but it sounds like you only need 3 Prefect Tasks? What is the benefit you hope to achieve by “grouping” tasks without them being realized as true Prefect Tasks?

walter_gillett: Hi @chris - likely I am misunderstanding how Prefect works. Yes, I want only 3 Prefect Tasks. But if I want to use Prefect machinery to conveniently download from GCS, that's a task (prefect.tasks.google.storage.GCSDownload), same for upload, so I get 9 Tasks, yes? Conceptually there are 3 pipeline steps so I would like the workflow structure to reflect that. I am thinking of this as being like SubDAGs in Airflow (https://www.astronomer.io/guides/subdags/), where aggregating low-level details makes it possible to have a workflow with a higher level of granularity.

walter_gillett: I see related discussion here: https://docs.prefect.io/core/PINs/PIN-05-Combining-Tasks.html and https://github.com/PrefectHQ/prefect/issues/980 . But not sure what the recommendation coming out of that is.

chris: Yea, I think I understand better what you’re referring to now - thanks for that link; correct me if I’m wrong here, but the airflow notion of SubDAG is an API convenience in the UI for seeing task groupings, which makes sense. I don’t think I see any functional difference in the way the DAG behaves between the fully expanded representation and the SubDAG representation.

In Prefect, you can certainly create multiple flows and then link them together using some combination of flow.update / flow.set_dependencies / flow.root_tasks() / flow.terminal_tasks() but ultimately we haven’t yet exposed an analogous first-class “sub Flow” concept
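
As a rough illustration of the "link flows together" idea, here is a minimal toy model in plain Python. It is not Prefect's API: `ToyFlow`, `add_edge` and `chain` are hypothetical names, and the methods `update`, `root_tasks` and `terminal_tasks` only borrow the names mentioned above. The sketch merges two small DAGs and then adds an edge from every terminal task of the first to every root task of the second.

```python
from dataclasses import dataclass, field


@dataclass
class ToyFlow:
    """Toy stand-in for a flow: task names plus (upstream, downstream) edges."""
    name: str
    tasks: set = field(default_factory=set)
    edges: set = field(default_factory=set)

    def add_edge(self, upstream, downstream):
        self.tasks.update({upstream, downstream})
        self.edges.add((upstream, downstream))

    def root_tasks(self):
        # Tasks that never appear as the downstream end of an edge.
        return self.tasks - {down for _, down in self.edges}

    def terminal_tasks(self):
        # Tasks that never appear as the upstream end of an edge.
        return self.tasks - {up for up, _ in self.edges}

    def update(self, other):
        # Absorb another flow's tasks and edges.
        self.tasks |= other.tasks
        self.edges |= other.edges


def chain(name, first, second):
    """Combine two flows so that `second` starts only after `first` finishes."""
    combined = ToyFlow(name)
    combined.update(first)
    combined.update(second)
    for up in first.terminal_tasks():
        for down in second.root_tasks():
            combined.add_edge(up, down)
    return combined


# Two "pipeline steps", each made of download -> compute -> upload.
step1 = ToyFlow("step1")
step1.add_edge("download_1", "compute_1")
step1.add_edge("compute_1", "upload_1")

step2 = ToyFlow("step2")
step2.add_edge("download_2", "compute_2")
step2.add_edge("compute_2", "upload_2")

pipeline = chain("pipeline", step1, step2)
print(sorted(pipeline.edges))
```

The combined object is still one flat DAG: the grouping into steps lives only in how it was assembled, which is the limitation being discussed here.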

walter_gillett: Thanks @chris, good to know, rolling up flows could be the answer for now. Adding a first-class subflow concept to Prefect would be helpful, but nesting adds complexity so would have to be done carefully - more is not always better. As a side note re Airflow SubDAGs, from the article I linked to: "Astronomer highly recommends staying away from SubDags. Airflow 1.10 has changed the default SubDag execution method to use the Sequential Executor to work around deadlocks caused by SubDags".

chris: very interesting; yea I agree this seems like a really convenient abstraction - we’ll definitely look into it! I’ll actually use our bot to archive this thread as a GitHub issue that we can use to track it

chris: <@ULVA73B9P> archive “First class Sub-flow concept”

marvin-robot avatar Nov 15 '19 21:11 marvin-robot

Reopening to look into what a sub-flow API might look like in Prefect!

cicdw avatar Nov 15 '19 21:11 cicdw

FYI to the world: we are having our first design discussions next week on subflows internally!

UPDATE: No specific action items or high-priority roadmapping came out of this meeting.

lauralorenz avatar Apr 09 '20 18:04 lauralorenz

@lauralorenz how do I join in on the discussion?

edeediong avatar Apr 16 '20 10:04 edeediong

Would creating a task that runs other tasks be an example of this?

Currently prefect throws a ValueError when trying to run a task within another task.

from prefect import task, Flow

@task
def subtask():
    pass

@task
def main():
    subtask()

with Flow('flow') as flow:
    main()

if __name__ == '__main__':
    flow.run()

This will throw ValueError: Could not infer an active Flow context.

Edit: I do not think it is possible for the functional API to work this way. Using subtask.run() would work but prefect would not include subtask in the flow's DAG.

petermorrowdev avatar May 27 '20 00:05 petermorrowdev

FYI, though we didn't end up coming up with any really concrete action items or agreed-upon design when we discussed internally, we did generate this brainstorm diagram that might interest folks who have thoughts about subflows: Subflows Kickoff.pdf

TL;DR:

  • page 1: what would one actually do with subflows? we came up with a) establish flow state dependencies b) configuration management c) modularize flow code (this is what OP of this issue was referring to, by our categorization) d) execute partial flows
  • page 2: what is the user experience during the lifecycle of a subflow in Core, Cloud, and the UI? (for example, can you configure a subflow to 'retry' -- implying it is more like a task? Do you retry from the top of the entire flow?)
  • page 3: is the subflow unit itself composed of Task subclasses or Flow subclasses (or neither?) How does that break one or the other abstraction? What do we get to reuse by picking one or the other? and then what is the API like? some Flow class centric ideas:
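from prefect import Flow
from my_other_file import subflow  # a Flow object

# Idea 1 (hypothetical API): map a subflow as if it were a task
with Flow("parent") as f:
    flow_results = subflow.map([1,2,3])
    result = thing(flow_results)

# Idea 2 (hypothetical API): nest a child Flow context inside the parent
with Flow("parent", state_handlers=some_state_handlers) as f:
    thing()
    with Flow("child", state_handlers=more_state_handlers) as sub_f:
        thing()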

@edeediong the conversation is just happening on this issue and in the contributors channel in slack for now afaik!

@petermorrowdev I think that could be an example. Is that how you would prefer to define your subflows, as tasks calling tasks? If so, this would probably align more with representing subflows as a Task class internally (see page 3 of the pdf for our thoughts on that). You are correct that it doesn't work that way now, since a task will try to bind to a flow whenever it is called (code ref here and right below there)

lauralorenz avatar Jun 01 '20 22:06 lauralorenz

Thanks for the response @lauralorenz and for sharing the miro board! Task design vs flow design pros/cons on page 3 is really interesting. I'd like to learn more about why some of these pros/cons are there. To address your question, the design I have in mind is a kind of mix where the flow object is callable like a task. Something like

from prefect import Flow, Parameter

from .flows.common import a_flow, b_flow
from .tasks.specialized import special_task

with Flow('ab-flow') as ab_flow:
    a, b = Parameter('a'), Parameter('b')
    a_flow_task = a_flow(a)
    b_flow_task = b_flow(b)
    special_task(upstream_tasks=[a_flow_task, b_flow_task])

Where ab_flow(a, b) can also be called in another flow. I think this design would make it easier to modularize flows and keep tasks tiny. I feel like it will also be more explicit what code is run when this way vs calling a task from another task. I'm not sure what the pros/cons are with this approach or whether it's technically feasible. What do you think?
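
One way to picture a callable flow is the toy sketch below. It is only an illustration under assumptions, not a proposal for Prefect internals: `ToyFlow`, `add_task` and the name-prefixing scheme are hypothetical, and parameters are left out to keep it short. Calling a child flow inside a parent's `with` block copies the child's tasks into the parent and returns the child's terminal tasks, so they can be used as upstream dependencies.

```python
class ToyFlow:
    """Toy flow: maps each task name to the set of its upstream task names."""

    _stack = []  # flows currently open in a `with` block, innermost last

    def __init__(self, name):
        self.name = name
        self.upstream = {}

    def __enter__(self):
        ToyFlow._stack.append(self)
        return self

    def __exit__(self, *exc_info):
        ToyFlow._stack.pop()
        return False

    def add_task(self, task_name, upstream_tasks=()):
        self.upstream.setdefault(task_name, set()).update(upstream_tasks)
        for up in upstream_tasks:
            self.upstream.setdefault(up, set())
        return task_name

    def terminal_tasks(self):
        has_downstream = set().union(*self.upstream.values())
        return set(self.upstream) - has_downstream

    def __call__(self, upstream_tasks=()):
        """Inline this flow into the enclosing flow, prefixing task names."""
        if not ToyFlow._stack:
            raise ValueError("a flow can only be called inside another flow")
        parent = ToyFlow._stack[-1]
        prefix = self.name + "."
        for task_name, ups in self.upstream.items():
            # Root tasks of the child wait on whatever the caller passed in.
            inherited = {prefix + u for u in ups} or set(upstream_tasks)
            parent.add_task(prefix + task_name, inherited)
        return {prefix + t for t in self.terminal_tasks()}


# Two reusable child flows.
a_flow = ToyFlow("a_flow")
a_flow.add_task("extract")
a_flow.add_task("transform", upstream_tasks=["extract"])

b_flow = ToyFlow("b_flow")
b_flow.add_task("load")

# The parent calls the children like tasks and depends on their results.
with ToyFlow("ab-flow") as ab_flow:
    a_done = a_flow()
    b_done = b_flow()
    ab_flow.add_task("special_task", upstream_tasks=a_done | b_done)

print(ab_flow.upstream)
```

Because `ab_flow` is itself a `ToyFlow`, it could be called inside yet another flow in the same way. The sketch flattens everything into one graph, so questions such as retrying a whole subflow are left open.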

petermorrowdev avatar Jun 02 '20 07:06 petermorrowdev

That pattern seems currently achievable w/ Python functions:

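from prefect import Parameter, task

@task
def my_task(x):  # placeholder task
    return x

def a_flow(a: Parameter):
    return my_task(a)  # binds to whichever Flow context is active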

cicdw avatar Jun 02 '20 14:06 cicdw

@cicdw that's how the Flow.__call__ method would work for a_flow. Having that functionality built into a flow would be nice.

petermorrowdev avatar Jun 02 '20 17:06 petermorrowdev

Just want to throw in another direction for discussion, which is happening in some other forums. @gryBox has been going more down the line of thinking of exposing more set theory operations on arbitrary flow objects, with a lot of inspiration from networkx. There's a draft PR at #2699 with code for some of the ideas and some further background in a draft PIN on google docs that might be worth a look for people interested in this idea. In my view these are extensions to what @cicdw mentioned in the OP regarding flow.update and related, but more sophisticated.

lauralorenz avatar Jun 02 '20 17:06 lauralorenz

What about a decorator concept like this?

import prefect
from prefect import task, Flow, Parameter
from time import sleep

## DECORATOR
def flow(*flow_args, **flow_kwargs):
    """Flow and SubFlow decorator."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            if args or kwargs:
                return func(*args, **kwargs)
            else:
                with Flow(*flow_args, **flow_kwargs) as flow_instance:
                    func()
                return flow_instance
        return wrapper
    return decorator


## EXAMPLE
@task
def get_sleep_list(start, stop, step):
    return list(range(start, stop, step))


@task
def sleeping(x):
    sleep(x)
    return x


@task
def log(lst):
    logger = prefect.context.get('logger')
    result = '\n'.join(['  Task sleeping(x={})'.format(i)
                        for i in lst])
    logger.info('Task results!\n%s', result)


@flow('List generator')
def flow_gen(start=Parameter('start', default=3),
             stop=Parameter('stop', default=12),
             step=Parameter('step', default=3)):
    sleep_list = get_sleep_list(start, stop, step)
    return sleep_list


@flow('Sleeping')
def flow_sleep(lst=Parameter('lst', default=[2, 4, 6])):
    result = sleeping.map(lst)
    return result


def flow_main():
    with Flow('List sleeping') as flow_inst:
        start = Parameter('start', default=2)
        stop = Parameter('stop', default=8)
        step = Parameter('step', default=2)

        sleep_list = flow_gen(start, stop, step)
        result = flow_sleep(sleep_list)

        log(result)
    return flow_inst


if __name__ == '__main__':
    flow_gen().register(project_name='tests')
    flow_sleep().register(project_name='tests')
    flow_main().register(project_name='tests')

Abrosimov-a-a avatar Sep 29 '20 12:09 Abrosimov-a-a

I built a bit on top of @Abrosimov-a-a's decorator concept and came up with this.

import prefect
from prefect import Flow

## DECORATOR
def flow(*flow_args, **flow_kwargs):
    """Flow and SubFlow decorator."""
    def decorator(func):
        def wrapper(*args, **kwargs):
            flow = prefect.context.get("flow", None)
            if not flow:
                with Flow(*flow_args, **flow_kwargs) as flow_instance:
                    func(*args, **kwargs)
                return flow_instance
            else:
                return func(*args, **kwargs)
        return wrapper
    return decorator

If the flow is called within another flow, it becomes a sub-flow. Otherwise, it becomes a Flow on its own.

RRRajput avatar Jan 21 '22 16:01 RRRajput

Prefect 2.0 will have a first class concept of subflows that is ready for anyone willing to test pre-release software:

  • some intro docs: https://orion-docs.prefect.io/concepts/flows/#subflows
  • to test the latest, pip install -U "prefect>2.0a1" and please note that this should be done in a separate environment from your standard prefect installation

cicdw avatar Jan 23 '22 01:01 cicdw

Completed in 2.0

cicdw avatar Aug 30 '22 23:08 cicdw