
Support easily running part of a flow

Open marvin-robot opened this issue 4 years ago • 8 comments

Archived from the Prefect Public Slack Community

ben.fogelson: Is there an easy way to run part of a flow? I.e. run up to and including task x?

laura: Hi! No, I don’t think so without changing your flow code (for example, simply commenting out whatever you had added to your flow graph after x). You can run individual tasks (re: https://docs.prefect.io/core/advanced_tutorials/task-guide.html#calling-your-task-s-run-method), but if your flow is complex, that isn’t a realistic way to simulate a partial flow. It is an interesting idea though, and I did a quick search and we don’t have an issue open on it already, so I can publish this thread into a new issue to request it!

ben.fogelson: Thanks <@UTHEQ4F9R>, that would be great!

laura: <@ULVA73B9P> archive “Support easily running part of a flow”

marvin-robot, Apr 03 '20 22:04

Probably worth some discussion of how this differs from (or integrates with) existing ideas of subflows and targets. It seems to me this is a request not for production purposes but for iterative testing (correct me if I'm wrong or if anyone feels differently). For that reason, it seems that running a subgraph simulation of a flow (which may differ significantly from a subflow implementation) shouldn't require a change to the flow code. It also seems to differ from targets in the sense that the tasks to skip may be at the bottom of the flow as opposed to the top.

Another thought: depending on how complex the graph is, it may be difficult to determine the optimal subgraph, that is, when to stop (the main sticking point may be whether subgraph resolution is greedy or lazy about parallelism).

lauralorenz, Apr 03 '20 22:04

We once had start_tasks as an option, which ran a "pruned" subgraph of the flow (removed in #668). While the implementation itself was straightforward (we included all nodes downstream of the root nodes), the complexities it introduced were significant: users often were unaware that states upstream of the root tasks had to be properly passed in order for triggers/data dependencies to work as expected, and it resulted in some confusion. I think it's definitely doable, but the UX needs some work.

jlowin, Apr 04 '20 18:04
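For readers unfamiliar with the removed option, the pruning jlowin describes can be sketched in plain Python. This is not Prefect's removed start_tasks code; `downstream_of`, `missing_upstream`, and the name -> set-of-upstream-names graph format are hypothetical. The second helper surfaces the pitfall from that comment: every upstream task outside the pruned subgraph must have its state or result supplied by the caller.

```python
def downstream_of(upstream: dict[str, set[str]], roots: set[str]) -> set[str]:
    """Return the root tasks plus every task reachable downstream of them."""
    # Invert the name -> upstream-names map into name -> downstream-names.
    children: dict[str, set[str]] = {name: set() for name in upstream}
    for name, parents in upstream.items():
        for parent in parents:
            children.setdefault(parent, set()).add(name)
    keep: set[str] = set()
    stack = list(roots)
    while stack:
        node = stack.pop()
        if node not in keep:
            keep.add(node)
            stack.extend(children.get(node, ()))
    return keep


def missing_upstream(upstream: dict[str, set[str]], keep: set[str]) -> set[str]:
    """Upstream tasks outside the pruned subgraph whose results the caller must supply."""
    return {p for name in keep for p in upstream.get(name, ()) if p not in keep}


if __name__ == "__main__":
    flow = {
        "extract": set(),
        "transform": {"extract"},
        "load": {"transform"},
        "report": {"extract"},
    }
    keep = downstream_of(flow, {"transform"})
    print(sorted(keep), sorted(missing_upstream(flow, keep)))
    # ['load', 'transform'] ['extract']
```

Running from "transform" keeps only "transform" and "load", but "extract" still has to provide a result, which is exactly the state-passing requirement users tripped over.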

FYI, I've asked @cicdw about this before; the analog in my mind is dask.get for custom graphs, which figures out what needs to be evaluated to get to the key that you requested. The fact that dask supports this is a pretty compelling reason for prefect to have an analogous operation IMO 🙂

bnaul, Apr 06 '20 01:04
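The dask.get analogy amounts to: collect the requested task plus everything it transitively depends on, then run only those tasks in dependency order ("run up to and including task x"). A minimal sketch in plain Python 3.9+ using the standard library's graphlib, assuming a hypothetical graph format of name -> (function, upstream names); it uses neither Prefect nor Dask APIs.

```python
from graphlib import TopologicalSorter
from typing import Any, Callable

# Hypothetical graph format: task name -> (function, names of its upstream tasks).
Graph = dict[str, tuple[Callable[..., Any], tuple[str, ...]]]


def ancestors(graph: Graph, target: str) -> set[str]:
    """Return `target` plus every task it transitively depends on."""
    needed: set[str] = set()
    stack = [target]
    while stack:
        name = stack.pop()
        if name not in needed:
            needed.add(name)
            stack.extend(graph[name][1])
    return needed


def run_up_to(graph: Graph, target: str) -> Any:
    """Run only the tasks `target` needs, in dependency order, and return its result."""
    needed = ancestors(graph, target)
    # TopologicalSorter takes a mapping of node -> its predecessors.
    order = TopologicalSorter({name: graph[name][1] for name in needed}).static_order()
    results: dict[str, Any] = {}
    for name in order:
        func, deps = graph[name]
        results[name] = func(*(results[d] for d in deps))
    return results[target]


if __name__ == "__main__":
    def never_called(xs):
        raise RuntimeError("unrelated branch should have been pruned")

    graph: Graph = {
        "load": (lambda: [1, 2, 3], ()),
        "double": (lambda xs: [2 * x for x in xs], ("load",)),
        "total": (sum, ("double",)),
        "expensive": (never_called, ("load",)),
    }
    print(run_up_to(graph, "total"))  # 12
```

A Prefect equivalent would also need to respect triggers and task states, which this sketch ignores.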

I think skipping branches will mesh nicely with partial runs, a.k.a. dask.get. In either case, the FlowRunner* would test whether pure tasks or unwanted tasks should be skipped.

The main production use case is skipping expensive computations: for Cached states on behalf of the caching facilities, and for partial return_tasks on behalf of the caller. In this view, cached tasks should also skip their upstream tasks, @lauralorenz.

That would need a pure flag on Tasks, meaning "has no side effects beyond the result". An alternative may be to tag Tasks with something like a "pure" keyword.

As for UX, would making this opt-in via pure tasks avoid the confusion caused by start_tasks, @jlowin?

* In #2394 the TaskRunner checks whether its output already exists. To prune, that state would need to be determined outside of toposort order. Maybe at this spot, and maybe a reverse toposort can do the trick?

I've written an imperfect gist that demonstrates pruning: https://gist.github.com/ahirner/4e8a6105dfbef2e8fff04c6177a868a8. Here, output caching takes precedence over side effects. The last test is visualized like so: [screenshot of the visualized flow] (btw, it would be great to be able to annotate flow.visualize).

Last questions:

  • I'm not sure how, or whether, a Cached state is determined before the run. If the target check tests for existence, reads the result (with the executor?), and puts it into the context, there could be a way.
  • I think TaskRunner would also need to return its cached state before .check_upstream_finished. I'm not fully grokking the transition logic though.

ahirner, Apr 24 '20 22:04
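ahirner's proposal (skip a cached task's upstream, but only for tasks marked pure) can be sketched as a planning pass that walks upstream from the requested tasks, rather than a full reverse toposort. The `plan` function and the `cached`/`pure` sets are hypothetical stand-ins for a Cached-state check and a pure flag. The comment says the gist lets output caching take precedence over side effects; this sketch instead conservatively re-runs any needed task that is not marked pure.

```python
def plan(
    upstream: dict[str, set[str]],
    targets: set[str],
    cached: set[str],
    pure: set[str],
) -> tuple[set[str], set[str]]:
    """Split the tasks needed for `targets` into (to_run, to_load).

    A task that is both cached and pure is loaded instead of run, and its own
    upstream tasks are not visited. Every other needed task is run, and the
    walk continues into its upstream tasks.
    """
    to_run: set[str] = set()
    to_load: set[str] = set()
    stack = list(targets)
    while stack:
        name = stack.pop()
        if name in to_run or name in to_load:
            continue  # already classified via another path
        if name in cached and name in pure:
            to_load.add(name)  # reuse the stored result; prune its upstream
        else:
            to_run.add(name)
            stack.extend(upstream.get(name, ()))
    return to_run, to_load


if __name__ == "__main__":
    flow = {
        "raw": set(),
        "clean": {"raw"},
        "features": {"clean"},
        "model": {"features"},
        "audit_log": {"clean"},  # has side effects, so not marked pure
    }
    pure = {"raw", "clean", "features", "model"}
    to_run, to_load = plan(flow, {"model", "audit_log"}, cached={"clean"}, pure=pure)
    print(sorted(to_run), sorted(to_load))
    # ['audit_log', 'features', 'model'] ['clean']
```

Because "clean" is cached and pure, "raw" is never visited, which is the "cached tasks should also skip upstream tasks" behavior discussed above.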

Hey guys, I have also been looking into partial execution based on a leaf node. Are there any updates on this one? Thanks!

dusvyat, Nov 24 '20 12:11

I'm new to Prefect, but I'd also like to add that starting from downstream nodes would be beneficial for ML flows where ETL steps may take days and would only be run a few times during the experimentation/training process. In production, models that get updated data on a regular basis may not want to restart the entire flow for each new batch of data. Maybe it's unreasonable to have everything in one flow, but splitting it into multiple flows adds difficulty, even more so with multiple people writing code.

fjord-prefect, Jun 25 '21 16:06

Hey @fjord-prefect -- I think this will mostly be resolved by planned improvements to our caching interface. Basically, the upstream tasks that you don't want to re-run will be easy to set to run once per time period. You can actually do this now using cache_validators (https://docs.prefect.io/core/concepts/persistence.html#output-caching), but it's not as easy as we'd like.

zanieb, Jun 25 '21 19:06
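The "run once per time period" idea can be illustrated without Prefect. In Prefect Core this is configured on the task itself through the `cache_for` and `cache_validator` arguments described on the linked persistence page; the decorator below is only a hypothetical plain-Python analogue (`cache_per_period` is not a Prefect API). It reuses a result computed from the same inputs within the last `seconds` and recomputes it otherwise.

```python
import time
from functools import wraps
from typing import Any, Callable


def cache_per_period(seconds: float, clock: Callable[[], float] = time.monotonic):
    """Hypothetical decorator: reuse a result computed from the same positional
    arguments within the last `seconds`; otherwise call the function again.

    Arguments must be hashable because they are used as the cache key.
    """
    def decorator(func: Callable[..., Any]) -> Callable[..., Any]:
        store: dict[tuple, tuple[float, Any]] = {}

        @wraps(func)
        def wrapper(*args: Any) -> Any:
            now = clock()
            hit = store.get(args)
            if hit is not None and now - hit[0] < seconds:
                return hit[1]  # still fresh: skip the expensive call
            result = func(*args)
            store[args] = (now, result)  # remember when this result was computed
            return result

        return wrapper

    return decorator


if __name__ == "__main__":
    @cache_per_period(seconds=24 * 3600)
    def slow_etl(day: str) -> str:
        print("running ETL for", day)  # stand-in for a long-running ETL step
        return f"features for {day}"

    slow_etl("2021-06-25")  # runs the ETL
    slow_etl("2021-06-25")  # reuses the cached result within the same period
```

The injectable `clock` is there only so the expiry can be tested without waiting; a real scheduler-backed cache would also need to persist results across runs, which this in-memory dict does not.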

Any updates on being able to do a partial flow run to get the result of one leaf node? 🙏

jesford, Oct 19 '21 17:10

@cicdw is this stale (per the label) or completed (per the "closed as")? Would love to know why it was closed if not completed, or some pointers to docs if it is completed.

ncknuna, Sep 01 '22 07:09