flyte icon indicating copy to clipboard operation
flyte copied to clipboard

Change Flyte CR naming scheme to better support namespace_mapping

Open ddl-ebrown opened this issue 1 year ago • 5 comments

@ddl-rliu did most of the work on this one - making this an upstream PR as it resolved a real issue for us.

Tracking issue

Why are the changes needed?

  • Typically Flyte is configured so that each project / domain has its own Kubernetes namespace.

    Certain environments may change this behavior by using the Flyteadmin namespace_mapping setting to put all executions in fewer (or a singular) Kubernetes namespace. This is problematic because it can lead to collisions in the naming of the CR that flyteadmin generates.

What changes were proposed in this pull request?

  • This patch fixes 2 important things to make this work properly inside of Flyte:

    • it adds a random element to the CR name in Flyte so that the CR is named by the execution + some unique value when created by flyteadmin

      Without this change, an execution Foo in project A will prevent an execution Foo in project B from launching, because the name of the CR thats generated in Kubernetes assumes that the namespace the CRs are put into is different for project A and project B

      When namespace_mapping is set to a singular value, that assumption is wrong

    • it makes sure that when flytepropeller cleans up the CR resource that it uses Kubernetes labels to find the correct CR -- so instead of assuming that it can use the execution name, it instead uses the project, domain and execution labels

How was this patch tested?

This is deployed in a live Flyte setup where we have automated tests. We observed that the CR names were correctly unique after this and the initial collision no longer occurred.

Setup process

Screenshots

Check all the applicable boxes

  • [ ] I updated the documentation accordingly.
  • [ ] All new and existing tests passed.
  • [ ] All commits are signed-off.

Related PRs

Docs link

ddl-ebrown avatar Jun 14 '24 22:06 ddl-ebrown

Codecov Report

Attention: Patch coverage is 84.61538% with 6 lines in your changes missing coverage. Please review.

Project coverage is 58.49%. Comparing base (c3869f8) to head (318435c).

Files with missing lines Patch % Lines
...ropeller/pkg/compiler/transformers/k8s/workflow.go 75.00% 5 Missing and 1 partial :warning:
Additional details and impacted files
@@            Coverage Diff             @@
##           master    #5480      +/-   ##
==========================================
+ Coverage   58.47%   58.49%   +0.02%     
==========================================
  Files         940      940              
  Lines       71567    71602      +35     
==========================================
+ Hits        41852    41887      +35     
+ Misses      26531    26530       -1     
- Partials     3184     3185       +1     
Flag Coverage Δ
unittests-datacatalog 59.03% <ø> (ø)
unittests-flyteadmin 56.31% <100.00%> (+0.05%) :arrow_up:
unittests-flytecopilot 30.99% <ø> (ø)
unittests-flytectl 64.70% <ø> (ø)
unittests-flyteidl 76.12% <ø> (ø)
unittests-flyteplugins 60.89% <ø> (ø)
unittests-flytepropeller 54.84% <75.00%> (+0.02%) :arrow_up:
unittests-flytestdlib 64.04% <ø> (ø)

Flags with carried forward coverage won't be shown. Click here to find out more.

:umbrella: View full report in Codecov by Sentry.
:loudspeaker: Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

codecov[bot] avatar Jun 14 '24 22:06 codecov[bot]

I am not in favor of this, as randomness will lead to leaky workflows and duplicates. We should use the project id itself or generate a consistent hash to increase inter project execution entropy

kumare3 avatar Jun 25 '24 14:06 kumare3

I am not in favor of this, as randomness will lead to leaky workflows and duplicates. We should use the project id itself or generate a consistent hash to increase inter project execution entropy

Ah thanks @kumare3 for the heads up! We clearly didn't realize there was something internal to Flyte that depends on deterministic naming for CRs -- will make some updates taking that into account as well

ddl-ebrown avatar Jun 25 '24 15:06 ddl-ebrown

I am not in favor of this, as randomness will lead to leaky workflows and duplicates. We should use the project id itself or generate a consistent hash to increase inter project execution entropy

Ah thanks @kumare3 for the heads up! We clearly didn't realize there was something internal to Flyte that depends on deterministic naming for CRs -- will make some updates taking that into account as well

Also, should mention @kumare3 that if by "leaky" you meant "CR might not be deleted from the cluster", the deletion process is robust because this uses the actual key of the workflow in conjunction with CR labels to perform deletes, rather than the CR name.

If there are dupe CRs for the same workflow though, that's clearly an issue regardless :)

ddl-ebrown avatar Jun 25 '24 17:06 ddl-ebrown

