
[DSIP-96][Worker] Unified Task Unique Identifier Design


Search before asking

  • [x] I had searched in the DSIP and found no similar DSIP.

Motivation

Background:

In DolphinScheduler, when tasks are submitted to external systems (such as Yarn, K8s), a unique identifier is required. Currently, there are several issues:

  1. Insufficient uniqueness: a user-defined appName may be duplicated and cannot guarantee uniqueness.
  2. Difficult fault tolerance: when a Worker fails, the identifier alone is not enough to determine whether the task is still running on the external system.
  3. Non-unified implementation: each task type (Spark, MapReduce, Flink, K8s) uses its own logic to generate unique IDs.

Goals:

  • Guarantee uniqueness: Unique identifier generation should be based on taskInstanceId (globally unique).
  • Preserve user intent: User-set appName/task name should be part of the identifier for usability/readability.
  • Unified specification: All task types should follow the same identifier generation rule.
  • Fault tolerance friendly: The format should help future features such as querying & handling running tasks for failover.

Principles:

  • Uniqueness comes first even if readability is slightly affected.
  • Backward compatibility: Should not break existing execution flow or user parameters.
  • Extensible: Adapt different naming requirements from target systems.

Design Detail

Unique Identifier Generation Rules

1. Yarn Tasks (Spark, MapReduce, Flink)

Rule:

  • If the user sets appName: {userAppName}-{taskInstanceId}
  • Otherwise: ds-{taskInstanceId}

Examples:

  • appName="my-spark-app", taskInstanceId=100 → my-spark-app-100
  • (no appName), taskInstanceId=200 → ds-200
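The Yarn rule above can be sketched as follows; the class and method names are illustrative placeholders, not the final DolphinScheduler API:

```java
// Sketch of the Yarn naming rule: "{userAppName}-{taskInstanceId}" when the
// user set an appName, otherwise "ds-{taskInstanceId}". Illustrative only.
public final class YarnIdSketch {

    private static final String DEFAULT_PREFIX = "ds";

    public static String generateYarnId(String userAppName, long taskInstanceId) {
        // A blank or missing user appName falls back to the "ds-" prefix.
        if (userAppName == null || userAppName.trim().isEmpty()) {
            return DEFAULT_PREFIX + "-" + taskInstanceId;
        }
        return userAppName.trim() + "-" + taskInstanceId;
    }
}
```

Appending the taskInstanceId last keeps the id trivially recoverable from the trailing `-{digits}` segment, regardless of what the user typed.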

2. K8s Tasks

Rule:

  • Format: {taskName}-{taskInstanceId}
  • taskName must be normalized (lowercase, remove special characters)

Example:

  • taskName="MyTask", taskInstanceId=100 → mytask-100
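A minimal sketch of the normalization step, assuming Kubernetes' 63-character DNS-label limit and a `[a-z0-9-]` character whitelist (the class name and fallback value are illustrative):

```java
// Sketch: normalize a task name for use as a K8s object name and append
// the task instance id. The id suffix is always kept intact; only the
// name part is truncated to respect the length limit.
public final class K8sNameSketch {

    // Kubernetes object names are limited to 63 characters (DNS label).
    private static final int MAX_LENGTH = 63;

    public static String generateK8sName(String taskName, long taskInstanceId) {
        // Lowercase and strip anything outside [a-z0-9-].
        String normalized = taskName.toLowerCase().replaceAll("[^a-z0-9-]", "");
        String suffix = "-" + taskInstanceId;
        int maxNameLen = MAX_LENGTH - suffix.length();
        if (normalized.length() > maxNameLen) {
            normalized = normalized.substring(0, maxNameLen);
        }
        if (normalized.isEmpty()) {
            // Name was all special characters: fall back to a fixed prefix.
            normalized = "ds-task";
        }
        return normalized + suffix;
    }
}
```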

3. Other Task Types

Rule:

  • Format: ds-task-{taskInstanceId}

Example:

  • taskInstanceId=100 → ds-task-100

Features

  • Global Uniqueness: Guaranteed by using taskInstanceId.
  • Traceability: Easy to correlate with database and UI.
  • Readability: Contains user’s input for easier search/identification.
  • Friendly for future fault tolerance: Easy to query task and status by unified format.

Implementation Plan

Phase 1: Utility Classes

  • Add TaskUniqueIdentifierGenerator utility (Java)
  • Add TaskUniqueIdentifierExtractor utility for parsing/extraction
  • Write unit tests

Phase 2: Spark Integration

  • Modify SparkTask.populateSparkOptions() to use the new generator
  • Verify via tests

Phase 3: MapReduce Integration

  • Modify MapReduceArgsUtils.buildArgs() to use the new generator
  • Verify via tests

Phase 4: Flink Integration

  • Modify FlinkArgsUtils.buildRunCommandLine() and FlinkArgsUtils.buildInitOptionsForSql()
  • Verify via tests

Phase 5: K8s Task Optimization

  • Modify K8sTaskExecutor.buildK8sJob()
  • Ensure normalized format
  • Verify via tests

Phase 6: Docs & Integration Tests

  • End-to-end & performance testing
  • Documentation improvement

Recommended code interfaces:

public interface TaskUniqueIdentifierGenerator {
    String generateYarnId(String userAppName, long taskInstanceId);
    String generateK8sName(String taskName, long taskInstanceId, int maxLength);
    String generateDefaultId(long taskInstanceId);
}
public interface TaskUniqueIdentifierExtractor {
    Optional<Long> extractTaskInstanceIdFromYarnAppName(String appName);
    Optional<Long> extractTaskInstanceIdFromK8sName(String k8sName);
}

Compatibility, Deprecation, and Migration Plan

Backward compatibility:

  • Existing parameters and execution flows are not affected.
  • The user-customized appName/task name is still included in the identifier format.

Migration:

  • Can be rolled out by task type or feature flag for stepwise adoption.
  • No need for migration of historical tasks/data.

Deprecation:

  • No existing feature deprecated.
  • All new identifier formats are backward-compatible and fall back to old logic if new fails.

Test Plan

  • Unit tests for utility classes covering edge cases and normalization.
  • Integration tests for each task type (Spark, Flink, MapReduce, K8s) verifying task names generated and used in external system submission.
  • End-to-end validation: submit sample workflows and check that generated identifiers appear in Yarn/K8s UI/API.
  • Performance tests to ensure no extra latency.
  • Docs update with usage, formats, and troubleshooting.

Code of Conduct

det101 avatar Nov 28 '25 03:11 det101

+1, it's better to have a way to bind the DolphinScheduler task with the remote task before submitting to the remote system.

ruanwenjun avatar Nov 28 '25 04:11 ruanwenjun

Thanks for bringing this proposal. Before a PR, there are some details that need to be discussed.

  1. How do we determine whether the task targets Yarn or K8s if it runs through a shell, Python, or Java task?
  2. Yarn tasks store applicationIds in DS, and one DS task may correspond to multiple application IDs. How do we map them one to one in this case?
  3. Yarn kills tasks by applicationId. If we only save the appName set by DS, how can we ensure that the Yarn kill works correctly? @det101

SbloodyS avatar Nov 28 '25 06:11 SbloodyS

Thanks for bringing this proposal. Before a PR, there are some details that need to be discussed.

  1. How do we determine whether the task targets Yarn or K8s if it runs through a shell, Python, or Java task?
  2. Yarn tasks store applicationIds in DS, and one DS task may correspond to multiple application IDs. How do we map them one to one in this case?
  3. Yarn kills tasks by applicationId. If we only save the appName set by DS, how can we ensure that the Yarn kill works correctly? @det101

These are all great questions. Here is my thinking:

  1. For Shell/Python tasks we can parse the command line (e.g., detect spark-submit --master yarn or k8s://) to infer the target platform. Java tasks are much more of a black box: if the code doesn't log where it's submitting, we simply can't tell. I'd rather not support auto-detection there; users should handle shutdown via hooks in their own code.
  2. To my understanding, a single submission normally produces only one Yarn application entry point, and the multiple "jobs" you mentioned are typically internal stages rather than independent application IDs. That said, I'm still not clear on which specific scenarios would produce multiple appIds. Would you mind sharing an example?
  3. On Yarn, the workflow is: use the REST API to filter by appName, find the matching applicationId, then kill that applicationId. @SbloodyS
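A minimal sketch of the detection idea in point 1 (the marker strings and class name are illustrative, not a final design):

```java
// Sketch: infer the submission target of a Shell/Python task by scanning
// its command line for well-known Yarn/K8s markers. Illustrative only;
// real detection would need a more careful tokenizer.
public final class SubmitTargetSketch {

    public enum Target { YARN, K8S, UNKNOWN }

    public static Target detect(String commandLine) {
        if (commandLine == null) {
            return Target.UNKNOWN;
        }
        String cmd = commandLine.toLowerCase();
        if (cmd.contains("--master yarn") || cmd.contains("yarn-cluster")) {
            return Target.YARN;
        }
        if (cmd.contains("k8s://") || cmd.contains("kubectl ")) {
            return Target.K8S;
        }
        // Java tasks and opaque scripts stay UNKNOWN; no auto-detection.
        return Target.UNKNOWN;
    }
}
```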

det101 avatar Nov 28 '25 07:11 det101