conductor
conductor copied to clipboard
Why execute so slowly ?
Conductor Version: #v3.10.3 Docker-compose deploy, and demo is starting local.
docker-compose -f docker-compose.yaml up
invoke trace
a simple demo cost 200s
Why?
Adding to this, when I change version to v3.11.0
Repeat this demo, got this
@zz-netflix @mattl-netflix @netflixgithubreadonly I have the same problem, please help
Hello, have you encountered the problem that the concurrency cannot go up after the cluster?
Hi @dangjianguo123 - What version of Conductor are you using?
@Sunyelw - Can you share the workflow definition? You can create it here and share - https://play.orkes.io (This is a conductor server playground / sandbox)
@boney9 Hi,conductor version number is 3.9.9-rc.1
Can you add your workflow definition to https://play.orkes.io/ ? We can evaluate your situation in that server. Alternatively you can share your workflow definition here and what level of concurrency are you expecting. Thanks.
@boney9 thx for your relpy.
But, Emmm, my task is Simple
type which need develop worker, it required deploy account
and transaction
service to execute.
So it couldn't execute on https://play.orkes.io/ after create workflow only.
Fix my conductor version is v3.10.3.
Below is my all code, please.
Workflow definition
{
"name": "MyOrch",
"description": "To Service Orchestrations",
"version": 1,
"schemaVersion": 2,
"tasks": [
{
"name": "transaction",
"taskReferenceName": "node1",
"inputParameters": {
"id": "${workflow.input.id}",
"sender": "${workflow.input.sender}",
"receiver": "${workflow.input.receiver}",
"money": "${workflow.input.money}"
},
"type": "SIMPLE",
"startDelay": 0
},
{
"name": "fork_account",
"type": "FORK_JOIN",
"taskReferenceName": "fork_acc",
"forkTasks": [
[
{
"name": "account_sender",
"taskReferenceName": "acc_sender",
"inputParameters": {
"message": "${node1.output.result}",
"name": "${workflow.input.sender}",
"money": "${workflow.input.money}"
},
"type": "SIMPLE",
"optional": true
}
],
[
{
"name": "account_receiver",
"taskReferenceName": "acc_receiver",
"inputParameters": {
"message": "${node1.output.result}",
"name": "${workflow.input.receiver}",
"money": "${workflow.input.money}"
},
"type": "SIMPLE",
"optional": true
}
]
]
},
{
"name": "account_join",
"taskReferenceName": "acc_join",
"type": "JOIN",
"joinOn": [
"acc_sender",
"acc_receiver"
]
}
],
"outputParameters": {
"tran.result": "${node1.output.result}",
"acc.result": "${acc_join.output}"
},
"failureWorkflow": "failure_flow",
"ownerEmail": "[email protected]"
}
Task definition
[
{
"name": "account_receiver",
"retryCount": 3,
"timeoutSeconds": 300,
"inputKeys": [
"name",
"money"
],
"outputKeys": [
"accountResult"
],
"timeoutPolicy": "RETRY",
"retryLogic": "FIXED",
"retryDelaySeconds": 10,
"responseTimeoutSeconds": 300,
"ownerEmail": "[email protected]"
},
{
"name": "account_sender",
"retryCount": 3,
"timeoutSeconds": 300,
"inputKeys": [
"name",
"money"
],
"outputKeys": [
"result"
],
"timeoutPolicy": "RETRY",
"retryLogic": "FIXED",
"retryDelaySeconds": 10,
"responseTimeoutSeconds": 300,
"ownerEmail": "[email protected]"
},
{
"name": "transaction",
"retryCount": 3,
"timeoutSeconds": 300,
"inputKeys": [
"sender",
"receiver",
"money"
],
"outputKeys": [
"result"
],
"timeoutPolicy": "RETRY",
"retryLogic": "FIXED",
"retryDelaySeconds": 10,
"responseTimeoutSeconds": 300,
"ownerEmail": "[email protected]"
}
]
AccountWorker
public class AccountWorker implements Worker {
private final static Logger logger = LoggerFactory.getLogger(AccountWorker.class);
@Value("${conductor.account.server}")
private String accountServer = "http://localhost:28082";
private final String taskName;
public AccountWorker(String taskName){
this.taskName = taskName;
}
public String getActualUrl () {
throw new UnsupportedOperationException("no url definition.");
}
@Override
public String getTaskDefName(){
return taskName;
}
/**
* 对账户进行资金操作, 若不存在直接创建账户
*/
@Override
public TaskResult execute(Task task){
final TaskResult taskResult = new TaskResult();
final Map <String, Object> inputData = task.getInputData();
final String name = (String) inputData.get("name");
final String message = (String) inputData.get("message");
final Object obj = inputData.get("money");
final int money;
if (obj instanceof Integer) {
money = (int) obj;
} else {
money = Integer.parseInt((String) obj);
}
logger.info("[ACCOUNT] accountServer:{}, message:{}, name:{}, money:{}", accountServer, message, name, money);
final String invoke = HttpUtil.invoke(accountServer + getActualUrl(), HttpMethod.POST, new Person(message, name, money));
taskResult.addOutputData("result", invoke);
taskResult.setStatus(TaskResult.Status.COMPLETED);
logger.info("[ACCOUNT] END");
return taskResult;
}
}
TransactionWorker
public class TransactionWorker implements Worker {
private final static Logger logger = LoggerFactory.getLogger(TransactionWorker.class);
@Value("${conductor.transaction.server}")
private String transactionServer = "http://localhost:28081";
private final String taskName;
public TransactionWorker(String taskName){
this.taskName = taskName;
}
@Override
public String getTaskDefName(){
return taskName;
}
@Override
public TaskResult execute(Task task){
final TaskResult taskResult = new TaskResult();
final Map <String, Object> inputData = task.getInputData();
final Object idObj = inputData.getOrDefault("id", 0);
final int id = null == idObj ? 0 : (int) idObj;
final String sender = (String) inputData.get("sender");
final String receiver = (String) inputData.get("receiver");
final int money = (int) inputData.get("money");
logger.info("transactionServer: {}", transactionServer);
logger.error("[TRANSACTION] id:{}, sender:{}, receiver:{}, money:{}", id, sender, receiver, money);
final String invoke = HttpUtil.invoke(transactionServer + "/tran", HttpMethod.POST, new Transaction(id, sender, receiver, money));
taskResult.addOutputData("result", invoke);
taskResult.setStatus(TaskResult.Status.COMPLETED);
logger.info("[TRANSACTION] END");
return taskResult;
}
}
TaskRunner
@Component
public class MainRunner {
private final static Logger logger = LoggerFactory.getLogger(MainRunner.class);
@Value("${conductor.http.server}")
private String conductorServerUrl;
@PostConstruct
public void initTask() {
logger.info("[Finance_Task_Init]: {}", conductorServerUrl);
TaskClient taskClient = new TaskClient();
taskClient.setRootURI(conductorServerUrl + "/api/");
int threadCount = 10;
Worker sender = new SenderAccountWorker("account_sender");
Worker receiver = new ReceiverAccountWorker("account_receiver");
Worker tranWorker = new TransactionWorker("transaction");
final ArrayList <Worker> list = new ArrayList <>();
list.add(receiver);
list.add(sender);
list.add(tranWorker);
TaskRunnerConfigurer.Builder builder = new TaskRunnerConfigurer.Builder(taskClient, list);
TaskRunnerConfigurer coordinator = builder.withThreadCount(threadCount).build();
//Start for polling and execution of the tasks
coordinator.init();
}
}
Hope some reply, thx.
Hello, have you encountered the problem that the concurrency cannot go up after the cluster?
Sry, not yet.
Thanks for sharing the details, let me try and see if I can simulate this in our sandbox.
@Sunyelw Take a look at the client side metrics to see how long it takes for your worker to process the task: https://github.com/Netflix/conductor/blob/478ea92b169ddc688ad58d8693dc1837d7e8320f/client/src/main/java/com/netflix/conductor/client/automator/TaskPollExecutor.java#L248
Or you can simply just measure the time in your worker for those calls HttpUtil.invoke(xxx)
as I guess that is why the task takes long to execute
@jxu-nflx Sorry, I don't quite understand what you mean, could it be more detailed and executable ?
@Sunyelw Are you going to use it in a real-time scene? I am also researching the conductor, add my WeChat account: "lianjianghe", let's communicate together.
This issue is stale, because it has been open for 45 days with no activity. Remove the stale label or comment, or this will be closed in 7 days.
This issue was closed, because it has been stalled for 7 days with no activity.