gotasks
gotasks copied to clipboard
有个问题想请教下
您好,有个问题想请教下,gotasks.runHandlers方法里有下面这段代码:
if task.CurrentHandlerIndex > i {
log.Printf("skip step %d of task %s because it was executed successfully", i, task.ID)
continue
}
请问什么情况想task.CurrentHandlerIndex 才会大于i呢?因为我看下面是把i赋值给task.CurrentHandlerIndex了,而别的地方也没发现有修改
还有,run方法里的:
if queue.Async {
gopool.Submit(fn)
} else {
fn()
}
在worker端貌似根本没得设置queue.Async,所以worker端的Run永远不会异步跑?
for i, handler := range handlers 这里i就在不断的变大,task.CurrentHandlerIndex = i 赋值,broker.Update(task) 保存到redis。
重试的时候,如果一共三个handler,1成功,2失败。那么1就会保存到redis里,重试任务的时候就会直接从2开始。
在 queue.go 里有:
func WithAsyncHandleTask(async bool) QueueOption {
return func(q *Queue) {
q.Async = async
}
}
在
queue.go里有:func WithAsyncHandleTask(async bool) QueueOption { return func(q *Queue) { q.Async = async } }
但是您gotasks.Run是传queueName进去new了一个默认的queue啊,默认的queue中async是false,而run也是在gotasks.Run里面跑的,等于在跑gotasks.Run的时候async一直是false
for i, handler := range handlers这里i就在不断的变大,task.CurrentHandlerIndex = i赋值,broker.Update(task)保存到redis。重试的时候,如果一共三个handler,1成功,2失败。那么1就会保存到redis里,重试任务的时候就会直接从2开始。
reentrantMapLock.RLock()
reentrantOptions, ok := reentrantMap[handlerName]
reentrantMapLock.RUnlock()
for j := 0; j < reentrantOptions.MaxTimes; j++ {
args, err = handler(args)
if err == nil {
break
}
time.Sleep(time.Microsecond * time.Duration(reentrantOptions.SleepyMS))
log.Printf("retry step %d of task %s the %d rd time", task.CurrentHandlerIndex, task.ID, j)
}
重试的逻辑好像跟原来的i没什么关系啊
不是这个重试。是指重新入队,再次消费的时候。不过我不记得有没有加这个逻辑了
Queue这个好像是个bug,我基本都是用同步的。应该要把选项改到worker上来才对。
Queue这个好像是个bug,我基本都是用同步的。应该要把选项改到worker上来才对。
赞同
不是这个重试。是指重新入队,再次消费的时候。不过我不记得有没有加这个逻辑了
是handleTask里的这段吗?
defer func() {
r := recover()
status := "success"
if r != nil {
status = "fail"
task.ResultLog = string(debug.Stack())
broker.Update(task)
log.Printf("recovered from queue %s and task %+v with recover info %+v", queueName, task, r)
}
taskHistogram.WithLabelValues(task.QueueName, task.JobName, status).Observe(task.UpdatedAt.Sub(task.CreatedAt).Seconds())
if r != nil {
// save to fatal queue
task.QueueName = FatalQueueName
broker.Enqueue(task)
}
}()
对的
对的
但是这里task.QueueName = FatalQueueName,队列名字已经换了呀,下次是取不出来错误的这条吧?
可以代码重新入队
可以代码重新入队
入的已经不是原来的队了呀,是fatal队列
目前失败的是直接当dead letter了。没有做这个功能。欢迎PR
目前失败的是直接当dead letter了。没有做这个功能。欢迎PR
if r != nil {
broker.Enqueue(task)
// save to fatal queue
task.QueueName = FatalQueueName
broker.Enqueue(task)
}
这样是不是就可以了?
这样估计不行.用户没有定义dead letter一定能重试.所以只能加一个函数用来做这个事情,但是框架不能主动帮用户做这个事情.有一些任务失败了也不能重试.