fastflow
fastflow copied to clipboard
在RunBefore中如果有错误,则任务阻塞
我试图在RunBefore函数中获取TaskInstance相关参数,但返回结果taskIns1为空指针,然后程序就会卡在fmt.Println("taskIns1.2", taskIns1.TaskID),没有显示的报错,附完整代码
package main
import (
"fmt"
"log"
"time"
"github.com/shiningrush/fastflow"
mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
"github.com/shiningrush/fastflow/pkg/entity"
"github.com/shiningrush/fastflow/pkg/entity/run"
"github.com/shiningrush/fastflow/pkg/mod"
mongoStore "github.com/shiningrush/fastflow/store/mongo"
)
type PrintAction struct {
}
// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
return "PrintAction"
}
func (a *PrintAction) RunBefore(ctx run.ExecuteContext, params interface{}) error {
fmt.Println("-------- Run action before")
ctx1 := ctx.Context()
taskIns1, _ := ctx1.Value("running-task").(*entity.TaskInstance)
fmt.Println("taskIns1.1", taskIns1)
fmt.Println("taskIns1.2", taskIns1.TaskID)
return nil
}
func (a *PrintAction) RunAfter(ctx run.ExecuteContext, params interface{}) error {
fmt.Println("Run action after")
return nil
}
func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
fmt.Println("Run action start: ", time.Now())
taskIns, _ := entity.CtxRunningTaskIns(ctx.Context())
fmt.Println(taskIns.TaskID)
return nil
}
func main() {
// Register action
fastflow.RegisterAction([]run.Action{
&PrintAction{},
})
// init keeper, it used to e
keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
Key: "worker-1",
// if your mongo does not set user/pwd, you should remove it
ConnStr: "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
Database: "mongo-demo",
Prefix: "test",
})
if err := keeper.Init(); err != nil {
log.Fatal(fmt.Errorf("init keeper failed: %w", err))
}
// init store
st := mongoStore.NewStore(&mongoStore.StoreOption{
// if your mongo does not set user/pwd, you should remove it
ConnStr: "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
Database: "mongo-demo",
Prefix: "test",
})
if err := st.Init(); err != nil {
log.Fatal(fmt.Errorf("init store failed: %w", err))
}
go createDagAndInstance()
// start fastflow
if err := fastflow.Start(&fastflow.InitialOption{
Keeper: keeper,
Store: st,
// use yaml to define dag
// 所有的yaml文件都会被执行
ReadDagFromDir: "./",
}); err != nil {
panic(fmt.Sprintf("init fastflow failed: %s", err))
}
}
func createDagAndInstance() {
// wait fast start completed
time.Sleep(time.Second)
// run some dag instance
// for i := 0; i < 10; i++ {
_, err := mod.GetCommander().RunDag("test-dag", nil)
if err != nil {
log.Fatal(err)
}
// time.Sleep(time.Second * 10)
// }
}
id: "test-dag"
name: "test"
vars:
fileName:
desc: "the file name"
defaultValue: "file.txt"
tasks:
- id: "task1"
actionName: "PrintAction"
preCheck:
isIgnoreFiles:
act: skip #you can set "skip" or "block"
conditions:
- source: vars # source could be "vars" or "share-data"
key: "fileName"
op: "in"
values: ["warn.txt", "error.txt", "file1.txt"]
- id: "task2"
actionName: "PrintAction"
dependOn: ["task1"]
- id: "task3"
actionName: "PrintAction"
dependOn: ["task1"]