@ddl-ebrown I agree with not introducing randomization... specially that the name already starts with a random string :-)

Instead, I would update this call to use something like project-domain-rand(10) and hash that and that becomes the execution name...

I would also make the length of the execution name configurable in flyteadmin. so in your deployment you can make it longer and give you better entropy...

EngHabu avatar Jun 25 '24 19:06 EngHabu

Cleaning stale PRs. Please reopen if you wan to discuss this further.

eapolinario avatar Mar 03 '25 19:03 eapolinario

FYI - We're still carrying a patch around this as the it's necessary to fix the underlying bug inside of Flyte. I would love to see this resolved upstream so that we don't need to carry the patch.

What do we think the path is to making that happen @eapolinario?

From deeaf0e25bf67d382fe0080afc4832de4970a084 Mon Sep 17 00:00:00 2001
From: Richard Liu <[email protected]>
Date: Thu, 13 Jun 2024 21:51:40 -0700
Subject: [PATCH] Change Flyte CR naming scheme to better support
 namespace_mapping

 - Typically Flyte is configured so that each project / domain has its
   own Kubernetes namespace.

   Certain environments may change this behavior by using the Flyteadmin
   namespace_mapping setting to put all executions in fewer (or a singular)
   Kubernetes namespace. This is problematic because it can lead to
   collisions in the naming of the CR that flyteadmin generates.

 - This patch fixes 2 important things to make this work properly inside
   of Flyte:

   * it adds a random element to the CR name in Flyte so that the CR is
     named by the execution + some unique value when created by
     flyteadmin

     Without this change, an execution Foo in project A will prevent an
     execution Foo in project B from launching, because the name of the
     CR thats generated in Kubernetes *assumes* that the namespace the
     CRs are put into is different for project A and project B

     When namespace_mapping is set to a singular value, that assumption
     is wrong

   * it makes sure that when flytepropeller cleans up the CR resource
     that it uses Kubernetes labels to find the correct CR -- so instead
     of assuming that it can use the execution name, it instead uses the
     project, domain and execution labels

 - Adds use-workflow-cr-name-suffix setting as a true / false value to
   determine whether to append a deterministic hash of execution id, name,
   project, as the FlyteWorkflow CR name. This uses a fixed length of 10
   which introduces enough entropy to prevent collisions in practical
   terms

Signed-off-by: ddl-ebrown <[email protected]>
Signed-off-by: ddl-rliu <[email protected]>
---
 .../pkg/workflowengine/impl/k8s_executor.go   | 28 +++++++++++--
 .../workflowengine/impl/k8s_executor_test.go  | 27 +++++++-----
 .../pkg/compiler/transformers/k8s/workflow.go | 37 +++++++++++++++-
 .../transformers/k8s/workflow_test.go         | 42 +++++++++++++++++++
 .../pkg/controller/config/config.go           |  2 +
 5 files changed, 122 insertions(+), 14 deletions(-)

diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go
index d941cc830..c7a73e983 100644
--- a/flyteadmin/pkg/workflowengine/impl/k8s_executor.go
+++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor.go
@@ -12,6 +12,7 @@ import (
 	execClusterInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/executioncluster/interfaces"
 	runtimeInterfaces "github.com/flyteorg/flyte/flyteadmin/pkg/runtime/interfaces"
 	"github.com/flyteorg/flyte/flyteadmin/pkg/workflowengine/interfaces"
+	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flyte/flytestdlib/logger"
 	"github.com/flyteorg/flyte/flytestdlib/storage"
 )
@@ -87,6 +88,23 @@ func (e K8sWorkflowExecutor) Execute(ctx context.Context, data interfaces.Execut
 	}, nil
 }
 
