Why is execution so slow?

Open Sunyelw opened this issue 1 year ago • 12 comments

Conductor version: v3.10.3, deployed with Docker Compose. The demo runs locally.

docker-compose -f docker-compose.yaml up

Invocation trace: [image]

A simple demo takes 200 s: [image]

Why?

Sunyelw, Aug 09 '22 05:08

Adding to this: when I change the version to v3.11.0 and repeat the demo, I get this: [image]

Sunyelw, Aug 09 '22 11:08

@zz-netflix @mattl-netflix @netflixgithubreadonly I have the same problem, please help

dangjianguo123, Aug 10 '22 02:08

Hello, have you run into the problem that concurrency does not increase after deploying as a cluster?

dangjianguo123, Aug 10 '22 02:08

Hi @dangjianguo123 - What version of Conductor are you using?

@Sunyelw - Can you share the workflow definition? You can create it here and share - https://play.orkes.io (This is a conductor server playground / sandbox)

boney9, Aug 10 '22 04:08

@boney9 Hi, my Conductor version is 3.9.9-rc.1.

dangjianguo123, Aug 10 '22 05:08

Can you add your workflow definition to https://play.orkes.io/ ? We can evaluate your situation on that server. Alternatively, you can share your workflow definition here, along with the level of concurrency you are expecting. Thanks.

boney9, Aug 10 '22 05:08

@boney9 Thanks for your reply. However, my tasks are of type SIMPLE, which requires implementing workers, and they need the account and transaction services to be deployed in order to execute.

So the workflow can't be executed on https://play.orkes.io/ just by creating it there.

Correction: my Conductor version is v3.10.3.

Here is all of my code.

Workflow definition

{
  "name": "MyOrch",
  "description": "To Service Orchestrations",
  "version": 1,
  "schemaVersion": 2,
  "tasks": [
    {
      "name": "transaction",
      "taskReferenceName": "node1",
      "inputParameters": {
        "id": "${workflow.input.id}",
        "sender": "${workflow.input.sender}",
        "receiver": "${workflow.input.receiver}",
        "money": "${workflow.input.money}"
      },
      "type": "SIMPLE",
      "startDelay": 0
    },
    {
      "name": "fork_account",
      "type": "FORK_JOIN",
      "taskReferenceName": "fork_acc",
      "forkTasks": [
        [
          {
            "name": "account_sender",
            "taskReferenceName": "acc_sender",
            "inputParameters": {
              "message": "${node1.output.result}",
              "name": "${workflow.input.sender}",
              "money": "${workflow.input.money}"
            },
            "type": "SIMPLE",
            "optional": true
          }
        ],
        [
          {
            "name": "account_receiver",
            "taskReferenceName": "acc_receiver",
            "inputParameters": {
              "message": "${node1.output.result}",
              "name": "${workflow.input.receiver}",
              "money": "${workflow.input.money}"
            },
            "type": "SIMPLE",
            "optional": true
          }
        ]
      ]
    },
    {
      "name": "account_join",
      "taskReferenceName": "acc_join",
      "type": "JOIN",
      "joinOn": [
        "acc_sender",
        "acc_receiver"
      ]
    }
  ],
  "outputParameters": {
    "tran.result": "${node1.output.result}",
    "acc.result": "${acc_join.output}"
  },
  "failureWorkflow": "failure_flow",
  "ownerEmail": "[email protected]"
}

Task definition

[
    {
        "name": "account_receiver",
        "retryCount": 3,
        "timeoutSeconds": 300,
        "inputKeys": [
            "name",
            "money"
        ],
        "outputKeys": [
            "accountResult"
        ],
        "timeoutPolicy": "RETRY",
        "retryLogic": "FIXED",
        "retryDelaySeconds": 10,
        "responseTimeoutSeconds": 300,
        "ownerEmail": "[email protected]"
    },
    {
        "name": "account_sender",
        "retryCount": 3,
        "timeoutSeconds": 300,
        "inputKeys": [
            "name",
            "money"
        ],
        "outputKeys": [
            "result"
        ],
        "timeoutPolicy": "RETRY",
        "retryLogic": "FIXED",
        "retryDelaySeconds": 10,
        "responseTimeoutSeconds": 300,
        "ownerEmail": "[email protected]"
    },
    {
        "name": "transaction",
        "retryCount": 3,
        "timeoutSeconds": 300,
        "inputKeys": [
            "sender",
            "receiver",
            "money"
        ],
        "outputKeys": [
            "result"
        ],
        "timeoutPolicy": "RETRY",
        "retryLogic": "FIXED",
        "retryDelaySeconds": 10,
        "responseTimeoutSeconds": 300,
        "ownerEmail": "[email protected]"
    }
]

AccountWorker

public class AccountWorker implements Worker {

    private final static Logger logger = LoggerFactory.getLogger(AccountWorker.class);

    @Value("${conductor.account.server}")
    private String accountServer = "http://localhost:28082";

    private final String taskName;

    public AccountWorker(String taskName){
        this.taskName = taskName;
    }

    public String getActualUrl () {
        throw new UnsupportedOperationException("no url definition.");
    }

    @Override
    public String getTaskDefName(){
        return taskName;
    }

    /**
     * Performs a fund operation on the account; creates the account if it does not exist.
     */
    @Override
    public TaskResult execute(Task task){

        final TaskResult taskResult = new TaskResult();

        final Map <String, Object> inputData = task.getInputData();
        final String name = (String) inputData.get("name");
        final String message = (String) inputData.get("message");
        final Object obj = inputData.get("money");

        final int money;
        if (obj instanceof Integer) {
            money = (int) obj;
        } else {
            money = Integer.parseInt((String) obj);
        }

        logger.info("[ACCOUNT] accountServer:{}, message:{}, name:{}, money:{}", accountServer, message, name, money);

        final String invoke = HttpUtil.invoke(accountServer + getActualUrl(), HttpMethod.POST, new Person(message, name, money));

        taskResult.addOutputData("result", invoke);
        taskResult.setStatus(TaskResult.Status.COMPLETED);

        logger.info("[ACCOUNT] END");

        return taskResult;
    }
}

TransactionWorker

public class TransactionWorker implements Worker {

    private final static Logger logger = LoggerFactory.getLogger(TransactionWorker.class);

    @Value("${conductor.transaction.server}")
    private String transactionServer = "http://localhost:28081";

    private final String taskName;

    public TransactionWorker(String taskName){
        this.taskName = taskName;
    }

    @Override
    public String getTaskDefName(){
        return taskName;
    }

    @Override
    public TaskResult execute(Task task){

        final TaskResult taskResult = new TaskResult();

        final Map <String, Object> inputData = task.getInputData();
        final Object idObj = inputData.getOrDefault("id", 0);
        final int id = null == idObj ? 0 : (int) idObj;
        final String sender = (String) inputData.get("sender");
        final String receiver = (String) inputData.get("receiver");
        final int money = (int) inputData.get("money");

        logger.info("transactionServer: {}", transactionServer);
        logger.error("[TRANSACTION] id:{}, sender:{}, receiver:{}, money:{}", id, sender, receiver, money);

        final String invoke = HttpUtil.invoke(transactionServer + "/tran", HttpMethod.POST, new Transaction(id, sender, receiver, money));

        taskResult.addOutputData("result", invoke);
        taskResult.setStatus(TaskResult.Status.COMPLETED);

        logger.info("[TRANSACTION] END");

        return taskResult;
    }
}

TaskRunner

@Component
public class MainRunner {

    private final static Logger logger = LoggerFactory.getLogger(MainRunner.class);

    @Value("${conductor.http.server}")
    private String conductorServerUrl;

    @PostConstruct
    public void initTask() {

        logger.info("[Finance_Task_Init]: {}", conductorServerUrl);

        TaskClient taskClient = new TaskClient();
        taskClient.setRootURI(conductorServerUrl + "/api/");

        int threadCount = 10;
        Worker sender = new SenderAccountWorker("account_sender");
        Worker receiver = new ReceiverAccountWorker("account_receiver");
        Worker tranWorker = new TransactionWorker("transaction");
        final ArrayList <Worker> list = new ArrayList <>();
        list.add(receiver);
        list.add(sender);
        list.add(tranWorker);
        TaskRunnerConfigurer.Builder builder = new TaskRunnerConfigurer.Builder(taskClient, list);
        TaskRunnerConfigurer coordinator = builder.withThreadCount(threadCount).build();

        //Start for polling and execution of the tasks
        coordinator.init();
    }
}

Hoping for a reply, thanks.

Sunyelw, Aug 10 '22 07:08

> Hello, have you encountered the problem that the concurrency cannot go up after the cluster?

Sorry, not yet.

Sunyelw, Aug 10 '22 07:08

Thanks for sharing the details, let me try and see if I can simulate this in our sandbox.

boney9, Aug 10 '22 16:08

@Sunyelw Take a look at the client-side metrics to see how long your worker takes to process the task: https://github.com/Netflix/conductor/blob/478ea92b169ddc688ad58d8693dc1837d7e8320f/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java#L248 Alternatively, you can simply measure the time spent in your worker's HttpUtil.invoke(xxx) calls; my guess is that those calls are why the task takes so long to execute.
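
One way to time the HttpUtil.invoke(...) calls could look like the sketch below. It is a minimal, self-contained illustration and not part of the Conductor client: `TimedCall`, `timed`, and `fakeInvoke` are hypothetical names, and `fakeInvoke` only sleeps to stand in for the real HTTP call.

```java
import java.util.function.LongConsumer;
import java.util.function.Supplier;

public class TimedCall {

    /**
     * Runs the given call, reports the elapsed wall-clock time in
     * milliseconds to onElapsedMs, and returns the call's result.
     * The time is reported even if the call throws.
     */
    public static <T> T timed(Supplier<T> call, LongConsumer onElapsedMs) {
        long start = System.nanoTime();
        try {
            return call.get();
        } finally {
            onElapsedMs.accept((System.nanoTime() - start) / 1_000_000);
        }
    }

    // Hypothetical stand-in for HttpUtil.invoke(...): sleeps briefly, then returns.
    static String fakeInvoke() {
        try {
            Thread.sleep(50);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return "ok";
    }

    public static void main(String[] args) {
        String result = timed(TimedCall::fakeInvoke,
                ms -> System.out.println("[TIMING] invoke took " + ms + " ms"));
        System.out.println("result = " + result);
    }
}
```

Inside a worker's execute method, the same wrapper could surround the existing call, for example `timed(() -> HttpUtil.invoke(accountServer + getActualUrl(), HttpMethod.POST, new Person(message, name, money)), ms -> logger.info("[ACCOUNT] invoke took {} ms", ms))`. If the logged times turn out to be small compared with the overall workflow duration, that would suggest the delay lies outside the worker's HTTP calls.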

jxu-nflx, Aug 15 '22 22:08

@jxu-nflx Sorry, I don't quite understand what you mean. Could you explain in more detail, with concrete steps I can follow?

Sunyelw, Aug 18 '22 06:08

@Sunyelw Are you going to use it in a real-time scenario? I am also looking into Conductor. Add my WeChat account, "lianjianghe", and let's discuss it together.

lianjunwei, Aug 27 '22 04:08

This issue is stale, because it has been open for 45 days with no activity. Remove the stale label or comment, or this will be closed in 7 days.

github-actions[bot], Oct 12 '22 00:10

This issue was closed, because it has been stalled for 7 days with no activity.

github-actions[bot], Oct 20 '22 00:10