Change Flyte CR naming scheme to better support namespace_mapping
@ddl-rliu did most of the work on this one - making this an upstream PR as it resolved a real issue for us.
Why are the changes needed?
Typically, Flyte is configured so that each project/domain pair has its own Kubernetes namespace.
Some environments change this behavior by using the flyteadmin namespace_mapping setting to place all executions in fewer Kubernetes namespaces, or in a single one. This is problematic because the names of the CRs that flyteadmin generates can then collide.
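To make the collision concrete, here is a minimal sketch. It assumes only that Kubernetes identifies a namespaced object by its namespace plus its name; the `execution` type, the `namespaceFor` helper and the `flyte-shared` namespace are hypothetical and are not Flyte APIs.

```go
package main

import "fmt"

// execution is an illustrative stand-in for a Flyte execution identifier.
type execution struct {
	Project, Domain, Name string
}

// namespaceFor is a hypothetical helper that mimics two namespace_mapping
// choices: one namespace per project/domain pair, or one shared namespace.
func namespaceFor(e execution, shared bool) string {
	if shared {
		return "flyte-shared" // assumed name, for illustration only
	}
	return e.Project + "-" + e.Domain
}

// crKey is the identity Kubernetes enforces for a namespaced object:
// the namespace plus the object name.
func crKey(e execution, shared bool) string {
	return namespaceFor(e, shared) + "/" + e.Name
}

func main() {
	a := execution{Project: "project-a", Domain: "development", Name: "foo"}
	b := execution{Project: "project-b", Domain: "development", Name: "foo"}

	// Separate namespaces: the keys differ, so both CRs can be created.
	fmt.Println(crKey(a, false) == crKey(b, false)) // prints "false"

	// Shared namespace: the keys are identical, so the second create fails.
	fmt.Println(crKey(a, true) == crKey(b, true)) // prints "true"
}
```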
What changes were proposed in this pull request?
This patch fixes two things so that this works properly inside Flyte:
- It adds a random element to the CR name, so that a CR created by flyteadmin is named by the execution plus a unique value. Without this change, an execution Foo in project A prevents an execution Foo in project B from launching, because the name of the CR that's generated in Kubernetes assumes that the CRs for project A and project B are placed in different namespaces. When namespace_mapping is set to a single value, that assumption is wrong.
- It makes sure that when flytepropeller cleans up the CR, it uses Kubernetes labels to find the correct CR: instead of assuming that it can use the execution name, it uses the project, domain and execution labels.
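The label-based lookup in the second point can be sketched with the standard library alone. The label keys below are the ones that appear in the patch attached to this PR; `selectorFor` is a hypothetical helper that only builds an equality-based selector string with keys in sorted order, and it is not the apimachinery implementation the patch actually calls.

```go
package main

import (
	"fmt"
	"sort"
	"strings"
)

// Label keys as they appear in the patch attached to this PR.
const (
	domainLabel      = "domain"
	executionIDLabel = "execution-id"
	projectLabel     = "project"
)

// selectorFor is a hypothetical helper: it joins key=value pairs with commas,
// in sorted key order, which is the equality-based label selector syntax.
func selectorFor(project, domain, name string) string {
	labels := map[string]string{
		domainLabel:      domain,
		executionIDLabel: name,
		projectLabel:     project,
	}
	keys := make([]string, 0, len(labels))
	for k := range labels {
		keys = append(keys, k)
	}
	sort.Strings(keys) // map iteration order is random, so sort for stable output

	parts := make([]string, 0, len(keys))
	for _, k := range keys {
		parts = append(parts, k+"="+labels[k])
	}
	return strings.Join(parts, ",")
}

func main() {
	// prints "domain=development,execution-id=foo,project=project-a"
	fmt.Println(selectorFor("project-a", "development", "foo"))
}
```

Because the selector carries project and domain as well as the execution name, it still matches only one execution's CR when several projects share a namespace, whatever the CR itself is named.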
How was this patch tested?
This is deployed in a live Flyte setup where we have automated tests. We observed that the CR names were correctly unique after this and the initial collision no longer occurred.
Check all the applicable boxes
- [ ] I updated the documentation accordingly.
- [ ] All new and existing tests passed.
- [ ] All commits are signed-off.
Codecov Report
Attention: Patch coverage is 84.61538% with 6 lines in your changes missing coverage. Please review.
Project coverage is 58.49%. Comparing base (c3869f8) to head (318435c).
| Files with missing lines | Patch % | Lines |
|---|---|---|
| ...ropeller/pkg/compiler/transformers/k8s/workflow.go | 75.00% | 5 Missing and 1 partial :warning: |
Additional details and impacted files
@@ Coverage Diff @@
## master #5480 +/- ##
==========================================
+ Coverage 58.47% 58.49% +0.02%
==========================================
Files 940 940
Lines 71567 71602 +35
==========================================
+ Hits 41852 41887 +35
+ Misses 26531 26530 -1
- Partials 3184 3185 +1
| Flag | Coverage Δ | |
|---|---|---|
| unittests-datacatalog | 59.03% <ø> (ø) | |
| unittests-flyteadmin | 56.31% <100.00%> (+0.05%) | :arrow_up: |
| unittests-flytecopilot | 30.99% <ø> (ø) | |
| unittests-flytectl | 64.70% <ø> (ø) | |
| unittests-flyteidl | 76.12% <ø> (ø) | |
| unittests-flyteplugins | 60.89% <ø> (ø) | |
| unittests-flytepropeller | 54.84% <75.00%> (+0.02%) | :arrow_up: |
| unittests-flytestdlib | 64.04% <ø> (ø) | |
I am not in favor of this, as randomness will lead to leaky workflows and duplicates. We should use the project id itself or generate a consistent hash to increase inter project execution entropy
Ah thanks @kumare3 for the heads up! We clearly didn't realize there was something internal to Flyte that depends on deterministic naming for CRs -- will make some updates taking that into account as well
Also, should mention @kumare3 that if by "leaky" you meant "CR might not be deleted from the cluster", the deletion process is robust because this uses the actual key of the workflow in conjunction with CR labels to perform deletes, rather than the CR name.
If there are dupe CRs for the same workflow though, that's clearly an issue regardless :)
@ddl-ebrown I agree with not introducing randomization, especially since the name already starts with a random string :-)
Instead, I would update this call to use something like project-domain-rand(10), hash that, and use the result as the execution name.
I would also make the length of the execution name configurable in flyteadmin, so that in your deployment you can make it longer and get better entropy.
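One way to read this suggestion is sketched below. It is not the code that was merged or proposed: `deterministicSuffix` and `crName` are hypothetical names, FNV-1a with a lowercase base32 encoding is an assumed choice, and the 63-character cap follows the limit mentioned in the patch's comments.

```go
package main

import (
	"encoding/base32"
	"fmt"
	"hash/fnv"
	"strings"
)

// maxNameLen is the name length limit the patch comments refer to.
const maxNameLen = 63

// deterministicSuffix (hypothetical) hashes project, domain and execution name
// with FNV-1a and returns the first n characters of a lowercase, unpadded
// base32 encoding. The same inputs always produce the same suffix.
func deterministicSuffix(project, domain, name string, n int) string {
	h := fnv.New64a()
	_, _ = h.Write([]byte(project + ":" + domain + ":" + name))
	enc := base32.StdEncoding.WithPadding(base32.NoPadding).EncodeToString(h.Sum(nil))
	enc = strings.ToLower(enc) // 13 characters for a 64-bit hash
	if n < 0 {
		n = 0
	}
	if n > len(enc) {
		n = len(enc)
	}
	return enc[:n]
}

// crName (hypothetical) appends the suffix to the execution name and truncates
// the name so that the result never exceeds maxNameLen characters.
func crName(project, domain, name string, suffixLen int) string {
	suffix := deterministicSuffix(project, domain, name, suffixLen)
	maxPrefix := maxNameLen - 1 - len(suffix)
	if len(name) > maxPrefix {
		name = name[:maxPrefix]
	}
	return name + "-" + suffix
}

func main() {
	// Same execution name, different projects: the suffixes keep the names apart.
	fmt.Println(crName("project-a", "development", "foo", 10))
	fmt.Println(crName("project-b", "development", "foo", 10))
}
```

Making `suffixLen` a parameter mirrors the request for a configurable length: a longer suffix lowers the chance of two different identifiers producing the same name, at the cost of a shorter readable prefix.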
Cleaning stale PRs. Please reopen if you want to discuss this further.
FYI - We're still carrying a patch around this, as it's necessary to fix the underlying bug inside of Flyte. I would love to see this resolved upstream so that we don't need to carry the patch.
What do we think the path is to making that happen @eapolinario?
From deeaf0e25bf67d382fe0080afc4832de4970a084 Mon Sep 17 00:00:00 2001
From: Richard Liu <[email protected]>
Date: Thu, 13 Jun 2024 21:51:40 -0700
Subject: [PATCH] Change Flyte CR naming scheme to better support
namespace_mapping
- Typically Flyte is configured so that each project / domain has its
own Kubernetes namespace.
Certain environments may change this behavior by using the Flyteadmin
namespace_mapping setting to put all executions in fewer (or a singular)
Kubernetes namespace. This is problematic because it can lead to
collisions in the naming of the CR that flyteadmin generates.
- This patch fixes 2 important things to make this work properly inside
of Flyte:
* it adds a random element to the CR name in Flyte so that the CR is
named by the execution + some unique value when created by
flyteadmin
Without this change, an execution Foo in project A will prevent an
execution Foo in project B from launching, because the name of the
CR thats generated in Kubernetes *assumes* that the namespace the
CRs are put into is different for project A and project B
When namespace_mapping is set to a singular value, that assumption
is wrong
* it makes sure that when flytepropeller cleans up the CR resource
that it uses Kubernetes labels to find the correct CR -- so instead
of assuming that it can use the execution name, it instead uses the
project, domain and execution labels
- Adds use-workflow-cr-name-suffix setting as a true / false value to
determine whether to append a deterministic hash of execution id, name,
project, as the FlyteWorkflow CR name. This uses a fixed length of 10
which introduces enough entropy to prevent collisions in practical
terms
Signed-off-by: ddl-ebrown <[email protected]>
Signed-off-by: ddl-rliu <[email protected]>
---
.../pkg/workflowengine/impl/k8s_executor.go | 28 +++++++++++--
.../workflowengine/impl/k8s_executor_test.go | 27 +++++++-----
.../pkg/compiler/transformers/k8s/workflow.go | 37 +++++++++++++++-
.../transformers/k8s/workflow_test.go | 42 +++++++++++++++++++
.../pkg/controller/config/config.go | 2 +
5 files changed, 122 insertions(+), 14 deletions(-)
diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go
index d941cc830..c7a73e983 100644
--- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go
+++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go
@@ -12,6 +12,7 @@ import (
execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces"
runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
+ "github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytestdlib/logger"
"github.com/flyteorg/flyte/flytestdlib/storage"
)
@@ -87,6 +88,23 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
}, nil
}
+const (
+ // Labels that are set on the FlyteWorkflow CRD
+ DomainLabel = "domain"
+ ExecutionIDLabel = "execution-id"
+ ProjectLabel = "project"
+)
+
+func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector {
+ return &v1.LabelSelector{
+ MatchLabels: map[string]string{
+ DomainLabel: executionID.GetDomain(),
+ ExecutionIDLabel: executionID.GetName(),
+ ProjectLabel: executionID.GetProject(),
+ },
+ }
+}
+
func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error {
target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{
TargetID: data.Cluster,
@@ -94,9 +112,13 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat
if err != nil {
return errors.NewFlyteAdminErrorf(codes.Internal, err.Error())
}
- err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{
- PropagationPolicy: &deletePropagationBackground,
- })
+ err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection(
+ ctx,
+ v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground},
+ v1.ListOptions{
+ LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)),
+ },
+ )
// An IsNotFound error indicates the resource is already deleted.
if err != nil && !k8_api_err.IsNotFound(err) {
return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
index a2ecb5136..bc3888667 100644
--- a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
+++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
@@ -31,11 +31,11 @@ import (
var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{}
type createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error)
-type deleteCallback func(name string, options *v1.DeleteOptions) error
+type deleteCollectionCallback func(*v1.DeleteOptions, *v1.ListOptions) error
type FakeFlyteWorkflow struct {
v1alpha12.FlyteWorkflowInterface
- createCallback createCallback
- deleteCallback deleteCallback
+ createCallback
+ deleteCollectionCallback
}
func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
@@ -45,9 +45,9 @@ func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkfl
return nil, nil
}
-func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.DeleteOptions) error {
- if b.deleteCallback != nil {
- return b.deleteCallback(name, &options)
+func (b *FakeFlyteWorkflow) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error {
+ if b.deleteCollectionCallback != nil {
+ return b.deleteCollectionCallback(&opts, &listOpts)
}
return nil
}
@@ -280,8 +280,15 @@ func TestExecute_MiscError(t *testing.T) {
func TestAbort(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
- fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
- assert.Equal(t, execID.Name, name)
+ fakeFlyteWorkflow.deleteCollectionCallback = func(options *v1.DeleteOptions, listOpts *v1.ListOptions) error {
+ selector := v1.FormatLabelSelector(&v1.LabelSelector{
+ MatchLabels: map[string]string{
+ DomainLabel: execID.GetDomain(),
+ ExecutionIDLabel: execID.GetName(),
+ ProjectLabel: execID.GetProject(),
+ },
+ })
+ assert.Equal(t, selector, listOpts.LabelSelector)
assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground)
return nil
}
@@ -302,7 +309,7 @@ func TestAbort(t *testing.T) {
func TestAbort_Notfound(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
- fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
+ fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
return k8_api_err.NewNotFound(schema.GroupResource{
Group: "foo",
Resource: "bar",
@@ -325,7 +332,7 @@ func TestAbort_Notfound(t *testing.T) {
func TestAbort_MiscError(t *testing.T) {
fakeFlyteWorkflow := FakeFlyteWorkflow{}
- fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
+ fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
return errors.New("call failed")
}
fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go
index 2421ddf9b..73e518c13 100644
--- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go
+++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go
@@ -2,17 +2,21 @@
package k8s
import (
+ "context"
"fmt"
"hash/fnv"
"strings"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+ "k8s.io/apimachinery/pkg/util/rand"
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
+ "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytepropeller/pkg/utils"
+ "github.com/flyteorg/flyte/flytestdlib/logger"
)
const (
@@ -30,6 +34,11 @@ const (
ShardKeyLabel = "shard-key"
// The fully qualified FlyteWorkflow name
WorkflowNameLabel = "workflow-name"
+
+ // Length of hash to use as a suffix on the workflow CR name. Used when config.UseWorkflowCRNameSuffix is true.
+ // The workflow CR name should be at or under 63 characters long, here it is 52 + 1 + 10 = 63
+ workflowCRNameHashLength = 10
+ workflowCRNameSuffixFmt = "%.52s-%s"
)
func requiresInputs(w *core.WorkflowTemplate) bool {
@@ -159,6 +168,20 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie
}
}
+func hashIdentifier(identifier core.Identifier) uint64 {
+ h := fnv.New64()
+ _, err := h.Write([]byte(fmt.Sprintf("%s:%s:%s",
+ identifier.Project, identifier.Domain, identifier.Name)))
+ if err != nil {
+ // This shouldn't occur.
+ logger.Errorf(context.Background(),
+ "failed to hash execution identifier: %+v with err: %v", identifier, err)
+ return 0
+ }
+ logger.Debugf(context.Background(), "Returning hash for [%+v]: %d", identifier, h.Sum64())
+ return h.Sum64()
+}
+
// BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors.
func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap,
executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) {
@@ -231,7 +254,19 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
errs.Collect(errors.NewWorkflowBuildError(err))
}
- obj.ObjectMeta.Name = name
+ if config.GetConfig().UseWorkflowCRNameSuffix {
+ // Seed the randomness before generating the name with random suffix
+ hashedIdentifier := hashIdentifier(core.Identifier{
+ Project: project,
+ Domain: domain,
+ Name: name,
+ })
+ rand.Seed(int64(hashedIdentifier))
+ obj.ObjectMeta.Name = fmt.Sprintf(workflowCRNameSuffixFmt, name, rand.String(workflowCRNameHashLength))
+ } else {
+ obj.ObjectMeta.Name = name
+ }
+
obj.ObjectMeta.GenerateName = generatedName
obj.ObjectMeta.Labels[ExecutionIDLabel] = label
obj.ObjectMeta.Labels[ProjectLabel] = project
diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
index dbb51e25e..893f81119 100644
--- a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
+++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
@@ -11,6 +11,7 @@ import (
"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
+ "github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
"github.com/flyteorg/flyte/flytestdlib/utils"
)
@@ -251,6 +252,47 @@ func TestBuildFlyteWorkflow_withUnionInputs(t *testing.T) {
assert.Equal(t, "hello", wf.Inputs.Literals["y"].GetScalar().GetUnion().GetValue().GetScalar().GetPrimitive().GetStringValue())
}
+func TestBuildFlyteWorkflow_setWorkflowCRNameHashLength(t *testing.T) {
+ for name, tt := range map[string]struct {
+ useSuffix bool
+ expected string
+ }{
+ "default does not use hash as workflow CR name": {
+ useSuffix: false,
+ expected: "",
+ },
+ "use hash as workflow CR name": {
+ useSuffix: true,
+ expected: "-x6m7gswrdl",
+ },
+ } {
+ t.Run(name, func(t *testing.T) {
+ flyteConfig := config.GetConfig()
+ flyteConfig.UseWorkflowCRNameSuffix = tt.useSuffix
+
+ w := createSampleMockWorkflow()
+
+ errors.SetConfig(errors.Config{IncludeSource: true})
+ wf, err := BuildFlyteWorkflow(
+ &core.CompiledWorkflowClosure{
+ Primary: w.GetCoreWorkflow(),
+ Tasks: []*core.CompiledTask{
+ {
+ Template: &core.TaskTemplate{
+ Id: &core.Identifier{Name: "ref_1"},
+ },
+ },
+ },
+ },
+ nil, nil, "")
+ assert.Equal(t, tt.expected, wf.ObjectMeta.Name)
+ assert.NoError(t, err)
+ assert.NotNil(t, wf)
+ errors.SetConfig(errors.Config{})
+ })
+ }
+}
+
func TestGenerateName(t *testing.T) {
t.Run("Invalid params", func(t *testing.T) {
_, _, _, _, _, err := generateName(nil, nil)
diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go
index a0217e186..7700bc873 100644
--- a/flytepropeller/pkg/controller/config/config.go
+++ b/flytepropeller/pkg/controller/config/config.go
@@ -120,6 +120,7 @@ var (
EventVersion: 0,
DefaultParallelismBehavior: ParallelismBehaviorUnlimited,
},
+ UseWorkflowCRNameSuffix: false,
}
)
@@ -161,6 +162,7 @@ type Config struct {
NodeExecutionWorkerCount int `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
ArrayNode ArrayNodeConfig `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"`
LiteralOffloadingConfig LiteralOffloadingConfig `json:"literal-offloading-config" pflag:",config used for literal offloading."`
+ UseWorkflowCRNameSuffix bool `json:"use-workflow-cr-name-suffix" pflag:",If false, the execution ID will be used as the workflow CR name. Otherwise, a hash of the execution ID, project, domain will be used as a suffix on the CR name."`
}
// KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
--
2.45.2
Code Review Agent Run #547877
Actionable Suggestions - 1
- flytepropeller/pkg/compiler/transformers/k8s/workflow.go - 1
  - Consider using thread-safe random generation · Line 264-265
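A possible shape for that suggestion, sketched with the standard library only: instead of seeding a shared package-level generator and then reading from it, each call builds its own generator from the seed, so concurrent callers cannot interleave between the seed and the read. `seededSuffix` and its alphabet are hypothetical and are not part of the patch or of apimachinery.

```go
package main

import (
	"fmt"
	"math/rand"
)

// alphabet is an illustrative set of lowercase alphanumeric characters.
const alphabet = "bcdfghjklmnpqrstvwxz2456789"

// seededSuffix (hypothetical) derives an n-character suffix from a seed using
// a generator that is private to the call. No global state is touched, so two
// goroutines calling it with different seeds cannot affect each other.
func seededSuffix(seed int64, n int) string {
	r := rand.New(rand.NewSource(seed))
	b := make([]byte, n)
	for i := range b {
		b[i] = alphabet[r.Intn(len(alphabet))]
	}
	return string(b)
}

func main() {
	// The same seed always yields the same suffix.
	fmt.Println(seededSuffix(42, 10) == seededSuffix(42, 10)) // prints "true"
}
```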
Review Details
- Files reviewed - 5 · Commit Range: 318435c..318435c
  - flyteadmin/pkg/workflowengine/impl/k8s_executor.go
  - flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
  - flytepropeller/pkg/compiler/transformers/k8s/workflow.go
  - flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
  - flytepropeller/pkg/controller/config/config.go
- Files skipped - 0
- Tools
  - Whispers (Secret Scanner) - ✔︎ Successful
  - Detect-secrets (Secret Scanner) - ✔︎ Successful
Changelist by Bito
This pull request implements the following key changes.
| Key Change | Files Impacted |
|---|---|
| Bug Fix - Correct CR Deletion with Label Selector | |
| Feature Improvement - Enhanced Workflow CR Naming Scheme | |
| Testing - Tests Adjustments for CR Naming and Deletion | |