feat(backend): Replace MLMD with KFP Server APIs
Description of your changes:
This PR removes MLMD as per the KEP here
Resolves: https://github.com/kubeflow/pipelines/issues/11760
Overview
Core Change: Replaced MLMD (ML Metadata) service with direct database storage via KFP API server.
This is a major architectural shift that eliminates the external ML Metadata service dependency and consolidates all artifact and task metadata operations directly into the KFP API server with MySQL/database backend.
NOTE: Migration and UI changes will follow this PR. The UI will be in a broken state until then. The UI change is a blocker for merging the mlmd-removal branch to master.
Components Removed
MLMD Service Infrastructure
- `metadata-writer` component (`backend/metadata_writer/`) - Python-based service that wrote execution metadata to MLMD
  - Dockerfile and all source code removed
- `metadata-grpc` deployment
  - MLMD gRPC service and Envoy proxy
  - All Kustomize manifests and configurations removed
  - DNS configuration patches removed from all deployment variants
- MLMD Client Library (`backend/src/v2/metadata/`) - ~1,800 lines of Go client code removed
  - Client, converter, and test utilities deleted
Deployment Changes
- Removed from all Kustomization variants (standalone, multiuser, kubernetes-native)
- Removed metadata-writer from CI image builds
- Removed metadata service from proxy NO_PROXY configurations
- Removed metadata-grpc port forwarding from integration test workflows
Components Added
New API Layer
Artifact Service API (backend/api/v2beta1/artifact.proto)
- CRUD Operations:
  - `CreateArtifact` - Create a single artifact
  - `GetArtifact` - Retrieve an artifact by ID
  - `ListArtifacts` - Query artifacts with filtering
  - `BatchCreateArtifacts` - Bulk artifact creation
- Artifact Task Operations:
  - `CreateArtifactTask` - Track artifact usage in tasks
  - `ListArtifactTasks` - Query artifact-task relationships
  - `BatchCreateArtifactTasks` - Bulk task-artifact linking
- Generated Clients:
  - Go HTTP client (~4,000 lines)
  - Python HTTP client (~3,500 lines)
  - Swagger documentation
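For orientation, here is a rough Go sketch of the service surface described above. This is an illustration only: the message and request shapes below are assumptions, not the generated identifiers from `artifact.proto`.

```go
// Illustrative sketch of the artifact service surface; not the generated API.
package sketch

import "context"

// Artifact and ArtifactTask are placeholder shapes for the proto messages.
type Artifact struct {
	ID, Name, URI, Type string
	Metadata            map[string]interface{}
}

type ArtifactTask struct {
	ArtifactID, TaskID, RunID string
	IOType                    string // e.g. INPUT, OUTPUT, ITERATOR_OUTPUT
}

// ArtifactService mirrors the RPCs listed above.
type ArtifactService interface {
	CreateArtifact(ctx context.Context, a *Artifact) (*Artifact, error)
	GetArtifact(ctx context.Context, id string) (*Artifact, error)
	ListArtifacts(ctx context.Context, filter string, pageSize int32, pageToken string) ([]*Artifact, string, error)
	BatchCreateArtifacts(ctx context.Context, as []*Artifact) ([]*Artifact, error)

	CreateArtifactTask(ctx context.Context, at *ArtifactTask) (*ArtifactTask, error)
	ListArtifactTasks(ctx context.Context, runID string) ([]*ArtifactTask, error)
	BatchCreateArtifactTasks(ctx context.Context, ats []*ArtifactTask) ([]*ArtifactTask, error)
}
```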
Extended Run Service API (backend/api/v2beta1/run.proto)
- New Task Endpoints:
  - `CreateTask` - Create a pipeline task execution record
  - `GetTask` - Retrieve task details
  - `ListTasks` - Query tasks with filtering
  - `UpdateTask` - Update task status/metadata
  - `BatchUpdateTasks` - Efficient bulk task updates
- ViewMode Feature:
  - `BASIC` - Minimal response (IDs, status, timestamps)
  - `RUNTIME_ONLY` - Include runtime details without the full spec
  - `FULL` - Complete task/run details with spec
  - Reduces payload size for list operations by 80%+
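As a hedged illustration of the payload trade-off (the enum and request field names below are assumptions, not the generated types):

```go
// Illustrative only: shows why list pages would request the BASIC view.
package sketch

type TaskViewMode int

const (
	ViewBasic       TaskViewMode = iota // IDs, status, timestamps only
	ViewRuntimeOnly                     // adds runtime details, omits the full spec
	ViewFull                            // complete task/run details, including spec
)

type ListTasksRequest struct {
	RunID     string
	PageSize  int32
	PageToken string
	View      TaskViewMode // list views typically use ViewBasic to keep payloads small
}
```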
Storage Layer
Artifact Storage (backend/src/apiserver/storage/artifact_store.go)
- Direct MySQL table for artifacts
- Stores: name, URI, type, metadata, custom properties
- Supports filtering by run_id, task_name, artifact_type
- ~300 lines with comprehensive test coverage
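A minimal sketch of what an artifact row might look like, assuming a GORM-style model as used elsewhere in the API server; the exact column set is an assumption beyond the fields listed above (name, URI, type, metadata, custom properties):

```go
// Hypothetical model sketch; the real definition lives alongside the other
// apiserver models and may differ.
package model

type Artifact struct {
	UUID             string `gorm:"column:UUID; not null; primary_key"`
	Namespace        string `gorm:"column:Namespace"`
	Name             string `gorm:"column:Name; not null"`
	URI              string `gorm:"column:URI"`
	Type             string `gorm:"column:Type; index"` // e.g. system.Model, system.Metrics
	Metadata         string `gorm:"column:Metadata"`         // serialized JSON
	CustomProperties string `gorm:"column:CustomProperties"` // serialized JSON
	CreatedAtInSec   int64  `gorm:"column:CreatedAtInSec"`
}
```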
Artifact Task Store (backend/src/apiserver/storage/artifact_task_store.go)
- Junction table linking artifacts to tasks
- Tracks: IO type (input/output), producer task, artifact metadata
- Bulk insert optimization for batch operations
- ~400 lines with test coverage
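And, under the same caveats, a sketch of the junction row described above (field names are assumptions):

```go
// Hypothetical junction-table sketch linking artifacts to tasks.
package model

type ArtifactTask struct {
	UUID       string `gorm:"column:UUID; not null; primary_key"`
	ArtifactID string `gorm:"column:ArtifactID; not null; index"`
	TaskID     string `gorm:"column:TaskID; not null; index"`
	RunID      string `gorm:"column:RunID; index"`
	IOType     string `gorm:"column:Type"` // INPUT, OUTPUT, ITERATOR_OUTPUT, ...
	Producer   string `gorm:"column:Producer"`
	Key        string `gorm:"column:Key"`
}
```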
Enhanced Task Store (backend/src/apiserver/storage/task_store.go)
- Expanded from ~500 to ~1,400 lines
- Added task state tracking (PENDING, RUNNING, SUCCEEDED, FAILED, etc.)
- Input/output artifact and parameter tracking
- Pod information (name, namespace, type)
- Batch update support for efficient status synchronization
API Server Implementation
Artifact Server (backend/src/apiserver/server/artifact_server.go)
- Implements all artifact service endpoints
- Request validation and conversion
- Pagination support for list operations
- ~600 lines with 1,000+ lines of tests
Extended Run Server (backend/src/apiserver/server/run_server.go)
- Added task CRUD operation handlers
- ViewMode implementation for optimized responses
- Batch task update endpoint
- ~350 lines of new code with comprehensive tests
Client Infrastructure
KFP API Client (backend/src/v2/apiclient/)
- New client package for driver/launcher to call API server
- OAuth2/OIDC authentication support
- Retry logic and error handling
- Mock implementation for testing
- ~800 lines total
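The general pattern is sketched below, assuming bearer-token injection plus bounded retries; the type and function names here are hypothetical, not the actual contents of `backend/src/v2/apiclient/`.

```go
// Hypothetical sketch of an authenticated HTTP client with simple retries.
package apiclient

import (
	"net/http"
	"time"
)

type authRetryTransport struct {
	base    http.RoundTripper
	token   string
	retries int
}

func (t *authRetryTransport) RoundTrip(req *http.Request) (*http.Response, error) {
	// Clone so we do not mutate the caller's request.
	req = req.Clone(req.Context())
	req.Header.Set("Authorization", "Bearer "+t.token)
	var resp *http.Response
	var err error
	for attempt := 0; attempt <= t.retries; attempt++ {
		resp, err = t.base.RoundTrip(req)
		if err == nil && resp.StatusCode < 500 {
			return resp, nil
		}
		// Note: retrying requests with bodies would require re-creating the
		// body; omitted in this sketch.
		time.Sleep(time.Duration(attempt+1) * 500 * time.Millisecond)
	}
	return resp, err
}

func newKFPHTTPClient(token string) *http.Client {
	return &http.Client{
		Timeout:   30 * time.Second,
		Transport: &authRetryTransport{base: http.DefaultTransport, token: token, retries: 3},
	}
}
```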
Driver/Launcher Refactoring
Parameter/Artifact Resolution (backend/src/v2/driver/resolver/)
- Extracted resolution logic from the monolithic `resolve.go` (~1,100 lines removed)
- New focused modules:
  - `parameters.go` - Parameter resolution (~560 lines)
  - `artifacts.go` - Artifact resolution (~314 lines)
  - `resolve.go` - Orchestration (~90 lines)
- Improved testability and maintainability
Driver Changes (backend/src/v2/driver/)
- Removed MLMD client dependency
- Added KFP API client for task/artifact operations
- Refactored execution flow to use API server
- Container/DAG execution updated for new storage model
Launcher Changes (backend/src/v2/cmd/launcher-v2/)
- Replaced MLMD calls with API server calls
- Uses batch updater for efficient status reporting
- Artifact publishing through artifact API
Batch Updater (backend/src/v2/component/batch_updater.go)
- Efficient batching mechanism for task updates
- Reduces API calls during execution
- Configurable batch size and flush intervals
- ~250 lines with interfaces for testing
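A minimal sketch of the batching idea, assuming a size threshold plus a periodic flush; the real `batch_updater.go` API may differ.

```go
// Hypothetical batching sketch: flush when the batch is full or on a timer.
package component

import (
	"sync"
	"time"
)

type TaskUpdate struct {
	TaskID string
	State  string
}

type BatchUpdater struct {
	mu        sync.Mutex
	pending   []TaskUpdate
	batchSize int
	flushFn   func([]TaskUpdate) // e.g. a call to the bulk task-update endpoint
}

func NewBatchUpdater(batchSize int, flushInterval time.Duration, flushFn func([]TaskUpdate)) *BatchUpdater {
	b := &BatchUpdater{batchSize: batchSize, flushFn: flushFn}
	// Periodic flush so slow executions still report progress.
	go func() {
		for range time.Tick(flushInterval) {
			b.Flush()
		}
	}()
	return b
}

func (b *BatchUpdater) QueueTaskUpdate(u TaskUpdate) {
	b.mu.Lock()
	b.pending = append(b.pending, u)
	full := len(b.pending) >= b.batchSize
	b.mu.Unlock()
	if full {
		b.Flush()
	}
}

func (b *BatchUpdater) Flush() {
	b.mu.Lock()
	batch := b.pending
	b.pending = nil
	b.mu.Unlock()
	if len(batch) > 0 {
		b.flushFn(batch)
	}
}
```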
Testing Infrastructure
Test Data Pipelines (backend/src/v2/driver/test_data/)
- 15+ new compiled pipeline YAMLs for integration testing:
  - `cache_test.yaml` - Cache hit/miss scenarios
  - `componentInput.yaml` - Input parameter testing
  - `k8s_parameters.yaml` - Kubernetes-specific features
  - `oneof_simple.yaml` - Conditional execution
  - `nested_naming_conflicts.yaml` - Name resolution edge cases
  - Loop iteration scenarios
  - Optional input handling
  - And more
Test Coverage
- Storage layer: ~650 lines of tests for artifact/task stores
- API server: ~1,700 lines of tests for artifact/run servers
- Driver: ~1,400 lines of new integration tests
- Setup utilities: ~900 lines of test infrastructure
Utility Additions
Scope Path (backend/src/common/util/scope_path.go)
- Hierarchical DAG navigation for nested pipelines
- Tracks execution context through task hierarchy
- Used for parameter/artifact resolution
- ~230 lines with tests
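A hedged sketch of the idea (the actual type in `scope_path.go` may expose a different API):

```go
// Hypothetical scope-path sketch: a stack of DAG segments used to track the
// execution context of nested pipelines.
package util

import "strings"

type ScopePath struct {
	segments []string // e.g. ["root", "for-loop-1", "iteration-2", "train"]
}

// Push returns a new path with one more segment, leaving the receiver untouched.
func (s ScopePath) Push(segment string) ScopePath {
	return ScopePath{segments: append(append([]string{}, s.segments...), segment)}
}

// Parent drops the innermost segment.
func (s ScopePath) Parent() ScopePath {
	if len(s.segments) == 0 {
		return s
	}
	return ScopePath{segments: s.segments[:len(s.segments)-1]}
}

func (s ScopePath) String() string {
	return strings.Join(s.segments, ".")
}
```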
Proto Helpers (backend/src/common/util/proto_helpers.go)
- Conversion utilities for proto messages
- Type-safe helpers for common operations
- ~44 lines
YAML Parser (backend/src/common/util/yaml_parser.go)
- Pipeline spec parsing utilities
- ~108 lines
Key Behavioral Changes
Artifact Tracking
- Before: Driver writes to MLMD via gRPC, launcher writes execution metadata via metadata-writer
- After: Driver/launcher call artifact API endpoints directly, which write to MySQL
Task State Management
- Before: State inferred from MLMD execution contexts
- After: Explicit task records with status, pod info, I/O tracking in task_store
Performance Optimizations
- ViewMode: List operations can request minimal data, reducing response size dramatically
- Batch Updates: Task status updates batched to reduce API overhead
- Direct DB Access: Eliminates gRPC hop to separate MLMD service
API Response Size
- `ListRuns` with `VIEW_MODE=DEFAULT`: ~80% smaller payloads
- Improves UI responsiveness for pipeline listing
Migration Considerations
Database Schema
- New tables: `artifacts`, `artifact_tasks`
- Extended `tasks` table with new columns
- Proto test golden files updated to reflect new response formats
Backwards Compatibility
- API endpoints maintain backward compatibility
- Existing pipeline specs continue to work
- No changes required to user-facing SDK
Deployment
- Simpler deployment (2 fewer services)
- Reduced resource requirements (no metadata-grpc, metadata-writer pods)
- Fewer network policies needed
Testing Strategy
Unit Tests
- Comprehensive coverage for all new storage/server components
- Mock implementations for API client
- Isolated testing of resolver logic
Integration Tests
- 15+ compiled test pipelines covering edge cases
- Driver integration tests with real Kubernetes API server
- Task/artifact lifecycle validation
Golden File Updates
- Proto test golden files regenerated
- Reflects new API response structure
Files Changed Summary
- Total files changed: ~550
- Lines added: ~50,000
- Lines removed: ~15,000
- Net addition: ~35,000 (mostly generated client code and tests)
Breakdown
- Generated API clients (Go/Python): ~15,000 lines
- Test code and test data: ~10,000 lines
- Storage layer implementation: ~2,000 lines
- API server implementation: ~1,500 lines
- Driver/launcher refactoring: ~1,000 lines
- Removed MLMD code: ~15,000 lines
Risks & Considerations
Testing
- Extensive test coverage added
- Integration tests validate end-to-end flows
- Proto compatibility tests ensure API stability
Performance
- Direct database access should be faster than gRPC → MLMD → DB
- Batch updates reduce API call overhead
- ViewMode optimization for large lists
Operational
- Simpler deployment reduces operational complexity
- Fewer moving parts = fewer failure modes
- All metadata operations auditable through API server logs
Recommended Follow-up
- Monitor database performance under load with new artifact tables
- Consider adding database indexes if artifact queries become slow
- Document migration path for existing MLMD data (if applicable)
- Update deployment documentation to reflect MLMD removal
- Performance benchmarking comparing MLMD vs. direct storage
Conclusion
This is an architectural improvement that:
- Reduces system complexity
- Improves maintainability
- Maintains API compatibility
- Includes comprehensive testing
- Simplifies deployment
Checklist:
- [x] You have signed off your commits
- [x] The title for your pull request (PR) should follow our title convention. Learn more about the pull request title convention used in this repository.
Upgrade test failures are expected until we add migration logic (to follow this PR). Note also that UI changes are not included here; those, too, will follow this PR.
First off, this is amazing! Not sure where you find the time 😂
A couple of questions, because this overlaps with an area of interest. My understanding is that this PR is reporting / updating the status of tasks (components) directly from the launcher, such as here. So to check my understanding, this means that we are moving completely away from the persistence agent, correct? I have been running into issues with the persistence agent at scale and with short-lived workflows, so I am excited about new approaches.
Secondly, I see the added RPCs to update task state. Are these the counterpart to the ones used by the V1 persistence agent to populate the tasks table here? If this is the case, should we remove the V2 equivalent, which, unless I am mistaken, seems to be currently unused (even before this PR)?
Insanely impressive, @HumairAK! I look forward to going through it in-depth.
Please let us know if there are any specific areas you want us to sequence first / prioritize with our reviews.
Document migration path for existing MLMD data (if applicable)
^ This will be critical for existing workloads.
@CarterFendley
For your first point, PA is still required to report the overall status of the Run. It monitors the Argo WF resource and we still require this to report on failures not encountered during driver/launcher runs (e.g. pod schedule failures, etc.). So we still require an external monitoring of a run. I will also be moving the update status propagation logic to the api server in this PR after some offline discussions with Matt/Nelesh.
For your second point, the tasks table in v1 is being removed; it is only used for caching today and is not utilized by any other APIs. It is a bit abused and is part of an incomplete implementation of a different approach intended by previous maintainers. As such, this change will be part of the next KFP major version bump (3.0). All the data required for KFP runs in the tasks table is persisted in MLMD, and we can use this for migration (namely just cache fingerprints).
Please let us know if there are any specific areas you want us to sequence first / prioritize with our reviews.
@droctothorpe as per our discussion today, I would suggest you review the higher-level changes first, e.g. proto files, GORM models, authorization and related changes, with consideration for things like migration, etc.
Thanks for the response @HumairAK!
PA is still required to report the overall status of the Run... we still require this to report on failures not encountered during driver/launcher runs (e.g. pod schedule failures, etc.)
Interesting, good to know!
the tasks table in v1 is being removed; it is only used for caching today and is not utilized by any other APIs.
The other place I have seen it used previously was in the task_details attribute of the GetRun API return (see here). Looks like this will be replaced in your PR.
I will also be moving the update status propagation logic to the api server in this PR
That part went over my head lol.
So I am mostly concerned with the ability to get run / component information (status / runtime) primarily through the SDK. At the moment this depends on the PA (only partially for V2), which is why I am asking about these components. As mentioned, I have noticed some instability when handling many workflows. Since you expect the PA to exist in V3 too, want to make sure we are able to scale that properly.
Since I do not know the timeline for V3, maybe it is worthwhile implementing something in V2 to help us with this in the meantime. Potentially building in some metrics and suggested scaling behavior of the PA deployment or similar. Any suggestions where I should continue discussion on this? Any existing similar issues / threads you are familiar with?
@CarterFendley
The other place I have seen it used previously was in the task_details attribute of the GetRun API return
It's used to populate the run details field for the runs object, but it's mostly just a copy of the run's associated Argo workflow status field (node statuses). We will likely drop this field next major version upgrade.
Since you expect the PA to exist in V3 too, want to make sure we are able to scale that properly.
Our current intent is to get rid of the PA, as we see it as unnecessary overhead for mere run status reporting. Either we consolidate this logic into the KFP server, or we move it into a separate dedicated controller that uses controller-runtime; either way, we'll certainly keep scalability in mind.
GPT5.1 Codex review:
### Review Findings
1. **Artifact-task records always marked as plain outputs**
`CreateArtifact` and `CreateArtifactsBulk` ignore the `request.type` field and hardcode every `ArtifactTask` as `IOType_OUTPUT`, even when the caller explicitly sets `IOType_ITERATOR_OUTPUT` for loop iterations or other specialized output modes. This drops iterator semantics, so parent DAGs can no longer distinguish per-iteration outputs and downstream resolvers will treat every propagated artifact as a flat output.
```go
// backend/src/apiserver/server/artifact_server.go, lines 87-95
artifactTask := &apiv2beta1.ArtifactTask{
	ArtifactId: artifact.UUID,
	TaskId:     task.UUID,
	RunId:      request.GetRunId(),
	Type:       apiv2beta1.IOType_OUTPUT,
	Producer:   producer,
	Key:        request.GetProducerKey(),
}
```
The same hardcoding occurs in the bulk path (the `artifactReq` loop). The server should honor `request.GetType()` so iterator outputs and ONE_OF outputs survive.
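A possible shape of the fix, hedged because the exact zero-value enum name is an assumption:

```go
// Sketch: fall back to OUTPUT only when the caller did not set an IO type.
// IOType_IO_TYPE_UNSPECIFIED is an assumed name for the enum's zero value.
ioType := request.GetType()
if ioType == apiv2beta1.IOType_IO_TYPE_UNSPECIFIED {
	ioType = apiv2beta1.IOType_OUTPUT
}
artifactTask := &apiv2beta1.ArtifactTask{
	ArtifactId: artifact.UUID,
	TaskId:     task.UUID,
	RunId:      request.GetRunId(),
	Type:       ioType,
	Producer:   producer,
	Key:        request.GetProducerKey(),
}
```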
2. **Namespace filtering in `ListTasks` uses only “get” permissions**
When tasks are listed by namespace (no run or parent filter) the API checks for Verb: get on the runs resource, not list. That means any subject that can “get” a single run in the namespace can enumerate all tasks in that namespace, even if they were denied list permission. This is a privilege escalation and breaks RBAC expectations—namespace-wide enumeration should require list.
```go
resourceAttributes := &authorizationv1.ResourceAttributes{
	Namespace: namespace,
	Verb:      common.RbacResourceVerbGet,
	Group:     common.RbacPipelinesGroup,
	Version:   common.RbacPipelinesVersion,
	Resource:  common.RbacResourceTypeRuns,
}
err := s.resourceManager.IsAuthorized(ctx, resourceAttributes)
```
Please change `Verb` to `common.RbacResourceVerbList` (or split into a separate authorization path) so listing tasks adheres to Kubernetes RBAC semantics.
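Roughly, assuming the constant exists as named above:

```go
resourceAttributes := &authorizationv1.ResourceAttributes{
	Namespace: namespace,
	Verb:      common.RbacResourceVerbList, // was common.RbacResourceVerbGet
	Group:     common.RbacPipelinesGroup,
	Version:   common.RbacPipelinesVersion,
	Resource:  common.RbacResourceTypeRuns,
}
```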
3. **End users lack RBAC to the new artifact APIs**
The aggregated “view/edit” cluster roles that profiles install still grant only pipelines, runs, experiments, etc. They don’t include the new artifacts resource, so any user who only has the standard aggregate-to-kubeflow-pipelines-view role will get 403s when the UI starts calling ListArtifacts/GetArtifact. Only the pipeline-runner SA can hit these endpoints today.
```yaml
- apiGroups:
  - pipelines.kubeflow.org
  resources:
  - runs
  verbs:
  - get
  - list
  - readArtifact
```
Please extend the aggregated roles (both “view” and “edit” flavors) with resources: artifacts and appropriate verbs (get, list, maybe create where required) so non-admin users retain parity with the old MLMD-backed functionality.
### Open Questions / Follow-ups
- Should artifact creation also validate that the artifact namespace matches the associated task/run namespace? Right now the server trusts the client-provided namespace, which could leave artifacts in namespaces where the run owner has no access.
### Suggested Next Steps
- Fix the artifact-task type handling and add regression tests for iterator outputs.
- Adjust the namespace authorization in `ListTasks`.
- Update the RBAC manifests (and any profile-controller templating) so standard users gain the new permissions before the UI is switched over.

Let me know if you’d like a focused re-test after these are addressed.
Claude 4.5 review:
PR #12430: MLMD Removal - Action Items
PR: https://github.com/kubeflow/pipelines/pull/12430
Branch: mlmd-removal
Review Date: 2025-11-20
Overall Status: ⚠️ 3 Issues to Address (1 Critical, 2 Important)
🚨 Critical Issues (Must Fix Before Merge)
1. Terminal State Enforcement Missing
Priority: 🔴 CRITICAL
Effort: ~4 hours
Severity: Data integrity issue
Problem
The design requires preventing task updates when the parent run is in a terminal state (SUCCEEDED, FAILED, or CANCELED). This check is not implemented.
Impact
Launchers could update tasks after a run completes, leading to inconsistent state.
Required Fix
File: backend/src/apiserver/server/run_server.go
Location: UpdateTask() function (line ~685)
Add this code before the authorization check:
```go
func (s *RunServer) UpdateTask(ctx context.Context, request *apiv2beta1.UpdateTaskRequest) (*apiv2beta1.PipelineTaskDetail, error) {
	taskID := request.GetTaskId()

	// Get existing task
	existingTask, err := s.resourceManager.GetTask(taskID)
	if err != nil {
		return nil, util.Wrap(err, "Failed to get existing task for authorization")
	}

	// ✅ ADD THIS: Check if run is in terminal state
	run, err := s.resourceManager.GetRun(existingTask.RunUUID)
	if err != nil {
		return nil, util.Wrap(err, "Failed to get run to check terminal state")
	}
	terminalStates := []model.RuntimeState{
		model.RuntimeStateSucceeded,
		model.RuntimeStateFailed,
		model.RuntimeStateCanceled,
	}
	for _, terminalState := range terminalStates {
		if run.State == terminalState {
			return nil, util.NewInvalidInputError(
				"Cannot update task %s: parent run %s is in terminal state %s",
				taskID, existingTask.RunUUID, terminalState,
			)
		}
	}

	// Continue with existing authorization and update logic...
}
```
Also apply to: UpdateTasksBulk() function
Required Test
File: backend/src/apiserver/server/run_server_tasks_test.go
```go
func TestUpdateTask_TerminalState_Rejected(t *testing.T) {
	// Setup
	_, resourceManager := setupTestEnv()
	runSrv := NewRunServer(resourceManager, nil)

	// Create run and task
	run := createTestRun(t, resourceManager, "test-run")
	task := createTestTask(t, runSrv, run.UUID, "test-task")

	// Mark run as SUCCEEDED (terminal state)
	resourceManager.UpdateRun(run.UUID, &model.Run{State: model.RuntimeStateSucceeded})

	// Attempt to update task - should fail
	_, err := runSrv.UpdateTask(context.Background(), &apiv2beta1.UpdateTaskRequest{
		TaskId: task.GetTaskId(),
		Task: &apiv2beta1.PipelineTaskDetail{
			TaskId: task.GetTaskId(),
			State:  apiv2beta1.PipelineTaskDetail_FAILED,
		},
	})

	// Assert: Update should be rejected
	assert.Error(t, err)
	assert.Contains(t, err.Error(), "terminal state")
}
```
⚠️ Important Issues (Should Fix Before Merge to Master)
2. Cache Fingerprint Not Cleared on Failure
Priority: 🟡 MEDIUM
Effort: ~2 hours
Severity: Potential false cache hits
Problem
When a task fails, its cache fingerprint is not explicitly cleared. While the cache detection queries for status=SUCCEEDED, it's safer to explicitly clear the fingerprint to prevent any edge cases.
Impact
Low risk of false cache hits if the query logic changes in the future.
Required Fix
File: backend/src/v2/component/launcher_v2.go
Location: Execute() function deferred error handler (line ~200)
Modify the defer block:
```go
func (l *LauncherV2) Execute(ctx context.Context) (executionErr error) {
	defer func() {
		if executionErr != nil {
			l.options.Task.State = apiV2beta1.PipelineTaskDetail_FAILED
			l.options.Task.CacheFingerprint = "" // ✅ ADD THIS LINE
			l.options.Task.StatusMetadata = &apiV2beta1.PipelineTaskDetail_StatusMetadata{
				Message: executionErr.Error(),
			}
		}
		l.options.Task.EndTime = timestamppb.New(time.Now())
		l.batchUpdater.QueueTaskUpdate(l.options.Task)
		// ... rest of defer logic
	}()
	// ... rest of Execute function
}
```
Required Test
File: backend/src/v2/component/launcher_v2_test.go
```go
func TestLauncher_FailedExecution_ClearFingerprint(t *testing.T) {
	// Setup launcher with mocked failure
	launcher := setupTestLauncher(t)

	// Set initial fingerprint
	launcher.options.Task.CacheFingerprint = "test-fingerprint-123"

	// Execute (will fail due to mock)
	err := launcher.Execute(context.Background())

	// Assert: Fingerprint should be cleared
	assert.Error(t, err)
	assert.Equal(t, "", launcher.options.Task.CacheFingerprint)
	assert.Equal(t, apiV2beta1.PipelineTaskDetail_FAILED, launcher.options.Task.State)
}
```
3. Exit Handler Task Type Not Detected
Priority: 🟡 MEDIUM
Effort: ~3 hours
Severity: Tasks misclassified as generic DAG
Problem
The proto defines EXIT_HANDLER task type, but there's no explicit detection logic in the driver. Exit handler tasks will be classified as generic DAG type.
Impact
Exit handler tasks won't have the correct type in the database, making it harder to query or display them correctly.
Required Fix
File: backend/src/v2/driver/dag.go
Location: Type detection switch statement (line ~103)
Add exit handler detection:
```go
// Determine type of DAG task
switch {
case iterationCount != nil:
	count := int64(*iterationCount)
	taskToCreate.TypeAttributes = &gc.PipelineTaskDetail_TypeAttributes{IterationCount: &count}
	taskToCreate.Type = gc.PipelineTaskDetail_LOOP
	taskToCreate.DisplayName = "Loop"
	execution.IterationCount = util.IntPointer(int(count))
// ✅ ADD THIS CASE
case strings.HasPrefix(opts.TaskName, "exit-handler-"):
	taskToCreate.Type = gc.PipelineTaskDetail_EXIT_HANDLER
	taskToCreate.DisplayName = "Exit Handler"
case condition != "":
	taskToCreate.Type = gc.PipelineTaskDetail_CONDITION_BRANCH
	taskToCreate.DisplayName = "Condition Branch"
case strings.HasPrefix(opts.TaskName, "condition") && !strings.HasPrefix(opts.TaskName, "condition-branch"):
	taskToCreate.Type = gc.PipelineTaskDetail_CONDITION
	taskToCreate.DisplayName = "Condition"
default:
	taskToCreate.Type = gc.PipelineTaskDetail_DAG
}
```
Required Test
File: backend/src/v2/driver/dag_test.go
```go
func TestDAG_ExitHandler_TypeSet(t *testing.T) {
	opts := common.Options{
		TaskName: "exit-handler-cleanup",
		// ... other required fields
	}

	execution, err := DAG(context.Background(), opts, mockClientManager)
	assert.NoError(t, err)

	// Verify task was created with EXIT_HANDLER type
	task, _ := mockClientManager.KFPAPIClient().GetTask(context.Background(),
		&apiv2beta1.GetTaskRequest{TaskId: execution.TaskID})
	assert.Equal(t, apiv2beta1.PipelineTaskDetail_EXIT_HANDLER, task.Type)
}
```
📝 Documentation Updates
Update Design Document
Priority: 🟢 LOW
Effort: ~1 hour
File: proposals/12147-mlmd-removal/design-details.md
Change Required
The design states metrics will be stored in a separate metrics table, but the implementation stores them in the artifacts table with special types.
Update Section: "Metrics" (line ~307)
From:
"we'll instead leverage the Metrics table in KFP which is currently unused"
To:
"Metrics are stored in the artifacts table with dedicated artifact types (
Artifact_Metric,Artifact_ClassificationMetric,Artifact_SlicedClassificationMetric). They are distinguished by having no URI and storing data in thenumber_valueormetadatafields."
📋 Summary Table
| Issue | Priority | Effort | Files to Modify | Tests Required |
|---|---|---|---|---|
| 1. Terminal State | 🔴 Critical | 4h | `run_server.go` | `run_server_tasks_test.go` |
| 2. Cache Fingerprint | 🟡 Medium | 2h | `launcher_v2.go` | `launcher_v2_test.go` |
| 3. Exit Handler | 🟡 Medium | 3h | `dag.go` | `dag_test.go` |
| 4. Documentation | 🟢 Low | 1h | `design-details.md` | N/A |
| **Total** | | **10h** | **4 files** | **3 test files** |
✅ Merge Recommendations
For mlmd-removal Branch
Status: ⚠️ Conditional Approval
Requirements:
- ✅ Must fix: Issue 1 (Terminal State Enforcement)
Timeline: 1 day
For master Branch
Status: 🚫 Not Ready
Requirements:
- ✅ Must fix: Issue 1 (Terminal State Enforcement)
- ⚠️ Should fix: Issue 2 (Cache Fingerprint)
- ⚠️ Should fix: Issue 3 (Exit Handler)
- 📝 Should update: Issue 4 (Documentation)
Timeline: 2-3 days
🎯 Next Steps
- Immediate (before merging to `mlmd-removal`):
  - [ ] Implement terminal state enforcement
  - [ ] Add terminal state tests
  - [ ] Test manually with concurrent runs
- Before merging to `master`:
  - [ ] Clear cache fingerprint on failure
  - [ ] Add exit handler detection
  - [ ] Update design documentation
  - [ ] Run full integration test suite
  - [ ] Verify all new tests pass
- Post-merge (follow-up PRs as planned):
  - Migration implementation
  - Frontend changes
📞 Contact
For questions or clarifications about these action items, refer to the detailed review in BACKEND_VERIFICATION_CHECKLIST.md.
Reviewer: AI Assistant
Date: 2025-11-20
Hi @HumairAK , Thank you for the amazing work.
Would it be possible to add a configurable backoff mechanism to all gRPC connections? Currently, the KFP driver fails pipelines when gRPC clients cannot connect to MLMD, especially when the Kubernetes cluster is under heavy network load due to large pipelines running simultaneously.
Since gRPC provides an experimental backoff mechanism, it would be valuable to expose this as a configurable option for all new clients.
We can add the following option to each new client:
`grpc.WithConnectParams(backoffConfig)`
The `backoffConfig` can either be provided through the endpoint configuration or defined similarly to how we currently handle the TLS configuration.
https://pkg.go.dev/google.golang.org/grpc#WithConnectParams https://github.com/grpc/grpc/blob/master/doc/connection-backoff.md
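For reference, a minimal sketch of what wiring this up with the standard gRPC-Go options could look like; the endpoint, credentials, and backoff values below are placeholders, not a recommendation for KFP defaults.

```go
package example

import (
	"time"

	"google.golang.org/grpc"
	"google.golang.org/grpc/backoff"
	"google.golang.org/grpc/credentials/insecure"
)

// dialWithBackoff dials a gRPC endpoint with configurable connection backoff
// via grpc.WithConnectParams. Values here are illustrative only.
func dialWithBackoff(endpoint string) (*grpc.ClientConn, error) {
	return grpc.Dial(endpoint,
		grpc.WithTransportCredentials(insecure.NewCredentials()), // placeholder; use the TLS config in practice
		grpc.WithConnectParams(grpc.ConnectParams{
			Backoff: backoff.Config{
				BaseDelay:  1 * time.Second,
				Multiplier: 1.6,
				Jitter:     0.2,
				MaxDelay:   2 * time.Minute,
			},
			MinConnectTimeout: 20 * time.Second,
		}),
	)
}
```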