asynq
[FEATURE REQUEST] Per-task type aggregation
Is your feature request related to a problem? Please describe.
I'm trying out the new task batching/aggregation feature from https://github.com/hibiken/asynq/issues/339. I would like to configure the grouping parameters per task type instead of once on the server.
For instance, I might have one kind of batched task that sends high-urgency notifications (e.g. group any from the last minute) and another that sends low-urgency notifications (e.g. group any from the last day).
Describe the solution you'd like
I'd like to be able to specify the wait, max wait, max size, and aggregation function per task type, maybe like:
mux.HandleFunc(
    taskName,
    func(...),
    mux.Batching(wait, maxWait, aggregationFunc, etc),
)
Describe alternatives you've considered
I think I could also achieve this by creating two separate servers (maybe even two asynq servers in the same process? I have not tested this yet).
Additional context
n/a
(this may end up just becoming a thread of notes on the aggregation feature - let me know if I should split this out into different issues)
Another piece of feedback: it would help if GroupAggregatorFunc could return an error. The example doesn't return one, but you could imagine a more complex aggregation function that needs to deserialize the individual tasks to build the aggregated one:
func Aggregator(group string, tasks []*asynq.Task) *asynq.Task {
    payloads := make([]*Payload, 0, len(tasks))
    for _, t := range tasks {
        n := &notifications.Payload{}
        if err := serialization.Unmarshal(t.Payload(), n); err != nil {
            // what do we do with this error?
        }
        payloads = append(payloads, n.Notification)
    }
    combined := &BatchedPayload{
        Batched: payloads,
    }
    m, err := serialization.Marshal(combined)
    if err != nil {
        // same here?
    }
    return asynq.NewTask(tasks[0].Type(), m)
}
@stephen Thanks for this great feedback! Let me think about API changes to accommodate these use cases.