
[FEATURE REQUEST] Per-task type aggregation

Open stephen opened this issue 3 years ago • 2 comments

Is your feature request related to a problem? Please describe.
I'm trying out the new task batching/aggregation feature from https://github.com/hibiken/asynq/issues/339. I would like to be able to configure grouping parameters on a per-task-type basis instead of server-wide.

For instance, I might have one kind of batched task that sends out high-urgency notifications (i.e. group anything received in the last minute) and another that sends low-urgency notifications (i.e. group anything received in the last day).

Describe the solution you'd like
I'd like to be able to specify wait/maxwait/maxsize/aggregation function per task, maybe something like:

mux.HandleFunc(
  taskName,
  func(...),
  mux.Batching(wait, maxWait, aggregationFunc, etc)
)

Describe alternatives you've considered
I think I could also achieve this by running two separate servers (maybe even two asynq servers in the same process? I have not tested this yet).
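The two-servers-in-one-process idea could be sketched roughly as below. This is untested and only a sketch: it assumes the aggregation config fields added in asynq v0.23 (Queues, GroupAggregator, GroupGracePeriod, GroupMaxDelay), and the queue names, durations, and the aggregate/handleBatch helpers are placeholders, not anything from this thread.

```go
package main

import (
	"context"
	"log"
	"time"

	"github.com/hibiken/asynq"
)

// aggregate and handleBatch are placeholder stubs for illustration.
func aggregate(group string, tasks []*asynq.Task) *asynq.Task {
	return asynq.NewTask(tasks[0].Type(), nil)
}

func handleBatch(ctx context.Context, t *asynq.Task) error { return nil }

func main() {
	redis := asynq.RedisClientOpt{Addr: "localhost:6379"}

	// Server 1 consumes only the "urgent" queue and flushes groups
	// within a minute.
	urgent := asynq.NewServer(redis, asynq.Config{
		Queues:           map[string]int{"urgent": 1},
		GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
		GroupGracePeriod: 10 * time.Second,
		GroupMaxDelay:    1 * time.Minute,
	})

	// Server 2 consumes only the "digest" queue and flushes groups
	// within a day.
	digest := asynq.NewServer(redis, asynq.Config{
		Queues:           map[string]int{"digest": 1},
		GroupAggregator:  asynq.GroupAggregatorFunc(aggregate),
		GroupGracePeriod: 1 * time.Hour,
		GroupMaxDelay:    24 * time.Hour,
	})

	mux := asynq.NewServeMux()
	mux.HandleFunc("notification:batched", handleBatch)

	// Run both servers in the same process; Run blocks, so one
	// goes in a goroutine.
	go func() {
		if err := urgent.Run(mux); err != nil {
			log.Fatal(err)
		}
	}()
	if err := digest.Run(mux); err != nil {
		log.Fatal(err)
	}
}
```

Enqueuers would then pick the effective grouping policy by routing tasks with asynq.Queue("urgent") or asynq.Queue("digest") alongside asynq.Group(...).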

Additional context
n/a

stephen avatar Jul 13 '22 19:07 stephen

(this may end up just becoming a thread of notes on the aggregation feature - let me know if I should split this out into different issues)

Another piece of feedback: it might be worth supporting returned errors from GroupAggregatorFunc. The example doesn't return an error, but you could imagine a more complex aggregation function that needs to deserialize the individual tasks to build the aggregated one:

func Aggregator(group string, tasks []*asynq.Task) *asynq.Task {
	payloads := make([]*Payload, 0, len(tasks))
	for _, t := range tasks {
		n := &notifications.Payload{}
		if err := serialization.Unmarshal(t.Payload(), n); err != nil {
			// what do we do with this error?
		}

		payloads = append(payloads, n.Notification)
	}

	combined := &BatchedPayload{
		Batched: payloads,
	}

	m, err := serialization.Marshal(combined)
	if err != nil {
		// same here?
	}

	return asynq.NewTask(tasks[0].Type(), m)
}
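One pattern that works today, absent error returns, is to drop malformed tasks inside the aggregator and log them so the rest of the batch still goes through. The sketch below is illustrative only (it is not from this thread or from asynq): the task, payload, and batchedPayload types are stand-ins defined locally, and stdlib encoding/json stands in for the serialization package above.

```go
package main

import (
	"encoding/json"
	"fmt"
	"log"
)

// task is a local stand-in for *asynq.Task, for illustration only.
type task struct {
	typ     string
	payload []byte
}

type payload struct {
	Message string `json:"message"`
}

type batchedPayload struct {
	Batched []payload `json:"batched"`
}

// aggregate skips tasks whose payloads fail to deserialize, logging
// them instead of failing the whole batch.
func aggregate(group string, tasks []*task) *task {
	var combined batchedPayload
	for _, t := range tasks {
		var p payload
		if err := json.Unmarshal(t.payload, &p); err != nil {
			// No way to surface this to the caller, so log and drop.
			log.Printf("aggregate %q: skipping malformed payload: %v", group, err)
			continue
		}
		combined.Batched = append(combined.Batched, p)
	}
	m, err := json.Marshal(&combined)
	if err != nil {
		// Marshaling a struct of strings cannot realistically fail;
		// fall back to an empty batch so a task is still returned.
		m = []byte(`{"batched":[]}`)
	}
	return &task{typ: tasks[0].typ, payload: m}
}

func main() {
	tasks := []*task{
		{typ: "email:digest", payload: []byte(`{"message":"hi"}`)},
		{typ: "email:digest", payload: []byte(`not json`)}, // malformed, gets dropped
		{typ: "email:digest", payload: []byte(`{"message":"bye"}`)},
	}
	out := aggregate("digest", tasks)
	fmt.Println(string(out.payload)) // {"batched":[{"message":"hi"},{"message":"bye"}]}
}
```

The obvious downside is silent data loss on bad payloads, which is exactly why an error return (or a panic-to-retry policy) from the aggregator would be nicer.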

stephen avatar Jul 13 '22 20:07 stephen

@stephen Thanks for this great feedback! Let me think about API changes to accommodate these use cases.

hibiken avatar Jul 19 '22 02:07 hibiken