Add workflow concurrency control

Open • thomasjhuang opened this pull request 6 months ago • 6 comments

Tracking issue

This solves #5659 and also closes out #6309. The related flytekit change is https://github.com/flyteorg/flytekit/pull/3267.

Why are the changes needed?

This work provides workflow concurrency control at the project/domain/wf_name level (across all versions). Users requested this feature mainly to limit workflow concurrency to 1, which gives more stable data consistency for ETL.

For example, suppose a cron schedule executes a workflow every 15 minutes, but the workflow sometimes takes 20 minutes to finish. This happens often, because workflow completion times are non-deterministic given Flyte's asynchronous execution model. A second execution of the same workflow version can then start while the first is still running, and the two overlap. That causes problems if, for example, the data artifacts or inputs/outputs depend on time.

If we can guarantee that X is the maximum concurrency, users can always rely on that limit. They can then build their ETL pipelines with stronger guarantees of data correctness and consistency.

What changes were proposed in this pull request?

The primary mechanism is fairly straightforward. Whenever we attempt to launch an execution, we query the database for running (non-terminal) executions matching the NamedEntityIdentifier triplet (project/domain/wf_name). If the number of running executions has already reached the concurrency threshold (max_concurrency), we immediately fail to create the execution.

How was this patch tested?

Unit tests were added to execution_manager_test.go, but this was primarily tested internally at LinkedIn, since we are porting this feature out of our internal version. More testing is in progress on a local sandbox.

Labels

  • added: ConcurrencyPolicy and the logic for workflow concurrency management, plus a DB migration that adds an index on the executions table for execution_phase.

Setup process

Screenshots

Check all the applicable boxes

  • [ ] I updated the documentation accordingly.
  • [ ] All new and existing tests passed.
  • [ ] All commits are signed-off.

Related PRs

#5659 and #6309. The flytekit PR will also be added as a reference.

Docs link

  • Will add in docs

Summary by Bito

This pull request introduces a new feature for managing workflow concurrency. It lets users limit concurrent executions of a workflow per project/domain/workflow name, across all versions. Enhancements include a ConcurrencyPolicy, logic that checks for running executions before launching a new one, and a database migration that adds an index to speed up that check.

thomasjhuang • May 29 '25

Codecov Report

Patch coverage is 63.63636% with 28 lines in your changes missing coverage. Please review. Project coverage is 58.70%. Comparing base (c085cb5) to head (8208637). Report is 12 commits behind head on master.

| File with missing lines | Patch % | Lines |
|---|---|---|
| flyteadmin/pkg/manager/impl/execution_manager.go | 71.01% | 15 missing and 5 partials |
| flyteadmin/pkg/repositories/config/migrations.go | 0.00% | 8 missing |

Additional details and impacted files

```diff
@@            Coverage Diff             @@
##           master    #6475      +/-   ##
==========================================
+ Coverage   58.67%   58.70%   +0.02%     
==========================================
  Files         938      938              
  Lines       71466    71690     +224     
==========================================
+ Hits        41933    42085     +152     
- Misses      26346    26413      +67     
- Partials     3187     3192       +5     
```

| Flag | Coverage Δ |
|---|---|
| unittests-datacatalog | 59.03% <ø> (ø) |
| unittests-flyteadmin | 56.14% <63.63%> (-0.09%) |
| unittests-flytecopilot | 39.56% <ø> (ø) |
| unittests-flytectl | 64.72% <ø> (ø) |
| unittests-flyteidl | 76.12% <ø> (ø) |
| unittests-flyteplugins | 61.14% <ø> (+<0.01%) |
| unittests-flytepropeller | 55.06% <ø> (+0.22%) |
| unittests-flytestdlib | 64.02% <ø> (ø) |

Flags with carried forward coverage won't be shown.

codecov[bot] • May 29 '25

Can you run make generate in the flyteidl directory and make lint-fix in the flyteadmin directory? There are some build failures.

Sovietaced • May 30 '25

@thomasjhuang thanks for your contribution. Also, please look at the failing DCO check. Do you plan to set a specific phase when an execution is skipped? That may be beyond the scope of this PR, but it would be useful. Please also add some docs so users know how to use this in launch plans.

davidmirror-ops • Jun 03 '25

Taking a look

kumare3 • Jun 13 '25

@troychiu, can you please take a first look? You looked into this recently, right?

kumare3 • Jun 13 '25

I don't really have context but would love to take a look.

troychiu • Jun 13 '25

@troychiu sorry, wrong one. I meant the volcano PR. I can take a look at this one.

kumare3 • Jun 17 '25

Okay, I've added docs and a unit test, and ran make generate. make lint-fix didn't flag any issues in the code I introduced here. Is there a reason why the checks are all pending? This is probably ready for review now. Please also check out the related flytekit change: https://github.com/flyteorg/flytekit/pull/3267.

@davidmirror-ops @kumare3 @Sovietaced

thomasjhuang • Jul 08 '25

@thomasjhuang thanks for this amazing contribution. Sorry about this, but could you port the docs update to the new repo (https://github.com/unionai/docs)? I'll take point on updating this repo's docs folder to indicate that it's no longer the content rendered on the docs site.

davidmirror-ops • Jul 09 '25

Got it, I've opened a PR here: https://github.com/unionai/docs/pull/417

thomasjhuang • Jul 22 '25

I'm looking at porting a version of this to our fork, and one thing I'm checking is how it affects the Flyte scheduler. The scheduler is hardcoded to retry execution creation up to 30 times on failure, unless the gRPC status code in the error is codes.AlreadyExists.

I think the scheduler code will need to be updated to give up on ResourceExhausted. Alternatively, we'll need a way to express this case through richer error details.

Makes sense. This change will noticeably increase ResourceExhausted errors, which can be misleading (especially when tracked as a metric) since a skipped execution isn't really an error. Some changes to flytescheduler are probably needed. It isn't implemented yet, but I also intend to add a SKIPPED execution phase so that the UI can show skipped executions rather than a resource error.

thomasjhuang • Jul 22 '25
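Here is a sketch of the retry policy being discussed; it is not the actual flytescheduler code. The launch callback, launchError and the local Code type are hypothetical. Code mirrors only the names of a few gRPC status codes (not their numeric values), standing in for google.golang.org/grpc/codes so the example needs only the standard library. It retries up to 30 attempts, as described above for the current scheduler. It stops early on AlreadyExists and, as proposed here, on ResourceExhausted.

```go
package main

import (
	"errors"
	"fmt"
)

// Code mirrors a few gRPC status code names (names only; stand-in for grpc/codes).
type Code int

const (
	Unknown Code = iota
	Unavailable
	AlreadyExists
	ResourceExhausted
)

// launchError is a hypothetical error carrying a status code.
type launchError struct{ code Code }

func (e launchError) Error() string {
	return fmt.Sprintf("create execution failed with code %d", e.code)
}

const maxAttempts = 30

// nonRetryable reports whether retrying cannot help: AlreadyExists means the
// execution already exists; ResourceExhausted means the concurrency limit was
// hit, so this scheduled run should be skipped rather than retried.
func nonRetryable(c Code) bool {
	return c == AlreadyExists || c == ResourceExhausted
}

// launchWithRetry calls launch until it succeeds, fails with a non-retryable
// code, or maxAttempts is reached. It returns the number of attempts made and
// the last error; the caller decides how to treat AlreadyExists.
func launchWithRetry(launch func() error) (int, error) {
	var err error
	for attempt := 1; attempt <= maxAttempts; attempt++ {
		err = launch()
		if err == nil {
			return attempt, nil
		}
		var le launchError
		if errors.As(err, &le) && nonRetryable(le.code) {
			return attempt, err
		}
		// A real scheduler would back off here before the next attempt.
	}
	return maxAttempts, err
}
```

Returning the error for ResourceExhausted, rather than swallowing it, leaves room for the SKIPPED phase idea: a caller could record the run as skipped instead of failed.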

Bito Automatic Review Skipped - Files Excluded

Bito didn't auto-review this change because all changed files are in the exclusion list for automatic reviews. No action is needed if you didn't intend for the agent to review it. Otherwise, to manually trigger a review, type /review in a comment and save.

flyte-bot • Jul 22 '25

We have an end-to-end test validating this functionality, and it seems there might be a correctness error in the logic that looks up previous executions.

```go
import (
	"testing"

	pbadmin "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/admin"
	pbcore "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
	"github.com/stretchr/testify/require"
	"google.golang.org/grpc/codes"
	"google.golang.org/grpc/status"
	// The e2e suite's own config package is also required (import path not shown).
)

func TestFlyte_WorkflowConcurrencyLimits(t *testing.T) {
	// Launch plan presumably registered with a max concurrency of 1.
	lp := "e2e_singleton_workflow"
	client, err := config.Flyte.GetAdminClient(config.TestCtx)
	require.NoError(t, err, "getting flyte admin client")

	latestLaunchPlan := config.Flyte.FindLatestLaunchPlan(config.TestCtx, t, lp)
	t.Logf("Found most recent launch plan with version [%s]", latestLaunchPlan.GetId().GetVersion())

	// Both requests target the same launch plan version, so they share the
	// project/domain/name key that the concurrency check counts against.
	newRequest := func() *pbadmin.ExecutionCreateRequest {
		return &pbadmin.ExecutionCreateRequest{
			Project: config.Flyte.Project,
			Domain:  config.Flyte.Domain,
			Spec: &pbadmin.ExecutionSpec{
				LaunchPlan: &pbcore.Identifier{
					ResourceType: pbcore.ResourceType_LAUNCH_PLAN,
					Project:      config.Flyte.Project,
					Domain:       config.Flyte.Domain,
					Name:         lp,
					Version:      latestLaunchPlan.GetId().GetVersion(),
				},
			},
		}
	}

	_, err = client.AdminClient().CreateExecution(config.TestCtx, newRequest())
	require.NoError(t, err, "creating first execution")

	// Creating a second execution should fail while the first is non-terminal,
	// and the rejection should surface as a ResourceExhausted gRPC status.
	_, err = client.AdminClient().CreateExecution(config.TestCtx, newRequest())
	require.Error(t, err, "second execution should be rejected")
	s, ok := status.FromError(err)
	require.True(t, ok, "should be a grpc status error")
	require.Equal(t, codes.ResourceExhausted, s.Code())
}
```

This is failing in our production environment where there is more load and I'm wondering if the state filtering isn't quite right.

Sovietaced • Jul 29 '25
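One thing worth checking, offered as a hypothesis rather than a diagnosis: does the filter list the phases that count as running (an allow-list), or exclude the terminal ones (a deny-list)? An allow-list such as {QUEUED, RUNNING} misses executions sitting in any other non-terminal phase. That would include a just-created execution if flyteadmin initially records it as UNDEFINED until the propeller reports back, which is an assumption here. That window is likely wider under production load. The phase names below follow flyteidl's core.WorkflowExecution.Phase enum. The helper functions, and the allow-list itself, are hypothetical illustrations, not the PR's actual filter.

```go
package main

// Phase names as in flyteidl's core.WorkflowExecution.Phase enum.
var allPhases = []string{
	"UNDEFINED", "QUEUED", "RUNNING", "SUCCEEDING", "SUCCEEDED",
	"FAILING", "FAILED", "ABORTED", "TIMED_OUT", "ABORTING",
}

// terminalPhases are phases an execution never leaves.
var terminalPhases = map[string]bool{
	"SUCCEEDED": true, "FAILED": true, "ABORTED": true, "TIMED_OUT": true,
}

// countsAsRunningDenyList treats every non-terminal phase as occupying a slot.
func countsAsRunningDenyList(phase string) bool {
	return !terminalPhases[phase]
}

// countsAsRunningAllowList only counts explicitly listed phases (hypothetical
// example of the failure mode).
func countsAsRunningAllowList(phase string) bool {
	return phase == "QUEUED" || phase == "RUNNING"
}

// missedByAllowList returns the non-terminal phases the allow-list ignores,
// i.e. phases in which an execution would not block a new one.
func missedByAllowList() []string {
	var missed []string
	for _, p := range allPhases {
		if countsAsRunningDenyList(p) && !countsAsRunningAllowList(p) {
			missed = append(missed, p)
		}
	}
	return missed
}
```

A deny-list of terminal phases also stays correct if new intermediate phases are added later, because anything not known to be finished keeps holding its slot.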

For this version, can we at least treat ResourceExhausted as non-retryable in the scheduler? That's what we're doing, and it seems to be OK.

Sovietaced • Aug 01 '25

@EngHabu seemed happy with this. We've been using a variation of it in production for the past couple of weeks, so I think it's safe to land this.

Sovietaced • Aug 01 '25

Congrats on merging your first pull request! 🎉

welcome[bot] • Aug 01 '25

Hi @thomasjhuang @Sovietaced. Since I'm currently working on some scheduler-related issues, I can go ahead and open an issue for how the scheduler handles ResourceExhausted, and submit a PR.

popojk • Aug 05 '25