flyte
Add workflow concurrency control
Tracking issue
This solves #5659 and also closes out #6309. The related flytekit change is here - https://github.com/flyteorg/flytekit/pull/3267.
Why are the changes needed?
This work provides workflow concurrency control at the project/domain/wf_name level (across all versions). It is a user-requested feature, primarily used to limit workflow concurrency to 1 in order to provide more stable data consistency and ETL. For example, suppose a cron schedule launches a workflow every 15 minutes, but the workflow sometimes takes 20 minutes to finish (which happens often, since workflow termination durations are non-deterministic given Flyte's asynchronous execution). Another execution of the same workflow can then overlap the original one, which causes problems when the data artifacts or inputs/outputs have a time-based component. If we can guarantee that X is the maximum concurrency, users can always rely on that limit and build their ETL pipelines with stronger guarantees of data correctness and consistency.
What changes were proposed in this pull request?
The primary mechanism is fairly straightforward: whenever we attempt to launch an execution, we make a db query to check for running executions for the NamedEntityIdentifier triplet (project/domain/wf_name), and if the number of running executions has already reached the concurrency threshold (max_concurrency), we immediately fail to create the execution.
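As an illustration of that flow (not the PR's exact code), here is a minimal Go sketch; `ExecutionStore`, `CountRunningExecutions`, and `CheckConcurrency` are hypothetical names used only for this example:

```go
package concurrency

import (
	"context"

	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// ExecutionStore stands in for the db query that counts non-terminal
// executions for a project/domain/wf_name triplet, across all versions.
type ExecutionStore interface {
	CountRunningExecutions(ctx context.Context, project, domain, name string) (int, error)
}

// CheckConcurrency fails fast with ResourceExhausted when launching one more
// execution would exceed the configured max_concurrency.
func CheckConcurrency(ctx context.Context, store ExecutionStore, project, domain, name string, maxConcurrency int) error {
	if maxConcurrency <= 0 {
		return nil // no concurrency policy configured for this workflow
	}
	running, err := store.CountRunningExecutions(ctx, project, domain, name)
	if err != nil {
		return err
	}
	if running >= maxConcurrency {
		return status.Errorf(codes.ResourceExhausted,
			"workflow %s/%s/%s already has %d running executions (max %d)",
			project, domain, name, running, maxConcurrency)
	}
	return nil
}
```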
How was this patch tested?
Unit tests added to execution_manager_test.go, but notably this was tested internally at LinkedIn, since we are porting this feature externally. More testing is in progress in a local sandbox.
Labels
- added: `ConcurrencyPolicy` and logic surrounding workflow concurrency management, as well as a db migration to add an index on the `executions` table for `execution_phase` (a sketch of the migration follows below).
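For illustration only, here is what such a migration could look like as a gormigrate-style step (flyteadmin uses gormigrate); the migration ID, index name, and `phase` column name are assumptions for this sketch, not the exact ones in the PR:

```go
package migrations

import (
	"github.com/go-gormigrate/gormigrate/v2"
	"gorm.io/gorm"
)

// addExecutionPhaseIndex adds an index so the concurrency check can count
// running executions cheaply. Names below are illustrative.
var addExecutionPhaseIndex = &gormigrate.Migration{
	ID: "2025-add-executions-phase-index", // hypothetical migration ID
	Migrate: func(tx *gorm.DB) error {
		return tx.Exec(
			"CREATE INDEX IF NOT EXISTS idx_executions_phase ON executions (phase)",
		).Error
	},
	Rollback: func(tx *gorm.DB) error {
		return tx.Exec("DROP INDEX IF EXISTS idx_executions_phase").Error
	},
}
```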
Setup process
Screenshots
Check all the applicable boxes
- [ ] I updated the documentation accordingly.
- [ ] All new and existing tests passed.
- [ ] All commits are signed-off.
Related PRs
#5659 and #6309; the flytekit PR will also be added as a reference.
Docs link
- Will add in docs
Summary by Bito
This pull request introduces a new feature for managing workflow concurrency, allowing users to limit concurrent executions at various levels. Enhancements include a ConcurrencyPolicy, logic to check for running executions, and database migrations to improve performance and data consistency during workflows.
Codecov Report
:x: Patch coverage is 63.63636% with 28 lines in your changes missing coverage. Please review.
:white_check_mark: Project coverage is 58.70%. Comparing base (c085cb5) to head (8208637).
:warning: Report is 12 commits behind head on master.
Additional details and impacted files
@@ Coverage Diff @@
## master #6475 +/- ##
==========================================
+ Coverage 58.67% 58.70% +0.02%
==========================================
Files 938 938
Lines 71466 71690 +224
==========================================
+ Hits 41933 42085 +152
- Misses 26346 26413 +67
- Partials 3187 3192 +5
| Flag | Coverage Δ | |
|---|---|---|
| unittests-datacatalog | 59.03% <ø> (ø) | |
| unittests-flyteadmin | 56.14% <63.63%> (-0.09%) | :arrow_down: |
| unittests-flytecopilot | 39.56% <ø> (ø) | |
| unittests-flytectl | 64.72% <ø> (ø) | |
| unittests-flyteidl | 76.12% <ø> (ø) | |
| unittests-flyteplugins | 61.14% <ø> (+<0.01%) | :arrow_up: |
| unittests-flytepropeller | 55.06% <ø> (+0.22%) | :arrow_up: |
| unittests-flytestdlib | 64.02% <ø> (ø) | |

Flags with carried forward coverage won't be shown.
:umbrella: View full report in Codecov by Sentry.
Can you run `make generate` in the flyteidl dir and `make lint-fix` in the flyteadmin dir? There are some build failures.
@thomasjhuang thanks for your contribution. Also please check out the failing DCO check. Do you plan to set a specific phase when an execution is skipped? That may be a bit beyond the scope of this PR, but it would be useful. Also please add some docs so users know how to leverage this in launch plans.
Taking a look
@troychiu can you please take a first look, you looked into this recently right?
I don't really have context but would love to take a look.
@troychiu sorry wrong one. I meant the volcano PR. I can take a look at this one
Okay, I've added a few things - docs, a unit test, and ran `make generate`. `make lint-fix` didn't flag any issues in the code changes I've introduced here. Is there a reason why the checks are all pending? This is probably ready for review now. Please also check out the related flytekit change - https://github.com/flyteorg/flytekit/pull/3267.
@davidmirror-ops @kumare3 @Sovietaced
@thomasjhuang thanks for this amazing contribution. Sorry about this but could you port the docs update to the new repo? https://github.com/unionai/docs I'll take point on updating this repo's docs folder to indicate it's no longer the content that ends up rendered in the docs site
Got it, I've opened a PR here - https://github.com/unionai/docs/pull/417
I'm looking at porting a version of this to our fork, and one thing I'm looking at is how this affects the Flyte scheduler. The Flyte scheduler is hardcoded to retry execution creation up to 30 times if it fails, unless the gRPC status code in the error is `codes.AlreadyExists`. I think the scheduler code will need to be updated to either give up on `ResourceExhausted` or we'll need a way to articulate this case through richer error details.
Makes sense - this change will increase `ResourceExhausted` errors noticeably, especially if measured as a metric, and that can be misleading since it isn't really an error. Some changes to flytescheduler are probably needed. Although not implemented yet, I intend to also add an execution phase `SKIPPED` so that the UI can indicate skipped executions rather than surface a resource error.
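For reference, a minimal sketch of what treating the error as non-retryable could look like in the scheduler's retry loop; `isNonRetryable` is a hypothetical helper, not the scheduler's actual function:

```go
package scheduler

import (
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
)

// isNonRetryable reports whether a CreateExecution failure should stop the
// retry loop, adding ResourceExhausted alongside the existing AlreadyExists
// short-circuit.
func isNonRetryable(err error) bool {
	s, ok := status.FromError(err)
	if !ok {
		return false
	}
	switch s.Code() {
	case codes.AlreadyExists, codes.ResourceExhausted:
		// AlreadyExists: the execution was already created earlier.
		// ResourceExhausted: the concurrency policy intentionally skipped this launch.
		return true
	default:
		return false
	}
}
```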
We have an end-to-end test currently validating this functionality, and it seems like there might be a correctness error in the logic that looks for previous executions.
```go
func TestFlyte_WorkflowConcurrencyLimits(t *testing.T) {
    lp := "e2e_singleton_workflow"
    client, err := config.Flyte.GetAdminClient(config.TestCtx)
    require.NoError(t, err, "getting flyte admin client")
    latestLaunchPlan := config.Flyte.FindLatestLaunchPlan(config.TestCtx, t, lp)
    t.Logf("Found most recent launch plan with version [%s]", latestLaunchPlan.GetId().GetVersion())

    _, err = client.AdminClient().CreateExecution(config.TestCtx, &pbadmin.ExecutionCreateRequest{
        Project: config.Flyte.Project,
        Domain:  config.Flyte.Domain,
        Spec: &pbadmin.ExecutionSpec{
            LaunchPlan: &pbcore.Identifier{
                ResourceType: pbcore.ResourceType_LAUNCH_PLAN,
                Project:      config.Flyte.Project,
                Domain:       config.Flyte.Domain,
                Name:         lp,
                Version:      latestLaunchPlan.GetId().GetVersion(),
            },
        },
    })
    require.NoError(t, err, "creating execution")

    // Creating a second execution should fail while the first is non-terminal
    _, err = client.AdminClient().CreateExecution(config.TestCtx, &pbadmin.ExecutionCreateRequest{
        Project: config.Flyte.Project,
        Domain:  config.Flyte.Domain,
        Spec: &pbadmin.ExecutionSpec{
            LaunchPlan: &pbcore.Identifier{
                ResourceType: pbcore.ResourceType_LAUNCH_PLAN,
                Project:      config.Flyte.Project,
                Domain:       config.Flyte.Domain,
                Name:         lp,
                Version:      latestLaunchPlan.GetId().GetVersion(),
            },
        },
    })
    require.Error(t, err, "creating execution")

    s, ok := status.FromError(err)
    require.True(t, ok, "should be a grpc status error")
    require.Equal(t, codes.ResourceExhausted, s.Code())
}
```
This is failing in our production environment where there is more load and I'm wondering if the state filtering isn't quite right.
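For comparison, a hedged sketch of what the "running" filter likely needs to cover, assuming flyteidl's workflow execution phases and the monorepo's `core` package layout; this is illustrative, not the PR's actual query:

```go
package concurrency

import "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"

// isNonTerminal reports whether an execution should still count against the
// concurrency limit. Anything that has not reached SUCCEEDED, FAILED,
// ABORTED, or TIMED_OUT is still in flight, so executions in QUEUED,
// SUCCEEDING, FAILING, or ABORTING must be counted too, or they slip past
// the check under load.
func isNonTerminal(phase core.WorkflowExecution_Phase) bool {
	switch phase {
	case core.WorkflowExecution_SUCCEEDED,
		core.WorkflowExecution_FAILED,
		core.WorkflowExecution_ABORTED,
		core.WorkflowExecution_TIMED_OUT:
		return false
	default:
		return true
	}
}
```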
Makes sense - this change will increase `ResourceExhausted` errors noticeably, especially if measured as a metric, and that can be misleading since it isn't really an error. Some changes to flytescheduler are probably needed. Although not implemented yet, I intend to also add an execution phase `SKIPPED` so that the UI can indicate skipped executions rather than surface a resource error.
For this version can we at least treat it as non-retryable? That's what we're doing and it seems to be ok.
@EngHabu seemed happy with this. We've been using a variation of it in production for the past couple weeks, so I think it's safe to land this.
Congrats on merging your first pull request! 🎉
I'm looking at porting a version of this to our fork and one thing I'm looking at is how this affects the Flyte scheduler. The Flyte scheduler is hardcoded to retry execution creation up to 30 times if it fails, unless the gRPC status code in the error is `codes.AlreadyExists`. I think the scheduler code will need to be updated to either give up on `ResourceExhausted` or we'll need a way to articulate this case through richer error details.

Makes sense - this change will increase `ResourceExhausted` errors noticeably, especially if measured as a metric, and that can be misleading since it isn't really an error. Some changes to flytescheduler are probably needed. Although not implemented yet, I intend to also add an execution phase `SKIPPED` so that the UI can indicate skipped executions rather than surface a resource error.
Hi @thomasjhuang @Sovietaced. Since I'm currently working on some scheduler-related issues, I can go ahead and open an issue for this and submit a PR.