[FEATURE REQUEST] Support BatchEnqueue for client
When a lot of tasks need to be enqueued, the current method is slow because every Redis operation costs at least one RTT. For example, if I want to enqueue 1,000,000 tasks and each client.Enqueue takes 13ms in my environment, then executing them sequentially takes 1,000,000 × 13ms = 13,000,000ms, almost 3.6 hours. I could use a lot of goroutines to shorten that time, but that burns a lot of CPU and opens many Redis connections.
I think we should provide a BatchEnqueue method so users can enqueue a large number of tasks at once. For the Redis broker, we can use pipelining to reduce the network and CPU overhead.
Related discussions: https://github.com/hibiken/asynq/issues/339#issuecomment-985507125 https://github.com/hibiken/asynq/issues/352
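For reference, this is roughly what pipelining buys at the go-redis level. A minimal sketch only: the key name and payload format are placeholders, not asynq's actual storage layout.

```go
package main

import (
	"context"
	"fmt"

	"github.com/redis/go-redis/v9"
)

func main() {
	rdb := redis.NewClient(&redis.Options{Addr: "localhost:6379"})
	ctx := context.Background()

	pipe := rdb.Pipeline()
	for i := 0; i < 10000; i++ {
		// Placeholder key/payload; asynq's real enqueue writes are more involved.
		pipe.LPush(ctx, "example:pending", fmt.Sprintf("task-%d", i))
	}
	// All buffered commands go to Redis in a single round trip.
	cmds, err := pipe.Exec(ctx)
	if err != nil {
		panic(err)
	}
	fmt.Printf("enqueued %d items in one round trip\n", len(cmds))
}
```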
I need that too :) I have several million tasks to enqueue in my workflow. Overall I really love the lib; I can handle 7 million tasks in 44 minutes (with some computation and database requests).
Thank you @Serinalice for creating this feature request!
This feature makes a lot of sense and the package should support this use case. We should probably discuss the API first (What should it look like? How should we handle partial errors?)
No problem, I'll describe my preliminary ideas!
How about this:
```go
func (c *Client) EnqueueBatch(tasks []*Task, opts ...Option) ([]*TaskInfo, error)
func (c *Client) EnqueueBatchContext(ctx context.Context, tasks []*Task, opts ...Option) ([]*TaskInfo, error)
```
If the error is nil, all tasks have been enqueued successfully. If not, the returned slice contains the TaskInfo for every task that was enqueued successfully.
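To make the partial-error contract concrete, here is how a caller might use it. Hypothetical sketch: EnqueueBatch is the proposed method above, not an existing asynq API (asynq.Client, asynq.Task, and asynq.Queue are real).

```go
package main

import (
	"fmt"

	"github.com/hibiken/asynq"
)

// enqueueAll shows the proposed partial-error semantics; EnqueueBatch
// does not exist in asynq today.
func enqueueAll(client *asynq.Client, tasks []*asynq.Task) error {
	infos, err := client.EnqueueBatch(tasks, asynq.Queue("default"))
	if err != nil {
		// Partial failure: infos holds only the tasks that made it in,
		// so the caller can log or retry the remainder.
		return fmt.Errorf("enqueued only %d of %d tasks: %w", len(infos), len(tasks), err)
	}
	return nil
}
```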
How should the batch size be configured, and what should the default be?

https://redis.io/docs/manual/pipelining/

> IMPORTANT NOTE: While the client sends commands using pipelining, the server will be forced to queue the replies, using memory. So if you need to send a lot of commands with pipelining, it is better to send them as batches each containing a reasonable number, for instance 10k commands, read the replies, and then send another 10k commands again, and so forth. The speed will be nearly the same, but the additional memory used will be at most the amount needed to queue the replies for these 10k commands.
It could default to the length of the tasks slice.
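If an internal cap were added later (as the Redis docs quoted above suggest), chunking is straightforward. A minimal sketch, where the flush callback is a placeholder for one pipelined write:

```go
package main

import "github.com/hibiken/asynq"

const maxBatch = 10000 // per the Redis pipelining guidance above

// enqueueInChunks flushes at most maxBatch tasks per pipeline round so
// the server never queues more than maxBatch replies in memory.
// flushChunk is a placeholder for one pipelined enqueue.
func enqueueInChunks(tasks []*asynq.Task, flushChunk func([]*asynq.Task) error) error {
	for start := 0; start < len(tasks); start += maxBatch {
		end := start + maxBatch
		if end > len(tasks) {
			end = len(tasks)
		}
		if err := flushChunk(tasks[start:end]); err != nil {
			return err
		}
	}
	return nil
}
```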
Would this new batch API pipeline the existing EVALSHA enqueue scripts, or would it use a new Lua script that takes the whole batch of tasks and enqueues them all at once?
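For what it's worth, the first option can be expressed entirely with go-redis primitives. A minimal sketch; the script source and key/argument builders are placeholders standing in for asynq's internal enqueue script and key scheme:

```go
package main

import (
	"context"

	"github.com/redis/go-redis/v9"
)

// batchEvalSha loads the per-task enqueue script once, then pipelines
// one EVALSHA per task so the whole batch costs a single round trip.
// scriptSrc, keysFor, and argsFor are placeholders for asynq internals.
func batchEvalSha(ctx context.Context, rdb *redis.Client, scriptSrc string,
	keysFor func(i int) []string, argsFor func(i int) []interface{}, n int) error {
	sha, err := rdb.ScriptLoad(ctx, scriptSrc).Result()
	if err != nil {
		return err
	}
	pipe := rdb.Pipeline()
	for i := 0; i < n; i++ {
		pipe.EvalSha(ctx, sha, keysFor(i), argsFor(i)...)
	}
	_, err = pipe.Exec(ctx) // one round trip for the whole batch
	return err
}
```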
Are there any new developments on this issue?
Should this API support a combination of initial task states between "aggregating", "pending", and "scheduled"?
Any update?