[FEATURE REQUEST] Support changing priority queues at runtime

Open linhbkhn95 opened this issue 10 months ago • 9 comments

Is your feature request related to a problem? Please describe.

I use asynq at my company and rely on the priority weights of the queues to determine how often each queue is processed. However, there are times when these weights need to be adjusted:

    Queues: map[string]int{
        "group1": 6,
        "group2": 4,
    },

->

    Queues: map[string]int{
        "group1": 4,
        "group2": 6,
    },

However, I cannot change this at runtime; I have to restart the server (pods). Moreover, my company uses a "Dynamic Config Loader" to update applications at runtime, so this feature would be immensely useful to us.

Describe the solution you'd like

  • asynq should provide a way to change queue priorities at runtime, as in the example below
func main() {

	srv := asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: redisAddr,
		},
		asynq.Config{
			// Specify how many concurrent workers to use
			Concurrency: 10,
			// Optionally specify multiple queues with different priority.
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
			ErrorHandler:   asynq.ErrorHandlerFunc(HandleErrorFunc),
			StrictPriority: true,
		},
	)

	// mux maps a type to a handler
	mux := asynq.NewServeMux()
	mux.HandleFunc(tasks.TypeEmailDelivery, HandleStartPullTask)

	// Reload the dynamic config and apply the new queue settings.
	// cfg is the application config, for example:
	//   asynq:
	//     strictPriority: true
	//     queues:
	//       map[string]int{
	//           "critical": 3,
	//           "default":  6,
	//           "low":      1,
	//       }
	//
	// SetQueues is the API proposed in this request.
	srv.SetQueues(cfg.asynq.queues, cfg.asynq.strictPriority)

	// mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
	// ...register other handlers...
	//
	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}

Describe alternatives you've considered

Additional context

There is a draft solution in my repo: https://github.com/linhbkhn95/asynq/pull/6/files

@hibiken @kamikazechaser What do you think?

linhbkhn95 avatar Mar 26 '24 10:03 linhbkhn95

I can see such a feature being added if the underlying p.queueConfig map is replaced/updated in a safe way. However, this could set a precedent to update almost every underlying config at runtime 😄.
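One way to read "replaced in a safe way" is an atomic snapshot swap: readers always see either the old map or the new one, never a half-updated map. The following is only a minimal sketch under that assumption. The `queueSnapshot` type and its methods are hypothetical names invented for illustration and are not part of asynq; it also assumes Go 1.19 or later for `atomic.Pointer`.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// queueSnapshot is a hypothetical holder for the queue priority map.
// It is NOT an asynq type; it only illustrates the swap technique.
type queueSnapshot struct {
	current atomic.Pointer[map[string]int]
}

// Store publishes a private copy of queues so that later changes made
// by the caller cannot affect readers.
func (s *queueSnapshot) Store(queues map[string]int) {
	cp := make(map[string]int, len(queues))
	for name, priority := range queues {
		cp[name] = priority
	}
	s.current.Store(&cp)
}

// Load returns the most recently published map, or nil if none was
// stored. Callers must treat the returned map as read-only.
func (s *queueSnapshot) Load() map[string]int {
	if p := s.current.Load(); p != nil {
		return *p
	}
	return nil
}

func main() {
	var snap queueSnapshot
	snap.Store(map[string]int{"group1": 6, "group2": 4})

	before := snap.Load() // a reader holding the old snapshot

	snap.Store(map[string]int{"group1": 4, "group2": 6})

	// The old snapshot is unchanged; new readers see the new priorities.
	fmt.Println(before["group1"], snap.Load()["group1"])
}
```

The trade-off is that every update allocates a new map, which seems acceptable for config reloads that happen rarely compared with reads.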

kamikazechaser avatar Mar 26 '24 13:03 kamikazechaser

@kamikazechaser It is a good fit for the dynamic config loaders that most companies use to update an app without restarting it. I also use one for feature flags when experimenting with new features. https://github.com/uber/piranha

linhbkhn95 avatar Mar 26 '24 14:03 linhbkhn95

Hi @kamikazechaser, do u use telegram? I was just out of work, I wanna contribute more open-source project like that

linhbkhn95 avatar Mar 26 '24 14:03 linhbkhn95

Hi @kamikazechaser, do u use telegram? I was just out of work, I wanna contribute more open-source project like that

Yes. On the same username.

kamikazechaser avatar Mar 26 '24 14:03 kamikazechaser

I cant find it. my telegram username is linhbkhn95. Can u message me?

linhbkhn95 avatar Mar 26 '24 15:03 linhbkhn95

I cant find it. my telegram username is linhbkhn95. Can u message me?

Sent you a message from my alt, my main is kc^.

kamikazechaser avatar Mar 26 '24 15:03 kamikazechaser

Furthermore, with a dynamic config loader we can already change a logger's level at runtime through the SetLevel method, without restarting the pod (server):

// SetLevel alters the logging level.
func (lvl AtomicLevel) SetLevel(l zapcore.Level) {
	lvl.l.Store(int32(l))
}

https://github.com/uber-go/zap/blob/master/level.go#L122 @kamikazechaser

linhbkhn95 avatar Mar 27 '24 07:03 linhbkhn95

Hi @hibiken @kamikazechaser, can you tell me what the next step should be?

linhbkhn95 avatar May 06 '24 14:05 linhbkhn95

There are a lot of requests for such a feature, so why not. Something like Server.SetQueuesPriority(map[string]int) error would work. There would need to be checks for whether the new map has the same keys as the old one, whether strictPriority is enabled in the current config, etc., and all of this would sit behind a mutex guard.

Do you have something similar in mind?

kamikazechaser avatar May 07 '24 05:05 kamikazechaser