goflow
goflow copied to clipboard
Pause, Resume and Stop don't work fine
Expect:
Pause
can pause the specified workflow, and Resume
can resume it.
Stop
can stop active workflow and set its status to Finished
.
Actually:
- After running
Pasue
, the workflow remains executed until the end.
...
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] intermediate result from node 0_1_Node-1 to 0_2_Node-2 stored as 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] performing request for Node 0_2_Node-2, indegree count is 1
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] request submitted for Node 0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] intermediate result from Node 0_1_Node-1 to Node 0_2_Node-2 retrieved from 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] executing node 0_2_Node-2
Test Node: O_O "TestData"
2022/09/09 17:12:47 Pausing request my_test_flow-1662714755 of flow my_test_flow
Node End
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] completed execution of node 0_2_Node-2
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] intermediate result from node 0_2_Node-2 to 0_3_Node-3 stored as 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] performing request for Node 0_3_Node-3, indegree count is 1
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] request submitted for Node 0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] intermediate result from Node 0_2_Node-2 to Node 0_3_Node-3 retrieved from 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] executing node 0_3_Node-3
Test Node: O_O "O_O "TestData""
Node End
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] completed execution of node 0_3_Node-3
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] intermediate result from node 0_3_Node-3 to 0_4_Node-4 stored as 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] performing request for Node 0_4_Node-4, indegree count is 1
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] request submitted for Node 0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] intermediate result from Node 0_3_Node-3 to Node 0_4_Node-4 retrieved from 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] executing node 0_4_Node-4
...
- Panic when exec
Stop
:
2022/09/09 17:15:03 Pausing request my_test_flow for flow my_test_flow-1662714890
Node End
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] completed execution of node 0_2_Node-2
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] failed to obtain pipeline state, error failed to get key core.my_test_flow.my_test_flow-1662714890.request-state, nil
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] pipeline is not active
panic: [request `my_test_flow-1662714890`] Pipeline is not active
goroutine 30 [running]:
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).findNextNodeToExecute(0xc0002c7c78)
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:609 +0x614
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc0002c7c78, 0xc0002c7c58)
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:1266 +0x9c8
github.com/s8sg/goflow/core/runtime/controller/handler.PartialExecuteFlowHandler(0xc0002c7dd0, 0xc00028ecc0?, {0x14771f8, 0xc0002a6410})
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/runtime/controller/handler/partial_execute_flow_handler.go:25 +0xe5
github.com/s8sg/goflow/runtime.(*FlowRuntime).handlePartialRequest(0xc00012af70, 0xc00028ecc0)
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:457 +0x1c6
github.com/s8sg/goflow/runtime.(*FlowRuntime).handleRequest(0xc0002e6380?, 0xc0002e21e0?, {0xc0002a43b0?, 0x13260a0?})
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:414 +0xc8
github.com/s8sg/goflow/runtime.(*FlowRuntime).Consume(0xc00012af70, {0x1473a98, 0xc0002320e0})
/Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:394 +0x2ae
github.com/adjust/rmq/v4.(*redisQueue).consumerConsume(0xc0001d4000, {0x14701a0, 0xc00012af70})
/Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:282 +0x9f
created by github.com/adjust/rmq/v4.(*redisQueue).AddConsumer
/Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:260 +0xe5
Here is the test code:
package main
import (
"flag"
"fmt"
"time"
flow "github.com/s8sg/goflow/flow/v1"
goflow "github.com/s8sg/goflow/v1"
)
const (
flowName = "my_test_flow"
)
var (
server = flag.Bool("s", false, "start server")
op = flag.String("o", "exec", "operation")
rid = flag.String("i", "", "request id")
)
func workload(data []byte, option map[string][]string) ([]byte, error) {
fmt.Printf("Test Node: %s\n", string(data))
time.Sleep(10 * time.Second)
fmt.Println("Node End")
return []byte(fmt.Sprintf("O_O \"%s\"", string(data))), nil
}
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
dag := workflow.Dag()
dag.Node("Node-1", workload)
dag.Node("Node-2", workload)
dag.Node("Node-3", workload)
dag.Node("Node-4", workload)
dag.Node("Node-5", workload)
dag.Node("Node-6", workload)
dag.Node("Node-7", workload)
dag.Node("Node-8", workload)
dag.Edge("Node-1", "Node-2")
dag.Edge("Node-2", "Node-3")
dag.Edge("Node-3", "Node-4")
dag.Edge("Node-4", "Node-5")
dag.Edge("Node-5", "Node-6")
dag.Edge("Node-6", "Node-7")
dag.Edge("Node-7", "Node-8")
return nil
}
func startServer() {
fs := &goflow.FlowService{
Port: 8088,
RedisURL: "127.0.0.1:6379",
WorkerConcurrency: 2,
RetryCount: 1,
DebugEnabled: true,
}
err := fs.Register(flowName, DefineWorkflow)
if err != nil {
panic(err)
}
err = fs.Start()
if err != nil {
panic(err)
}
}
func ExecOp(op string) string {
fs := &goflow.FlowService{
RedisURL: "127.0.0.1:6379",
}
reqId := *rid
if op == "exec" {
reqId = fmt.Sprintf("%s-%d", flowName, time.Now().Unix())
}
if len(reqId) == 0 {
panic("request id is empty")
}
switch op {
case "exec":
err := fs.Execute(flowName, &goflow.Request{
Body: []byte("TestData"),
RequestId: reqId,
})
if err != nil {
panic(err)
}
return reqId
case "pause":
err := fs.Pause(flowName, reqId)
if err != nil {
panic(err)
}
case "resume":
err := fs.Resume(flowName, reqId)
if err != nil {
panic(err)
}
case "stop":
err := fs.Stop(flowName, reqId)
if err != nil {
panic(err)
}
}
return reqId
}
func main() {
flag.Parse()
if *server {
startServer()
return
}
reqId := ExecOp(*op)
fmt.Println(reqId)
}
Start the server:
./wf -s
Client:
./wf -o exec
./wf -o pause -i [request_id]
./wf -o resume -i [request_id]
./wf -o stop -i [request_id]