gotasks icon indicating copy to clipboard operation
gotasks copied to clipboard

有个问题想请教下

Open Kline-x opened this issue 4 years ago • 16 comments

您好,有个问题想请教下,gotasks.runHandlers方法里有下面这段代码:

	if task.CurrentHandlerIndex > i {
			log.Printf("skip step %d of task %s because it was executed successfully", i, task.ID)
			continue
		}

请问什么情况想task.CurrentHandlerIndex 才会大于i呢?因为我看下面是把i赋值给task.CurrentHandlerIndex了,而别的地方也没发现有修改

Kline-x avatar Apr 13 '21 08:04 Kline-x

还有,run方法里的:

		if queue.Async {
			gopool.Submit(fn)
		} else {
			fn()
		}

在worker端貌似根本没得设置queue.Async,所以worker端的Run永远不会异步跑?

Kline-x avatar Apr 13 '21 09:04 Kline-x

for i, handler := range handlers 这里i就在不断的变大,task.CurrentHandlerIndex = i 赋值,broker.Update(task) 保存到redis。

重试的时候,如果一共三个handler,1成功,2失败。那么1就会保存到redis里,重试任务的时候就会直接从2开始。

jiajunhuang avatar Apr 14 '21 02:04 jiajunhuang

queue.go 里有:

func WithAsyncHandleTask(async bool) QueueOption {
	return func(q *Queue) {
		q.Async = async
	}
}

jiajunhuang avatar Apr 14 '21 02:04 jiajunhuang

queue.go 里有:

func WithAsyncHandleTask(async bool) QueueOption {
	return func(q *Queue) {
		q.Async = async
	}
}

但是您gotasks.Run是传queueName进去new了一个默认的queue啊,默认的queue中async是false,而run也是在gotasks.Run里面跑的,等于在跑gotasks.Run的时候async一直是false

Kline-x avatar Apr 14 '21 02:04 Kline-x

for i, handler := range handlers 这里i就在不断的变大,task.CurrentHandlerIndex = i 赋值,broker.Update(task) 保存到redis。

重试的时候,如果一共三个handler,1成功,2失败。那么1就会保存到redis里,重试任务的时候就会直接从2开始。

reentrantMapLock.RLock()
reentrantOptions, ok := reentrantMap[handlerName]
reentrantMapLock.RUnlock()
for j := 0; j < reentrantOptions.MaxTimes; j++ {
    args, err = handler(args)
    if err == nil {
	    break
    }
    time.Sleep(time.Microsecond * time.Duration(reentrantOptions.SleepyMS))
    log.Printf("retry step %d of task %s the %d rd time", task.CurrentHandlerIndex, task.ID, j)
}

重试的逻辑好像跟原来的i没什么关系啊

Kline-x avatar Apr 14 '21 02:04 Kline-x

不是这个重试。是指重新入队,再次消费的时候。不过我不记得有没有加这个逻辑了

jiajunhuang avatar Apr 14 '21 02:04 jiajunhuang

Queue这个好像是个bug,我基本都是用同步的。应该要把选项改到worker上来才对。

jiajunhuang avatar Apr 14 '21 02:04 jiajunhuang

Queue这个好像是个bug,我基本都是用同步的。应该要把选项改到worker上来才对。

赞同

Kline-x avatar Apr 14 '21 03:04 Kline-x

不是这个重试。是指重新入队,再次消费的时候。不过我不记得有没有加这个逻辑了

是handleTask里的这段吗?

defer func() {
		r := recover()
		status := "success"

		if r != nil {
			status = "fail"

			task.ResultLog = string(debug.Stack())
			broker.Update(task)
			log.Printf("recovered from queue %s and task %+v with recover info %+v", queueName, task, r)
		}

		taskHistogram.WithLabelValues(task.QueueName, task.JobName, status).Observe(task.UpdatedAt.Sub(task.CreatedAt).Seconds())

		if r != nil {
			// save to fatal queue
			task.QueueName = FatalQueueName
			broker.Enqueue(task)
		}
}()

Kline-x avatar Apr 14 '21 03:04 Kline-x

对的

jiajunhuang avatar Apr 14 '21 03:04 jiajunhuang

对的

但是这里task.QueueName = FatalQueueName,队列名字已经换了呀,下次是取不出来错误的这条吧?

Kline-x avatar Apr 14 '21 03:04 Kline-x

可以代码重新入队

jiajunhuang avatar Apr 14 '21 04:04 jiajunhuang

可以代码重新入队

入的已经不是原来的队了呀,是fatal队列

Kline-x avatar Apr 14 '21 05:04 Kline-x

目前失败的是直接当dead letter了。没有做这个功能。欢迎PR

jiajunhuang avatar Apr 15 '21 08:04 jiajunhuang

目前失败的是直接当dead letter了。没有做这个功能。欢迎PR

if r != nil {
	broker.Enqueue(task)
	
	// save to fatal queue
	task.QueueName = FatalQueueName
	broker.Enqueue(task)
}

这样是不是就可以了?

Kline-x avatar Apr 16 '21 07:04 Kline-x

这样估计不行.用户没有定义dead letter一定能重试.所以只能加一个函数用来做这个事情,但是框架不能主动帮用户做这个事情.有一些任务失败了也不能重试.

jiajunhuang avatar Apr 29 '21 03:04 jiajunhuang