[FEATURE REQUEST] Jobs aggregation
I wonder if this feature is on your roadmap.
TL;DR: Job aggregation allows you to enqueue multiple jobs successively and have them passed to the worker together rather than individually, letting you batch multiple successive operations into one.
You can see the use case and the original discussion here: https://github.com/hibiken/asynq/issues/339
Final docs https://github.com/hibiken/asynq/wiki/Task-aggregation
Might be a duplicate of #453
This one's WIP and coming very soon.
Let me close this one since it's a duplicate.
@krhubert actually I wonder if you might be able to provide some more context on how you’re looking to use job aggregation / grouping / batching? I’ve read your linked issue on Asynq, but I'm not sure whether it reflects your current use case. I'm mostly curious what timeframe you’re looking to batch over, and whether and how you’re wanting to partition your batches.
@bgentry I'm unsure if I can share details in a public discussion, but I can give a more specific overview in a private conversation. I can hop on a call, or join your Slack/Discord if you have one. Let me know if that's something you're interested in.
> Mostly curious what timeframe you’re looking to batch over,
My use case is very different from what I shared in that issue, although the aggregation feature described there does solve the problem.
The timeframes I use right now:
- Grace Period - 5 min
- Max Delay - 15 min
- GroupGracePeriod: the grace period is renewed whenever a task with the same group key is added to the group.
- GroupMaxDelay: the grace period has a configurable upper bound; the user can optionally set a maximum delay, after which Asynq will deliver the tasks to the Handler regardless of the remaining grace period.
> whether and how you’re wanting to partition your batches, etc.
I don't partition them, if we're thinking of the same kind of partitioning, because to me partitioning can happen either before adding a job or after aggregation.
I know this context might not give you full visibility.
> whenever a task with the same group key is added to the group
I think this maybe hints at what I was asking about with partitioning. Essentially I'm trying to understand what criteria you want to use to group your jobs: whether it's merely about grouping any jobs of the same kind, or whether you want to group on some other attributes.
I also sent you an email to try to connect another way about details you don't wish to post on GitHub. Cheers :v:
Is there any news on this feature?
We're currently exploring River and might have a use case where we need to create a batch of multiple jobs for a specific customer, grouped together and sent as one transaction, because the endpoint only allows transmission in certain time frames, which is outside our control.
The timeframe for grouping would currently be up to 120 seconds (i.e. the time we have to wait for the aggregation), but it would need to be adjusted on each grouping based on a control parameter. So the current aggregation might happen in, say, 60 seconds, the next one 30 seconds later, and so on.
@phoenix147 no real news as of now. We kicked around a few potential ways of doing this but ultimately it got sidelined in favor of some other work. We're getting close to finishing up some of those other projects so hopefully we'll be able to pick back up on this one soon.
It definitely helps that you've shared more info on your use case. The different options we considered were addressing two very different use cases under this umbrella: batching together of jobs already on the queue, vs batching over some arbitrarily long time threshold as in your case. The former is more about processing efficiency, throughput, and reducing system load, whereas the latter is about expensive and/or constrained actions that are willing to accept more significant delays.
@bgentry Thanks for your quick response! Regarding your options: I think both would work for us. For the first one we could create a timer per customer and aggregate all jobs for that customer under it, while the second one builds this "timer" into River itself. I'd even say the first is more flexible than Asynq's aggregation feature above, because there you can only set a fixed grace period, which would not work for us.
Currently we plan either to create a scheduled aggregation job, or to create a job and snooze it until it is allowed to run and then fetch all the required resources from the database.
I can provide a use case for this: let's say you're sending out notifications to a user, but each one is a discrete job. Ideally, we'd be able to run a job every minute (for example) that would batch-process all of the notifications, keyed on a user ID or email address.
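That periodic digest pattern might be sketched like this (made-up types, no particular library): the minute-ly job drains pending notifications and groups them by user before sending one digest per user.

```go
package main

import "fmt"

// Notification is a hypothetical pending-notification record.
type Notification struct {
	UserID  string
	Message string
}

// batchByUser groups drained notifications by user ID so a periodic job
// can send one digest per user instead of one send per notification.
func batchByUser(pending []Notification) map[string][]Notification {
	batches := make(map[string][]Notification)
	for _, n := range pending {
		batches[n.UserID] = append(batches[n.UserID], n)
	}
	return batches
}

func main() {
	pending := []Notification{
		{"u1", "order shipped"},
		{"u2", "password changed"},
		{"u1", "order delivered"},
	}
	batches := batchByUser(pending)
	fmt.Println(len(batches["u1"]), len(batches["u2"])) // u1 gets a 2-item digest, u2 a 1-item one
}
```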
I process invoices, and I might want to fail if any of the invoices for a given user fails to process. I'm also parsing timeseries data and want to batch-ingest it into a database. The second case I can work around using channels, but that only works when there's a single client node.
I'm simply looking for a batching mechanism with a grouping key (maybe multiple keys; that shouldn't be much extra work, no?).
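Multiple keys indeed shouldn't add much work: in Go, any comparable struct can serve as a map key, so several grouping attributes collapse into one composite key (field names below are hypothetical):

```go
package main

import "fmt"

// groupKey is a hypothetical composite grouping key; any comparable
// struct works directly as a Go map key, so "multiple keys" reduces
// to a single composite one.
type groupKey struct {
	UserID string
	Kind   string
}

type job struct {
	Key     groupKey
	Payload string
}

// groupJobs buckets jobs by their composite key.
func groupJobs(jobs []job) map[groupKey][]job {
	out := make(map[groupKey][]job)
	for _, j := range jobs {
		out[j.Key] = append(out[j.Key], j)
	}
	return out
}

func main() {
	jobs := []job{
		{groupKey{"u1", "invoice"}, "a"},
		{groupKey{"u1", "ingest"}, "b"},
		{groupKey{"u1", "invoice"}, "c"},
	}
	g := groupJobs(jobs)
	fmt.Println(len(g[groupKey{"u1", "invoice"}])) // the two invoice jobs land in one bucket
}
```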
Just providing another use case: grouping multiple single executions into one is pretty useful for running machine learning models.