conductor icon indicating copy to clipboard operation
conductor copied to clipboard

How to use a dynamic subworkflow?

Open smurakozi opened this issue 5 years ago • 4 comments

According to the doc of SubWorkflow the workflow parameter "Allows starting a subworkflow with a dynamic workflow definition.". This is the solution that andyyumiao recommended to make dynamic fork serializer.

I see how you can inline a workflow def but I have no clue how to make it dynamic. For me dynamic means you should be able to define the subworkflow "on the fly", when the parent workflow/the subworkflow task is running.

I tried to use the output of a previous task, but when I try to create the main workflow (PUT metadata/workflow) it's rejected with an error INTERNAL_ERROR - Cannot construct instance of com.netflix.conductor.common.metadata.workflow.WorkflowDef (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value. I tried to find related doc pages (e.g. in kitchensink) issues, stackoverflow questions..., but I couldn't find any examples. I also checked the code of Conductor but found no hint how to do this.

Are there any examples how to use a dynamic subworkflow? Am I misunderstanding what the doc means on a dynamic workflow?

Background: I need to process a long list of items in batches. My idea was to create a task for each batch, and to create these tasks as a dynamic sub workflow. I also considered using a dynamic forkjoin, but I have to process the batches in a sequential fashion. I also checked the do-while task, but it would restart all batches in case of an error + Do while task does NOT support domain or isolation group execution (that we plan to use).

Is there another alternative I did not consider?

These are the tasks in my latest attempt to create the sub workflow dynamically:

  {
    "name": "create_batches",
    "taskReferenceName": "create_batches",
    "inputParameters": {
    },
    "type": "SIMPLE"
  },
  {
    "name": "sub_workflow_task",
    "taskReferenceName": "sub_workflow_task",
    "type": "SUB_WORKFLOW",
    "inputParameters": {
      "workflowDefinition": "${create_batches.output}"
    },
    "subWorkflowParam": {
      "name": "sub_workflow",
      "version": 1,
      "taskToDomain": {
        "*": "mydomain"
      },
      "workflowDefinition": "workflowDefinition"
    }
  }

The output of the first task is a workflow definition with name sub_workflow. I also tried to use this output directly in subWorkflowParam.workflowDefinition, with the same result.

Thanks, Sandor

smurakozi avatar Nov 24 '20 10:11 smurakozi

SubWorkflows can be "dynamically" created as you expect. The workflow def in your example wasn't using the interpolation correctly. The following should work,

{
    "name": "create_batches",
    "taskReferenceName": "create_batches",
    "inputParameters": {
    },
    "type": "SIMPLE"
  },
  {
    "name": "sub_workflow_task",
    "taskReferenceName": "sub_workflow_task",
    "type": "SUB_WORKFLOW",
    "inputParameters": {
    },
    "subWorkflowParam": {
      "name": "sub_workflow",
      "version": 1,
      "taskToDomain": {
        "*": "mydomain"
      },
      "workflowDefinition": "${create_batches.output}"
    }
  }

aravindanr avatar Dec 01 '20 00:12 aravindanr

@aravindanr thanks for your answer.

I tried to create a workflow with the tasks you provided, but I get the same error message: "INTERNAL_ERROR - Cannot construct instance of com.netflix.conductor.common.metadata.workflow.WorkflowDef (although at least one Creator exists): no String-argument constructor/factory method to deserialize from String value ('${create_batches.output}')\n at [Source: (org.eclipse.jetty.server.HttpInputOverHTTP); line: 27, column: 29] (through reference chain: com.netflix.conductor.common.metadata.workflow.WorkflowDef[\"tasks\"]->java.util.ArrayList[1]->com.netflix.conductor.common.metadata.workflow.WorkflowTask[\"subWorkflowParam\"]->com.netflix.conductor.common.metadata.workflow.SubWorkflowParams[\"workflowDefinition\"])"

It fails when I post the workflow def to metadata/workflow. Do you have any hints how to proceed with this?

Thanks again, Sandor

smurakozi avatar Jan 05 '21 08:01 smurakozi

@smurakozi Thanks for reporting this, we will try to reproduce this issue and get back to you with a fix soon. In the meantime, if you get a chance to debug/fix this, we would welcome a pull request :). Thanks

apanicker-nflx avatar Jan 13 '21 00:01 apanicker-nflx

This issue is stale, because it has been open for 45 days with no activity. Remove the stale label or comment, or this will be closed in 7 days.

github-actions[bot] avatar Apr 07 '21 02:04 github-actions[bot]