delayqueue icon indicating copy to clipboard operation
delayqueue copied to clipboard

关于大量任务

Open fero2004 opened this issue 1 year ago • 1 comment

// SendScheduleMsgs enqueues a batch of messages, scheduling payloads[i] for
// delivery at t[i].
//
// For every message it queues three commands on one transactional pipeline:
// a SET of the payload under the message key with an expiry, an HSET of the
// retry count under the generated ID, and a ZADD of the ID into the pending
// sorted set scored by the delivery time in Unix seconds. The pipeline is
// sent with a single Exec call, so a large batch does not issue commands one
// by one.
//
// Recognized opts are retryCountOpt (retry count for every message in this
// batch) and msgTTLOpt (extra time a payload is kept past its delivery time).
// Both apply to this call only; the queue's own defaults are not modified.
//
// It returns an error when payloads and t differ in length, when a message ID
// cannot be generated, or when executing the pipeline fails. An empty batch
// is a no-op.
func (q *DelayQueue) SendScheduleMsgs(payloads []string, t []time.Time, opts ...interface{}) error {
	// Reject mismatched input up front instead of panicking on payloads[i]
	// or silently dropping the surplus payloads.
	if len(payloads) != len(t) {
		return fmt.Errorf("payloads and times length mismatch: %d != %d", len(payloads), len(t))
	}
	if len(payloads) == 0 {
		return nil
	}

	// Options override local copies only: writing them back to q would leak
	// one call's settings into later calls and race with concurrent senders.
	retryCount := q.defaultRetryCount
	baseTTL := q.msgTTL
	for _, opt := range opts {
		switch o := opt.(type) {
		case retryCountOpt:
			retryCount = uint(o)
		case msgTTLOpt:
			baseTTL = time.Duration(o)
		}
	}
	// The retry count is identical for the whole batch; format it once.
	retryCountStr := strconv.Itoa(int(retryCount))

	// NOTE(review): TxPipeline is reportedly an addition to the redis client
	// wrapper — confirm it is available on q.redisCli.
	pipe := q.redisCli.TxPipeline()
	now := time.Now()
	ctx := context.Background()
	for i, deliverAt := range t {
		id, err := uuid.NewRandom()
		if err != nil {
			return fmt.Errorf("generate message id failed: %w", err)
		}
		idStr := id.String()
		// Keep the payload until its delivery time plus the configured TTL.
		msgTTL := deliverAt.Sub(now) + baseTTL
		pipe.Set(ctx, q.genMsgKey(idStr), payloads[i], msgTTL)
		pipe.HSet(ctx, q.retryCountKey, idStr, retryCountStr)
		pipe.ZAdd(ctx, q.pendingKey, redis.Z{
			Score:  float64(deliverAt.Unix()),
			Member: idStr,
		})
	}
	if _, err := pipe.Exec(ctx); err != nil {
		return fmt.Errorf("push to pending failed: %w", err)
	}
	return nil
}

就不提 PR 了,作者看一下这样写是否有问题。如果可行的话,可以手动修改一下。

fero2004 avatar Sep 18 '23 09:09 fero2004