
[DSIP-11][Feature][python] Use the pydolphinscheduler to dynamically generate workflows from the configuration file

Open jieguangzhou opened this issue 3 years ago • 11 comments

Search before asking

  • [X] I had searched in the issues and found no similar feature requirement.

Description

Dynamically generate workflows from YAML configuration files, aka workflows-as-code.

We can already use pydolphinscheduler to build workflows easily. Workflows-as-code would be even easier to adopt if we also supported defining workflows in configuration files, which would make workflows clearer, shareable, and reviewable.
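
For reference, this is roughly what a workflow looks like with today's Python API (a minimal sketch following the pydolphinscheduler tutorial of that time; exact import paths may differ between versions, and the task names and commands here are made up):

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler.tasks.shell import Shell

# A tiny two-task workflow defined in plain Python.
with ProcessDefinition(name="prepare_datas", tenant="tenant_exists") as pd:
    download_data = Shell(name="download_data", command="echo download data")
    calc_signals = Shell(name="calc_signals", command="echo calc signals")
    # calc_signals runs after download_data.
    download_data >> calc_signals
    pd.submit()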

We could then keep the project in a git repository, which enables richer operations such as CI/CD.

We can create a workflow by defining the following fields; all field definitions are derived from pydolphinscheduler.

A YAML file defines a workflow:

# Define the process
Process:
  # Process (workflow) name
  name: prepare_datas

  # Parameters for process creation
  param:
      project: /data/project

  # Whether to run the workflow after the creation is complete
  run: True

# Define the tasks under the process
Tasks:
  -
    # Task type: Shell, Python, SubProcess, Spark, etc.
    TaskType: Shell

    # Upstream Task List
    dependencies: [xxxx]

    # Parameters for task creation
    params:
      name: task1
      other parameters: ...

  -  
    TaskType: Python
    params:
      name: task2
  - ...
  - ... 
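
A file following this schema could be turned into a workflow by a small loader along these lines. This is a minimal sketch only, assuming PyYAML and the existing ProcessDefinition / task classes of pydolphinscheduler; create_process is a hypothetical name, and how the process-level param field and every entry of params map onto the Python API is part of the design work, not settled here.

import yaml

from pydolphinscheduler.core.process_definition import ProcessDefinition
from pydolphinscheduler import tasks


def create_process(path):
    """Hypothetical loader: build a workflow from a YAML file like the one above."""
    with open(path) as f:
        config = yaml.safe_load(f)

    process_conf = config["Process"]
    with ProcessDefinition(name=process_conf["name"]) as pd:
        # First pass: create every task from its TaskType and params.
        created = {}
        for task_conf in config["Tasks"]:
            task_cls = getattr(tasks, task_conf["TaskType"])
            task = task_cls(**task_conf["params"])
            created[task.name] = task

        # Second pass: wire upstream dependencies with the existing >> operator.
        # Two passes are needed because dependencies reference tasks by name,
        # so every task must exist before the edges are drawn.
        for task_conf in config["Tasks"]:
            name = task_conf["params"]["name"]
            for upstream in task_conf.get("dependencies", []):
                created[upstream] >> created[name]

        # run: True submits and starts the workflow, otherwise it is only submitted.
        if process_conf.get("run"):
            pd.run()
        else:
            pd.submit()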

Here is a simple example showing how to use YAML to manage workflows (one YAML file defines one workflow):

# Define the process
Process:
  name: prepare_datas
  param:
      project: /data/project

# Define the tasks under the process
Tasks:
  - 
    TaskType: Shell
    params:
      name: download_data
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data.download ${data_path}
      local_params: 
        - { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}

  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_signals 
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data_processing.calc_signals \
                      --data_path ${data_path} \
                      --name_file ${project}/feature_signal.txt 
      local_params: 
        - { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}


  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_features 
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data_processing.calc_features \
                    --data_path $data_path \
                    --name_file ${project}/feature_signal.txt
      local_params:
        - { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}

Alternatively, we can use some of YAML's native features to simplify the definition, for example anchors and aliases (& and *):

# User-defined parameters. It is suggested to declare all parameters here
Params:
  process_name: &process_name prepare_datas
  project: &project "/data/project"

# The YAML variable (anchor) definitions reused by the configuration below
Variable:
  local_params: &local_params { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}

# Define the process
Process:
  name: *process_name
  param:
      project: *project

# Define the tasks under the process
Tasks:
  - 
    TaskType: Shell
    params:
      name: download_data
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data.download ${data_path}
      local_params: 
        - *local_params 

  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_signals 
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data_processing.calc_signals \
                      --data_path ${data_path} \
                      --name_file ${project}/feature_signal.txt 
      local_params: 
        - *local_params 


  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_features 
      command: |
                export PYTHONPATH=${project}
                source ${project}/env/bin/activate
                data_path=${project}/data/daily
                python -m dmsa.data_processing.calc_features \
                    --data_path $data_path \
                    --name_file ${project}/feature_signal.txt
      local_params:
        - *local_params 

A richer approach is to combine this with DS features and add some magic methods to make it easier to use. For example, we could read environment variables with $Env{xxxx} and read the contents of files with $File(xxxx) (a sketch of how such tokens could be expanded appears after the example):

# User-defined parameters. It is suggested to declare all parameters here
Params:
  process_name: &process_name prepare_datas
  project: &project $Env{STOCK_PROJECT}

# The YAML variable (anchor) definitions reused by the configuration below
Variable:
  local_params: &local_params { "prop": "project", "direct": "IN", "type": "VARCHAR", "value": "${project}"}

# Define the process
Process:
  name: *process_name
  param:
      project: *project

# Define the tasks under the process
Tasks:
  - 
    TaskType: Shell
    params:
      name: download_data
      command: $File("download_data.sh")
      local_params: 
        - *local_params 

  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_signals 
      command: $File("calc_signals")
      local_params: 
        - *local_params 


  - 
    TaskType: Shell
    dependencies: [download_data]
    params:
      name: calc_features 
      command: $File("calc_features")
      local_params:
        - *local_params 
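
As a rough illustration of how these magic tokens could be expanded before the YAML is handed to a loader (the token syntax follows the example above; the regular expressions and the expand_magic helper are hypothetical, not an existing API):

import os
import re
from pathlib import Path

ENV_TOKEN = re.compile(r"\$Env\{(\w+)\}")
FILE_TOKEN = re.compile(r'\$File\("?([^)"]+)"?\)')


def expand_magic(value, base_dir):
    """Replace $Env{NAME} with the environment variable of that name and
    $File(path) with the content of a file located next to the YAML file."""
    value = ENV_TOKEN.sub(lambda m: os.environ[m.group(1)], value)
    value = FILE_TOKEN.sub(
        lambda m: (Path(base_dir) / m.group(1)).read_text(), value
    )
    return value

expand_magic would be applied to every string value of the parsed YAML tree before the tasks are created.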

Once the configuration file is defined, we can use the pydolphinscheduler CLI to load the workflow.
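
For illustration only (the actual CLI subcommand and flags still need to be designed as part of this DSIP, and the file name is made up), loading a file could be as simple as calling the hypothetical loader sketched above:

# Hypothetical usage; the real entry point would be exposed through the
# pydolphinscheduler CLI rather than called directly.
create_process("prepare_datas.yaml")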

Use case

No response

Related issues

No response

Are you willing to submit a PR?

  • [X] Yes I am willing to submit a PR!

Code of Conduct

jieguangzhou avatar Jul 15 '22 07:07 jieguangzhou

Thank you for your feedback, we have received your issue. Please wait patiently for a reply.

  • In order for us to understand your request as soon as possible, please provide detailed information, version, or pictures.
  • If you haven't received a reply for a long time, you can join our Slack and send your question to the #troubleshooting channel.

github-actions[bot] avatar Jul 15 '22 08:07 github-actions[bot]

@caishunfeng @ruanwenjun @EricGao888 @zhongjiajie PTAL.

SbloodyS avatar Jul 15 '22 10:07 SbloodyS

Hi @jieguangzhou, this looks good to me. For some users who are not willing to spend some time picking up python but do have tons of workflows to generate, yaml is a better approach than dragging and connecting tasks through the UI. Just curious, why do you want this feature? Are you a yaml lover? 😄

EricGao888 avatar Jul 15 '22 12:07 EricGao888

Hi @jieguangzhou, this looks good to me. For some users who are not willing to spend some time picking up python but do have tons of workflows to generate, yaml is a better approach than dragging and connecting tasks through the UI. Just curious, why do you want this feature? Are you a yaml lover? 😄

If we can use YAML to manage workflows, it might make it easier for engineers to manage them and do version control, and Git can be used to manage and share workflows. In addition, after communicating with some overseas engineers, I found that some of them prefer to use YAML to manage programs, which also suits me. I think YAML is better than TOML or JSON for this scenario.

jieguangzhou avatar Jul 15 '22 14:07 jieguangzhou

Good idea, it's better to add some design details.

caishunfeng avatar Jul 19 '22 09:07 caishunfeng

It's OK for me to define a DSL to create a workflow; some geeks may like this way, but it may not work well for big workflows.

ruanwenjun avatar Jul 19 '22 10:07 ruanwenjun

Can we parse the data directly and store it in the database, or do we have to do it in another way?

jieguangzhou avatar Jul 19 '22 13:07 jieguangzhou

Good idea, it's better to add some design details.

Ok, I will add details here in the future

jieguangzhou avatar Jul 19 '22 13:07 jieguangzhou

It's OK for me to define a DSL to create a workflow; some geeks may like this way, but it may not work well for big workflows.

Thank you for your suggestions. I will think further about how to handle the large-workflow scenario; if it cannot be handled well, users can keep using the original way for large workflows for the time being.

jieguangzhou avatar Jul 19 '22 13:07 jieguangzhou

Hi @jieguangzhou, I added the DSIP label to this issue because it adds a new mechanism to the Python API. Please follow https://dolphinscheduler.apache.org/en-us/community/DSIP.html to create a new DSIP.

zhongjiajie avatar Jul 26 '22 11:07 zhongjiajie

BTW, I think it is a good addition to the Python API.

zhongjiajie avatar Jul 26 '22 11:07 zhongjiajie