fastflow icon indicating copy to clipboard operation
fastflow copied to clipboard

在RunBefore中如果有错误,则任务阻塞

Open AHHH32 opened this issue 9 months ago • 3 comments

我试图在RunBefore函数中获取TaskInstance相关参数,但返回结果taskIns1为空指针,然后程序就会卡在fmt.Println("taskIns1.2", taskIns1.TaskID),没有显示的报错,附完整代码

package main

import (
	"fmt"
	"log"
	"time"

	"github.com/shiningrush/fastflow"
	mongoKeeper "github.com/shiningrush/fastflow/keeper/mongo"
	"github.com/shiningrush/fastflow/pkg/entity"
	"github.com/shiningrush/fastflow/pkg/entity/run"
	"github.com/shiningrush/fastflow/pkg/mod"
	mongoStore "github.com/shiningrush/fastflow/store/mongo"
)

type PrintAction struct {
}

// Name define the unique action identity, it will be used by Task
func (a *PrintAction) Name() string {
	return "PrintAction"
}

func (a *PrintAction) RunBefore(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("-------- Run action before")
	ctx1 := ctx.Context()
	taskIns1, _ := ctx1.Value("running-task").(*entity.TaskInstance)
	fmt.Println("taskIns1.1", taskIns1)
	fmt.Println("taskIns1.2", taskIns1.TaskID)
	return nil
}

func (a *PrintAction) RunAfter(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("Run action after")
	return nil
}

func (a *PrintAction) Run(ctx run.ExecuteContext, params interface{}) error {
	fmt.Println("Run action start: ", time.Now())
	taskIns, _ := entity.CtxRunningTaskIns(ctx.Context())
	fmt.Println(taskIns.TaskID)

	return nil
}

func main() {
	// Register action
	fastflow.RegisterAction([]run.Action{
		&PrintAction{},
	})

	// init keeper, it used to e
	keeper := mongoKeeper.NewKeeper(&mongoKeeper.KeeperOption{
		Key: "worker-1",
		// if your mongo does not set user/pwd, you should remove it
		ConnStr:  "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := keeper.Init(); err != nil {
		log.Fatal(fmt.Errorf("init keeper failed: %w", err))
	}

	// init store
	st := mongoStore.NewStore(&mongoStore.StoreOption{
		// if your mongo does not set user/pwd, you should remove it
		ConnStr:  "mongodb://root:[email protected]:27017/fastflow?authSource=admin",
		Database: "mongo-demo",
		Prefix:   "test",
	})
	if err := st.Init(); err != nil {
		log.Fatal(fmt.Errorf("init store failed: %w", err))
	}

	go createDagAndInstance()

	// start fastflow
	if err := fastflow.Start(&fastflow.InitialOption{
		Keeper: keeper,
		Store:  st,
		// use yaml to define dag
		// 所有的yaml文件都会被执行
		ReadDagFromDir: "./",
	}); err != nil {
		panic(fmt.Sprintf("init fastflow failed: %s", err))
	}
}

func createDagAndInstance() {
	// wait fast start completed
	time.Sleep(time.Second)

	// run some dag instance
	// for i := 0; i < 10; i++ {
	_, err := mod.GetCommander().RunDag("test-dag", nil)
	if err != nil {
		log.Fatal(err)
	}
	// time.Sleep(time.Second * 10)
	// }
}
id: "test-dag"
name: "test"
vars:
  fileName:
    desc: "the file name"
    defaultValue: "file.txt"
tasks:
- id: "task1"
  actionName: "PrintAction"
  preCheck:
    isIgnoreFiles:
      act: skip #you can set "skip" or "block"
      conditions:
      - source: vars # source could be "vars" or "share-data"
        key: "fileName"
        op: "in"
        values: ["warn.txt", "error.txt", "file1.txt"]
- id: "task2"
  actionName: "PrintAction"
  dependOn: ["task1"]
- id: "task3"
  actionName: "PrintAction"
  dependOn: ["task1"]

AHHH32 avatar May 20 '24 09:05 AHHH32