asynq
[FEATURE REQUEST] Support changing priority queues at runtime
Is your feature request related to a problem? Please describe.
I use asynq at my company and rely on the priority weights of its queues to control how often tasks from each queue are processed. However, there are cases where these priorities need to be adjusted.
Queues: map[string]int{
	"group1": 6,
	"group2": 4,
},
->
Queues: map[string]int{
	"group1": 4,
	"group2": 6,
},
However, I cannot change this at runtime; I have to restart the server (pods). My company also uses a "Dynamic Config Loader" to update applications at runtime, so this feature would be very useful to us.
Describe the solution you'd like
- asynq should allow changing queue priorities at runtime, as in the example below
func main() {
	srv := asynq.NewServer(
		asynq.RedisClientOpt{
			Addr: redisAddr,
		},
		asynq.Config{
			// Specify how many concurrent workers to use
			Concurrency: 10,
			// Optionally specify multiple queues with different priority.
			Queues: map[string]int{
				"critical": 6,
				"default":  3,
				"low":      1,
			},
			ErrorHandler:   asynq.ErrorHandlerFunc(HandleErrorFunc),
			StrictPriority: true,
		},
	)
	// mux maps a type to a handler
	mux := asynq.NewServeMux()
	mux.HandleFunc(tasks.TypeEmailDelivery, HandleStartPullTask)
	// Reload dynamic config and set queuesCfg.
	// cfg is the application config:
	//   asynq:
	//     strictPriority: true
	//     queues:
	//       map[string]int{
	//         "critical": 3,
	//         "default": 6,
	//         "low": 1,
	//       }
	//
	// SetQueues is the proposed method; it does not exist in asynq yet.
	srv.SetQueues(cfg.asynq.queues, cfg.asynq.strictPriority)
	// mux.Handle(tasks.TypeImageResize, tasks.NewImageProcessor())
	// ...register other handlers...
	//
	if err := srv.Run(mux); err != nil {
		log.Fatalf("could not run server: %v", err)
	}
}
Describe alternatives you've considered
Additional context
There is a draft solution in my fork: https://github.com/linhbkhn95/asynq/pull/6/files
@hibiken @kamikazechaser What do you think?
I can see such a feature being added if the underlying p.queueConfig map is replaced/updated in a safe way. However, this could set a precedent to update almost every underlying config at runtime 😄.
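To make "replaced/updated in a safe way" concrete, here is a minimal sketch of one option: swapping the whole map under a read/write lock. The processor type and its methods are hypothetical stand-ins written for this example, not asynq's actual internals.

```go
package main

import (
	"fmt"
	"sync"
)

// processor is a hypothetical stand-in for the component that owns the
// queue priorities. It is NOT asynq's real processor type.
type processor struct {
	mu          sync.RWMutex
	queueConfig map[string]int
}

// setQueueConfig replaces the whole map under the write lock. The input is
// copied first so later changes by the caller cannot race with readers.
func (p *processor) setQueueConfig(cfg map[string]int) {
	next := make(map[string]int, len(cfg))
	for name, weight := range cfg {
		next[name] = weight
	}
	p.mu.Lock()
	p.queueConfig = next
	p.mu.Unlock()
}

// weight reads one queue's priority under the read lock.
func (p *processor) weight(queue string) (int, bool) {
	p.mu.RLock()
	defer p.mu.RUnlock()
	w, ok := p.queueConfig[queue]
	return w, ok
}

func main() {
	p := &processor{}
	p.setQueueConfig(map[string]int{"group1": 6, "group2": 4})

	// Simulate a config reload while several readers are active.
	var wg sync.WaitGroup
	for i := 0; i < 4; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for j := 0; j < 1000; j++ {
				p.weight("group1")
			}
		}()
	}
	p.setQueueConfig(map[string]int{"group1": 4, "group2": 6})
	wg.Wait()

	w, _ := p.weight("group1")
	fmt.Println("group1 weight after reload:", w)
}
```

Replacing the map wholesale, rather than mutating entries in place, means readers see either the old priorities or the new ones, never a mix of both.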
@kamikazechaser It is a good approach for companies that use a dynamic config loader, because config changes can be applied without restarting the app.
I also use it for feature flags, to experiment with new features.
https://github.com/uber/piranha
Hi @kamikazechaser, do you use Telegram? I was just out of work, and I want to contribute more to open-source projects like this.
Yes. On the same username.
I can't find it. My Telegram username is linhbkhn95. Can you message me?
Sent you a message from my alt, my main is kc^.
Furthermore, with a dynamic config loader, if we use a logger we can change the log level at runtime through the SetLevel method, without restarting the pod (server).
// SetLevel alters the logging level.
func (lvl AtomicLevel) SetLevel(l zapcore.Level) {
	lvl.l.Store(int32(l))
}
https://github.com/uber-go/zap/blob/master/level.go#L122 @kamikazechaser
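The same pattern can be sketched with only the standard library, to show why a runtime SetLevel needs no restart: the level lives behind an atomic value that every copy of the handle shares. The level and atomicLevel names below are made up for this example and are not zap's API.

```go
package main

import (
	"fmt"
	"sync/atomic"
)

// level is a hypothetical log level type used only in this sketch.
type level int32

const (
	debugLevel level = iota
	infoLevel
	warnLevel
)

// atomicLevel holds a pointer to an atomic value, so copies of the struct
// share the same underlying level.
type atomicLevel struct {
	l *atomic.Int32
}

func newAtomicLevel(l level) atomicLevel {
	v := new(atomic.Int32)
	v.Store(int32(l))
	return atomicLevel{l: v}
}

// SetLevel changes the minimum enabled level at runtime.
func (lvl atomicLevel) SetLevel(l level) {
	lvl.l.Store(int32(l))
}

// Enabled reports whether messages at level l would be logged.
func (lvl atomicLevel) Enabled(l level) bool {
	return l >= level(lvl.l.Load())
}

func main() {
	lvl := newAtomicLevel(infoLevel)
	fmt.Println("debug enabled before reload:", lvl.Enabled(debugLevel))

	// A config reload callback would call SetLevel with the new value.
	lvl.SetLevel(debugLevel)
	fmt.Println("debug enabled after reload:", lvl.Enabled(debugLevel))
}
```

A single integer fits in one atomic value, which is why this case needs no mutex. A queue priority map is larger, so it would need a lock or an atomically swapped pointer instead.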
Hi @hibiken @kamikazechaser, could you tell me what the next step should be?
There are a lot of requests for such a feature, so why not. Something like Server.SetQueuesPriority(map[string]int) error would work. There would need to be checks for whether the new map has the same keys as the old one, whether strictPriority is enabled in the current config, etc., all behind a mutex guard.
Do you have something similar in mind?
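As a rough illustration of that shape, here is a minimal sketch under some assumptions: the server type below is a hypothetical stand-in rather than asynq's Server, the rule that priorities must be positive is an assumption of this sketch, and the strictPriority check is left out because the thread does not say what it should enforce.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// server is a hypothetical stand-in used only for this sketch.
type server struct {
	mu     sync.Mutex
	queues map[string]int
}

var errQueueSetChanged = errors.New("new priorities must use the same queue names")

// SetQueuesPriority validates the new map and swaps it in, all while
// holding the mutex. On any validation error the old priorities stay.
func (s *server) SetQueuesPriority(next map[string]int) error {
	s.mu.Lock()
	defer s.mu.Unlock()

	// Same length plus "every new key already exists" implies equal key sets.
	if len(next) != len(s.queues) {
		return errQueueSetChanged
	}
	updated := make(map[string]int, len(next))
	for name, weight := range next {
		if _, ok := s.queues[name]; !ok {
			return errQueueSetChanged
		}
		// Assumption of this sketch: a priority must be a positive integer.
		if weight <= 0 {
			return fmt.Errorf("queue %q: priority must be positive, got %d", name, weight)
		}
		updated[name] = weight
	}
	s.queues = updated
	return nil
}

// priority returns the current priority of a queue (0 if unknown).
func (s *server) priority(name string) int {
	s.mu.Lock()
	defer s.mu.Unlock()
	return s.queues[name]
}

func main() {
	s := &server{queues: map[string]int{"critical": 6, "default": 3, "low": 1}}

	err := s.SetQueuesPriority(map[string]int{"critical": 3, "default": 6, "low": 1})
	fmt.Println("swap critical/default:", err, "default is now", s.priority("default"))

	err = s.SetQueuesPriority(map[string]int{"critical": 3, "default": 6})
	fmt.Println("drop a queue:", err)
}
```

Building the new map first and assigning it only after every check passes keeps the update all-or-nothing, so a rejected reload never leaves the priorities half applied.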