flyte icon indicating copy to clipboard operation
flyte copied to clipboard

[Core feature] Create or document a fast way to query the status of large workflows

Open Tom-Newton opened this issue 11 months ago • 10 comments

Motivation: Why do you think this is important?

For large workflows with ~1000s of nodes its difficult to monitor the overall progress. At this scale we need to be able to get things like counts of how many nodes or sub-workflows are in each status. e.g. 900 success, 200: unknown 2 failed, 98: in progress.

I think supporting large workflows like this would be a valuable feature and its critical to what I want to do with flyte.

Goal: What should the final outcome look like, ideally?

I think ideal would be an interface similar to remote.sync_execution(sync_nodes=True) but fast for large workflows. This would be very flexible.

Other interfaces would also be fine - I'm mostly just interested in it being fast.

Describe alternatives you've considered

The flyte UI: It displays lists and graphs but at the scale of 1000s of nodes these are impossible to parse by eye. Additionally in tends to crash my browser.

flytectl get execution Can get information about nodes when using --details but it seems to be incomplete. Writing to a .yaml file and searching, I find quite a lot of nodes are missing.

flytekit remote.sync_execution(sync_nodes=True) This does fetch all the important information and could certainly be parsed by some python code to extra whatever metrics are needed. The problem is that it takes about 12 minutes to run on a workflow with 3000 nodes. EDIT: It actually doesn't fetch information about nodes that haven't started processing yet. So nodes that would show with unknown status on the UI are missed.

Propose: Link/Inline OR Additional context

No response

Are you sure this issue hasn't been raised already?

  • [X] Yes

Have you read the Code of Conduct?

  • [X] Yes

Tom-Newton avatar Sep 20 '23 15:09 Tom-Newton

Thank you for opening your first issue here! 🛠

welcome[bot] avatar Sep 20 '23 15:09 welcome[bot]

cc @pingsutw when we added system tags did we add a query method for this - in list executions?

kumare3 avatar Sep 21 '23 04:09 kumare3

yup. List with filter: http://localhost:30082/api/v1/executions/flytesnacks/development?filters=value_in(admin_tag.name,flyte%3Btest)&limit=25&sort_by.direction=DESCENDING&sort_by.key=created_at

https://github.com/flyteorg/flyte/issues/3960

pingsutw avatar Sep 21 '23 04:09 pingsutw

Thanks for the replies. I gave the tags filtering a try but I don't really understand how the tags can help. I want to see the status of all the nodes in a particular workflow quickly but I can't find any API that can list all the nodes in a workflow. All the APIs seem to list only immediate child nodes, rather that the whole graph.

I don't really think filtering is the problem, all the APIs I've tested already have too much filtering.

Tom-Newton avatar Sep 21 '23 12:09 Tom-Newton

I would still be keen to know if there is a better way but I ended up creating a version of remote.sync_execution(sync_nodes=True) that make gRPC requests in many parallel threads by wrapping iterate_node_executions and running it with asyncio. Now the 12 minutes is now more like 10 seconds.

Tom-Newton avatar Sep 26 '23 10:09 Tom-Newton

This is awesome, @Tom-Newton... flytekit (particularly remote) needs asyncio everywhere... Would you be open to redoing your change within flytekit codebase to support asyncIO? even if we do it one API at a time (or create a separate asyncRemote type with smaller interface initially until we migrate all APIs)

EngHabu avatar Sep 26 '23 17:09 EngHabu

What I did was a fairly naive approach that could definitely be optimised. The best way though would be to add native support for asyncio in flytekit as you say. Anything that uses gRPC could probably benefit. I'm a total noob when it comes to asyncio and to gRPC but it looks like we could use https://grpc.github.io/grpc/python/grpc_asyncio.html. Potentially I could contribute to this but probably not in the near term.

Tom-Newton avatar Sep 26 '23 21:09 Tom-Newton

If anyone is interested in the code I'm using personally flyte_remote.py.txt (Its got a .txt extension because github doesn't allow uploading .py)

Tom-Newton avatar Sep 26 '23 22:09 Tom-Newton

I just saw https://github.com/flyteorg/flyte/issues/2193 which seems to be about an async interface to flyte admin, which is one possible solution to this

Tom-Newton avatar Feb 05 '24 19:02 Tom-Newton

🤖

Hey @Tom-Newton! Great to see you diving into the depths of Flyte workflows again. Looks like we're on another interesting adventure. 🚀

Based on your requirements, I would recommend using the ListNodeExecutions function in the NodeExecutionManager class. This function lists all node executions for a given workflow execution and can be used to efficiently query the status of large workflows in Flyte.

Here is a sample usage of the ListNodeExecutions function:

nodeExecutionListRequest := admin.NodeExecutionListRequest{
    WorkflowExecutionId: &core.WorkflowExecutionIdentifier{
        Project: "my_project",
        Domain:  "my_domain",
        Name:    "my_workflow_execution_name",
    },
    Filters: "eq(phase, SUCCEEDED)",
    Limit:   100,
    Token:   "",
    SortBy:  &admin.Sort{Key: "created_at", Direction: admin.Sort_DESCENDING},
}

nodeExecutionList, err := nodeExecutionManager.ListNodeExecutions(context.Background(), nodeExecutionListRequest)
if err != nil {
    log.Fatalf("Failed to list node executions: %v", err)
}

for _, nodeExecution := range nodeExecutionList.NodeExecutions {
    fmt.Printf("Node execution: %v\n", nodeExecution)
}

In this example, the Filters field is used to filter node executions by their phase. The Limit field is used to limit the number of node executions returned in a single request. The Token field is used for pagination. The SortBy field is used to sort the node executions by their creation time in descending order.

Please note that you need to replace "my_project", "my_domain", and "my_workflow_execution_name" with your actual project, domain, and workflow execution name. Also, replace "eq(phase, SUCCEEDED)" with your actual filter condition.

The ListNodeExecutions function handles large workflows with thousands of nodes by implementing pagination. This is done by using a limit and an offset to retrieve a subset of node executions at a time. If the number of retrieved node executions equals the limit, a new token is created for the next page. This token is the sum of the offset and the number of retrieved node executions. This way, the function can handle large workflows with thousands of nodes by retrieving and processing a manageable number of nodes at a time.

You can find more details about the ListNodeExecutions function in the flyteadmin/pkg/manager/impl/node_execution_manager.go file.

Regarding the async interface to Flyte admin discussed in issue #2193, it could potentially improve the performance of querying the status of large workflows. However, it would require significant changes to the Flyte admin and might not be available in the near future. For now, I would recommend using the ListNodeExecutions function as it is designed to handle large workflows efficiently.

I hope this helps! If you have any further questions, feel free to ask.

Sources


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

dosubot[bot] avatar Feb 05 '24 19:02 dosubot[bot]