goflow icon indicating copy to clipboard operation
goflow copied to clipboard

Pause, Resume and Stop don't work correctly

Open danli001 opened this issue 1 year ago • 0 comments

Expected: Pause pauses the specified workflow request and Resume resumes it. Stop stops an active workflow request and sets its status to Finished.

Actual:

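  1. After running Pause, the workflow keeps executing until the end. The log shows that the request is recognised as paused ("Request is paused, storing partial state for node ..."), yet each subsequent node is still submitted and executed.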
...
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] intermediate result from node 0_1_Node-1 to 0_2_Node-2 stored as 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:45 [request `my_test_flow-1662714755`] performing request for Node 0_2_Node-2, indegree count is 1
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] request submitted for Node 0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] intermediate result from Node 0_1_Node-1 to Node 0_2_Node-2 retrieved from 0_1_Node-1--0_2_Node-2
2022/09/09 17:12:46 [request `my_test_flow-1662714755`] executing node 0_2_Node-2
Test Node: O_O "TestData"
2022/09/09 17:12:47 Pausing request my_test_flow-1662714755 of flow my_test_flow
Node End
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] completed execution of node 0_2_Node-2
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] intermediate result from node 0_2_Node-2 to 0_3_Node-3 stored as 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] performing request for Node 0_3_Node-3, indegree count is 1
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_3_Node-3
2022/09/09 17:12:56 [request `my_test_flow-1662714755`] request submitted for Node 0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] intermediate result from Node 0_2_Node-2 to Node 0_3_Node-3 retrieved from 0_2_Node-2--0_3_Node-3
2022/09/09 17:12:57 [request `my_test_flow-1662714755`] executing node 0_3_Node-3
Test Node: O_O "O_O "TestData""
Node End
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] completed execution of node 0_3_Node-3
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] intermediate result from node 0_3_Node-3 to 0_4_Node-4 stored as 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] performing request for Node 0_4_Node-4, indegree count is 1
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] Request is paused, storing partial state for node: 0_4_Node-4
2022/09/09 17:13:07 [request `my_test_flow-1662714755`] request submitted for Node 0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] partial request received
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] intermediate result from Node 0_3_Node-3 to Node 0_4_Node-4 retrieved from 0_3_Node-3--0_4_Node-4
2022/09/09 17:13:08 [request `my_test_flow-1662714755`] executing node 0_4_Node-4
...
  2. Running Stop makes the server panic:
2022/09/09 17:15:03 Pausing request my_test_flow for flow my_test_flow-1662714890
Node End
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] completed execution of node 0_2_Node-2
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] failed to obtain pipeline state, error failed to get key core.my_test_flow.my_test_flow-1662714890.request-state, nil
2022/09/09 17:15:11 [request `my_test_flow-1662714890`] pipeline is not active
panic: [request `my_test_flow-1662714890`] Pipeline is not active

goroutine 30 [running]:
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).findNextNodeToExecute(0xc0002c7c78)
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:609 +0x614
github.com/s8sg/goflow/core/sdk/executor.(*FlowExecutor).Execute(0xc0002c7c78, 0xc0002c7c58)
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/sdk/executor/executor.go:1266 +0x9c8
github.com/s8sg/goflow/core/runtime/controller/handler.PartialExecuteFlowHandler(0xc0002c7dd0, 0xc00028ecc0?, {0x14771f8, 0xc0002a6410})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/core/runtime/controller/handler/partial_execute_flow_handler.go:25 +0xe5
github.com/s8sg/goflow/runtime.(*FlowRuntime).handlePartialRequest(0xc00012af70, 0xc00028ecc0)
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:457 +0x1c6
github.com/s8sg/goflow/runtime.(*FlowRuntime).handleRequest(0xc0002e6380?, 0xc0002e21e0?, {0xc0002a43b0?, 0x13260a0?})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:414 +0xc8
github.com/s8sg/goflow/runtime.(*FlowRuntime).Consume(0xc00012af70, {0x1473a98, 0xc0002320e0})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/s8sg/goflow/runtime/flow_runtime.go:394 +0x2ae
github.com/adjust/rmq/v4.(*redisQueue).consumerConsume(0xc0001d4000, {0x14701a0, 0xc00012af70})
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:282 +0x9f
created by github.com/adjust/rmq/v4.(*redisQueue).AddConsumer
        /Users/dli/workspace/lab/gowf-test/vendor/github.com/adjust/rmq/v4/queue.go:260 +0xe5

Here is the test code:

package main

import (
	"flag"
	"fmt"
	"time"

	flow "github.com/s8sg/goflow/flow/v1"
	goflow "github.com/s8sg/goflow/v1"
)

// flowName is the name under which the test workflow is registered and
// invoked by both the server and the client operations.
const flowName = "my_test_flow"

// Command-line flags selecting server mode or a client operation.
var (
	server = flag.Bool("s", false, "start server") // -s: run the flow service instead of a client operation
	op     = flag.String("o", "exec", "operation") // -o: exec, pause, resume or stop
	rid    = flag.String("i", "", "request id")    // -i: request ID targeted by pause, resume and stop
)

// workloadDelay is how long workload sleeps to simulate a long-running node.
// It defaults to the original 10 seconds, which leaves time to issue pause,
// resume or stop while a request is in flight. It is a variable so the delay
// can be shortened, for example in tests.
var workloadDelay = 10 * time.Second

// workload is the function run by every node of the test DAG. It prints the
// incoming data, sleeps for workloadDelay to simulate work, and returns the
// input wrapped as `O_O "<data>"`. The option argument is unused, and the
// returned error is always nil.
func workload(data []byte, option map[string][]string) ([]byte, error) {
	fmt.Printf("Test Node: %s\n", string(data))
	time.Sleep(workloadDelay)
	fmt.Println("Node End")
	return []byte(fmt.Sprintf("O_O \"%s\"", string(data))), nil
}

// DefineWorkflow builds the test DAG: eight nodes, Node-1 through Node-8, each
// running workload. The nodes are chained linearly with an edge from every node
// to its successor. It always returns nil.
func DefineWorkflow(workflow *flow.Workflow, context *flow.Context) error {
	names := []string{
		"Node-1", "Node-2", "Node-3", "Node-4",
		"Node-5", "Node-6", "Node-7", "Node-8",
	}

	d := workflow.Dag()
	for _, name := range names {
		d.Node(name, workload)
	}
	// Connect each node to the one after it, in declaration order.
	for i := 1; i < len(names); i++ {
		d.Edge(names[i-1], names[i])
	}
	return nil
}

// startServer registers the test workflow with a goflow service listening on
// port 8088 (Redis at 127.0.0.1:6379, two concurrent workers, debug logging)
// and starts it. Any registration or startup error causes a panic.
func startServer() {
	service := &goflow.FlowService{
		Port:              8088,
		RedisURL:          "127.0.0.1:6379",
		WorkerConcurrency: 2,
		RetryCount:        1,
		DebugEnabled:      true,
	}

	if err := service.Register(flowName, DefineWorkflow); err != nil {
		panic(err)
	}
	if err := service.Start(); err != nil {
		panic(err)
	}
}

// ExecOp performs the client operation named by op against flowName and
// returns the request ID it acted on.
//
// For "exec" a new request ID of the form "<flowName>-<unix seconds>" is
// generated and a request with body "TestData" is submitted. For "pause",
// "resume" and "stop" the request ID comes from the -i flag.
//
// ExecOp panics in three cases:
//   - the request ID is empty;
//   - op is not one of the four supported operations;
//   - the underlying goflow call returns an error.
func ExecOp(op string) string {
	fs := &goflow.FlowService{
		RedisURL: "127.0.0.1:6379",
	}

	reqID := *rid
	if op == "exec" {
		reqID = fmt.Sprintf("%s-%d", flowName, time.Now().Unix())
	}
	if len(reqID) == 0 {
		panic("request id is empty")
	}

	var err error
	switch op {
	case "exec":
		err = fs.Execute(flowName, &goflow.Request{
			Body:      []byte("TestData"),
			RequestId: reqID,
		})
	case "pause":
		err = fs.Pause(flowName, reqID)
	case "resume":
		err = fs.Resume(flowName, reqID)
	case "stop":
		err = fs.Stop(flowName, reqID)
	default:
		// Reject unknown -o values instead of printing the request ID as if
		// an operation had been performed.
		panic(fmt.Sprintf("unsupported operation %q", op))
	}
	if err != nil {
		panic(err)
	}

	return reqID
}

// main parses the command-line flags. With -s it starts the flow server.
// Otherwise it runs the client operation selected by -o and prints the
// request ID it acted on.
func main() {
	flag.Parse()

	if !*server {
		fmt.Println(ExecOp(*op))
		return
	}
	startServer()
}

Start the server:

./wf -s

Run the client (one operation per invocation):

./wf -o exec 
./wf -o pause -i [request_id]
./wf -o resume -i [request_id]
./wf -o stop -i [request_id]

danli001 avatar Sep 09 '22 09:09 danli001