+const (
+	// Labels that are set on the FlyteWorkflow CRD
+	DomainLabel      = "domain"
+	ExecutionIDLabel = "execution-id"
+	ProjectLabel     = "project"
+)
+
+func executionLabelSelector(executionID *core.WorkflowExecutionIdentifier) *v1.LabelSelector {
+	return &v1.LabelSelector{
+		MatchLabels: map[string]string{
+			DomainLabel:      executionID.GetDomain(),
+			ExecutionIDLabel: executionID.GetName(),
+			ProjectLabel:     executionID.GetProject(),
+		},
+	}
+}
+
 func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortData) error {
 	target, err := e.executionCluster.GetTarget(ctx, &executioncluster.ExecutionTargetSpec{
 		TargetID: data.Cluster,
@@ -94,9 +112,13 @@ func (e K8sWorkflowExecutor) Abort(ctx context.Context, data interfaces.AbortDat
 	if err != nil {
 		return errors.NewFlyteAdminErrorf(codes.Internal, err.Error())
 	}
-	err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).Delete(ctx, data.ExecutionID.GetName(), v1.DeleteOptions{
-		PropagationPolicy: &deletePropagationBackground,
-	})
+	err = target.FlyteClient.FlyteworkflowV1alpha1().FlyteWorkflows(data.Namespace).DeleteCollection(
+		ctx,
+		v1.DeleteOptions{PropagationPolicy: &deletePropagationBackground},
+		v1.ListOptions{
+			LabelSelector: v1.FormatLabelSelector(executionLabelSelector(data.ExecutionID)),
+		},
+	)
 	// An IsNotFound error indicates the resource is already deleted.
 	if err != nil && !k8_api_err.IsNotFound(err) {
 		return errors.NewFlyteAdminErrorf(codes.Internal, "failed to terminate execution: %v with err %v", data.ExecutionID, err)
diff --git a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
index a2ecb5136..bc3888667 100644
--- a/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
+++ b/flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
@@ -31,11 +31,11 @@ import (
 var fakeFlyteWF = FakeFlyteWorkflowV1alpha1{}
 
 type createCallback func(*v1alpha1.FlyteWorkflow, v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error)
-type deleteCallback func(name string, options *v1.DeleteOptions) error
+type deleteCollectionCallback func(*v1.DeleteOptions, *v1.ListOptions) error
 type FakeFlyteWorkflow struct {
 	v1alpha12.FlyteWorkflowInterface
-	createCallback createCallback
-	deleteCallback deleteCallback
+	createCallback
+	deleteCollectionCallback
 }
 
 func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkflow, opts v1.CreateOptions) (*v1alpha1.FlyteWorkflow, error) {
@@ -45,9 +45,9 @@ func (b *FakeFlyteWorkflow) Create(ctx context.Context, wf *v1alpha1.FlyteWorkfl
 	return nil, nil
 }
 
-func (b *FakeFlyteWorkflow) Delete(ctx context.Context, name string, options v1.DeleteOptions) error {
-	if b.deleteCallback != nil {
-		return b.deleteCallback(name, &options)
+func (b *FakeFlyteWorkflow) DeleteCollection(ctx context.Context, opts v1.DeleteOptions, listOpts v1.ListOptions) error {
+	if b.deleteCollectionCallback != nil {
+		return b.deleteCollectionCallback(&opts, &listOpts)
 	}
 	return nil
 }
@@ -280,8 +280,15 @@ func TestExecute_MiscError(t *testing.T) {
 
 func TestAbort(t *testing.T) {
 	fakeFlyteWorkflow := FakeFlyteWorkflow{}
-	fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
-		assert.Equal(t, execID.Name, name)
+	fakeFlyteWorkflow.deleteCollectionCallback = func(options *v1.DeleteOptions, listOpts *v1.ListOptions) error {
+		selector := v1.FormatLabelSelector(&v1.LabelSelector{
+			MatchLabels: map[string]string{
+				DomainLabel:      execID.GetDomain(),
+				ExecutionIDLabel: execID.GetName(),
+				ProjectLabel:     execID.GetProject(),
+			},
+		})
+		assert.Equal(t, selector, listOpts.LabelSelector)
 		assert.Equal(t, options.PropagationPolicy, &deletePropagationBackground)
 		return nil
 	}
@@ -302,7 +309,7 @@ func TestAbort(t *testing.T) {
 
 func TestAbort_Notfound(t *testing.T) {
 	fakeFlyteWorkflow := FakeFlyteWorkflow{}
-	fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
+	fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
 		return k8_api_err.NewNotFound(schema.GroupResource{
 			Group:    "foo",
 			Resource: "bar",
@@ -325,7 +332,7 @@ func TestAbort_Notfound(t *testing.T) {
 
 func TestAbort_MiscError(t *testing.T) {
 	fakeFlyteWorkflow := FakeFlyteWorkflow{}
-	fakeFlyteWorkflow.deleteCallback = func(name string, options *v1.DeleteOptions) error {
+	fakeFlyteWorkflow.deleteCollectionCallback = func(*v1.DeleteOptions, *v1.ListOptions) error {
 		return errors.New("call failed")
 	}
 	fakeFlyteWF.flyteWorkflowsCallback = func(ns string) v1alpha12.FlyteWorkflowInterface {
diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go
index 2421ddf9b..73e518c13 100644
--- a/flytepropeller/pkg/compiler/transformers/k8s/workflow.go
+++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow.go
@@ -2,17 +2,21 @@
 package k8s
 
 import (
+	"context"
 	"fmt"
 	"hash/fnv"
 	"strings"
 
 	v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
+	"k8s.io/apimachinery/pkg/util/rand"
 
 	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/apis/flyteworkflow/v1alpha1"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
+	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/utils"
+	"github.com/flyteorg/flyte/flytestdlib/logger"
 )
 
 const (
@@ -30,6 +34,11 @@ const (
 	ShardKeyLabel = "shard-key"
 	// The fully qualified FlyteWorkflow name
 	WorkflowNameLabel = "workflow-name"
+
+	// Length of hash to use as a suffix on the workflow CR name. Used when config.UseWorkflowCRNameSuffix is true.
+	// The workflow CR name should be at or under 63 characters long, here it is 52 + 1 + 10 = 63
+	workflowCRNameHashLength = 10
+	workflowCRNameSuffixFmt  = "%.52s-%s"
 )
 
 func requiresInputs(w *core.WorkflowTemplate) bool {
@@ -159,6 +168,20 @@ func generateName(wfID *core.Identifier, execID *core.WorkflowExecutionIdentifie
 	}
 }
 
+func hashIdentifier(identifier core.Identifier) uint64 {
+	h := fnv.New64()
+	_, err := h.Write([]byte(fmt.Sprintf("%s:%s:%s",
+		identifier.Project, identifier.Domain, identifier.Name)))
+	if err != nil {
+		// This shouldn't occur.
+		logger.Errorf(context.Background(),
+			"failed to hash execution identifier: %+v with err: %v", identifier, err)
+		return 0
+	}
+	logger.Debugf(context.Background(), "Returning hash for [%+v]: %d", identifier, h.Sum64())
+	return h.Sum64()
+}
+
 // BuildFlyteWorkflow builds v1alpha1.FlyteWorkflow resource. Returned error, if not nil, is of type errors.CompilerErrors.
 func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.LiteralMap,
 	executionID *core.WorkflowExecutionIdentifier, namespace string) (*v1alpha1.FlyteWorkflow, error) {
@@ -231,7 +254,19 @@ func BuildFlyteWorkflow(wfClosure *core.CompiledWorkflowClosure, inputs *core.Li
 		errs.Collect(errors.NewWorkflowBuildError(err))
 	}
 
-	obj.ObjectMeta.Name = name
+	if config.GetConfig().UseWorkflowCRNameSuffix {
+		// Seed the randomness before generating the name with random suffix
+		hashedIdentifier := hashIdentifier(core.Identifier{
+			Project: project,
+			Domain:  domain,
+			Name:    name,
+		})
+		rand.Seed(int64(hashedIdentifier))
+		obj.ObjectMeta.Name = fmt.Sprintf(workflowCRNameSuffixFmt, name, rand.String(workflowCRNameHashLength))
+	} else {
+		obj.ObjectMeta.Name = name
+	}
+
 	obj.ObjectMeta.GenerateName = generatedName
 	obj.ObjectMeta.Labels[ExecutionIDLabel] = label
 	obj.ObjectMeta.Labels[ProjectLabel] = project
diff --git a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
index dbb51e25e..893f81119 100644
--- a/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
+++ b/flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
@@ -11,6 +11,7 @@ import (
 	"github.com/flyteorg/flyte/flyteidl/gen/pb-go/flyteidl/core"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/common"
 	"github.com/flyteorg/flyte/flytepropeller/pkg/compiler/errors"
+	"github.com/flyteorg/flyte/flytepropeller/pkg/controller/config"
 	"github.com/flyteorg/flyte/flytestdlib/utils"
 )
 
@@ -251,6 +252,47 @@ func TestBuildFlyteWorkflow_withUnionInputs(t *testing.T) {
 	assert.Equal(t, "hello", wf.Inputs.Literals["y"].GetScalar().GetUnion().GetValue().GetScalar().GetPrimitive().GetStringValue())
 }
 
+func TestBuildFlyteWorkflow_setWorkflowCRNameHashLength(t *testing.T) {
+	for name, tt := range map[string]struct {
+		useSuffix bool
+		expected  string
+	}{
+		"default does not use hash as workflow CR name": {
+			useSuffix: false,
+			expected:  "",
+		},
+		"use hash as workflow CR name": {
+			useSuffix: true,
+			expected:  "-x6m7gswrdl",
+		},
+	} {
+		t.Run(name, func(t *testing.T) {
+			flyteConfig := config.GetConfig()
+			flyteConfig.UseWorkflowCRNameSuffix = tt.useSuffix
+
+			w := createSampleMockWorkflow()
+
+			errors.SetConfig(errors.Config{IncludeSource: true})
+			wf, err := BuildFlyteWorkflow(
+				&core.CompiledWorkflowClosure{
+					Primary: w.GetCoreWorkflow(),
+					Tasks: []*core.CompiledTask{
+						{
+							Template: &core.TaskTemplate{
+								Id: &core.Identifier{Name: "ref_1"},
+							},
+						},
+					},
+				},
+				nil, nil, "")
+			assert.Equal(t, tt.expected, wf.ObjectMeta.Name)
+			assert.NoError(t, err)
+			assert.NotNil(t, wf)
+			errors.SetConfig(errors.Config{})
+		})
+	}
+}
+
 func TestGenerateName(t *testing.T) {
 	t.Run("Invalid params", func(t *testing.T) {
 		_, _, _, _, _, err := generateName(nil, nil)
diff --git a/flytepropeller/pkg/controller/config/config.go b/flytepropeller/pkg/controller/config/config.go
index a0217e186..7700bc873 100644
--- a/flytepropeller/pkg/controller/config/config.go
+++ b/flytepropeller/pkg/controller/config/config.go
@@ -120,6 +120,7 @@ var (
 			EventVersion:               0,
 			DefaultParallelismBehavior: ParallelismBehaviorUnlimited,
 		},
+		UseWorkflowCRNameSuffix: false,
 	}
 )
 
@@ -161,6 +162,7 @@ type Config struct {
 	NodeExecutionWorkerCount int                     `json:"node-execution-worker-count" pflag:",Number of workers to evaluate node executions, currently only used for array nodes"`
 	ArrayNode                ArrayNodeConfig         `json:"array-node-config,omitempty" pflag:",Configuration for array nodes"`
	LiteralOffloadingConfig  LiteralOffloadingConfig `json:"literal-offloading-config" pflag:",config used for literal offloading."`
+	UseWorkflowCRNameSuffix  bool                    `json:"use-workflow-cr-name-suffix" pflag:",If false, the execution ID will be used as the workflow CR name. Otherwise, a hash of the execution ID, project, domain will be used as a suffix on the CR name."`
 }
 
 // KubeClientConfig contains the configuration used by flytepropeller to configure its internal Kubernetes Client.
-- 
2.45.2

ddl-ebrown avatar Mar 04 '25 21:03 ddl-ebrown

Code Review Agent Run #547877

Actionable Suggestions - 1
  • flytepropeller/pkg/compiler/transformers/k8s/workflow.go - 1
    • Consider using thread-safe random generation · Line 264-265
Review Details
  • Files reviewed - 5 · Commit Range: 318435c..318435c
    • flyteadmin/pkg/workflowengine/impl/k8s_executor.go
    • flyteadmin/pkg/workflowengine/impl/k8s_executor_test.go
    • flytepropeller/pkg/compiler/transformers/k8s/workflow.go
    • flytepropeller/pkg/compiler/transformers/k8s/workflow_test.go
    • flytepropeller/pkg/controller/config/config.go
  • Files skipped - 0
  • Tools
    • Whispers (Secret Scanner) - ✔︎ Successful
    • Detect-secrets (Secret Scanner) - ✔︎ Successful

Bito Usage Guide

Commands

Type the following command in the pull request comment and save the comment.

  • /review - Manually triggers a full AI review.

Refer to the documentation for additional commands.

Configuration

This repository uses code_review_bito You can customize the agent settings here or contact your Bito workspace admin at [email protected].

Documentation & Help

AI Code Review powered by Bito Logo

flyte-bot avatar Mar 20 '25 17:03 flyte-bot

Changelist by Bito

This pull request implements the following key changes.

Key Change Files Impacted
Bug Fix - Correct CR Deletion with Label Selector

k8s_executor.go - Updated deletion logic in Abort to call DeleteCollection using a label selector, ensuring proper cleanup of FlyteWorkflows in shared namespaces.

Feature Improvement - Enhanced Workflow CR Naming Scheme

workflow.go - Introduced a hash-based random suffix during workflow CR name generation to avoid naming collisions.

config.go - Added a configuration flag (UseWorkflowCRNameSuffix) to toggle the use of a hashed suffix for CR names based on the execution identifier.

Testing - Tests Adjustments for CR Naming and Deletion

k8s_executor_test.go - Modified test callback types and assertions to support the new DeleteCollection API for CR deletion.

workflow_test.go - Implemented tests to verify behavior of the workflow CR naming logic under different configuration settings.

flyte-bot avatar Mar 20 '25 17:03 flyte-bot