
[FEATURE REQUEST] Support BatchEnqueue for client

Open Percivalll opened this issue 2 years ago • 11 comments

When a lot of tasks need to be enqueued, the current method is slow because every Redis operation costs at least one round trip (RTT). For example: if I want to enqueue 1,000,000 tasks and each client.Enqueue call takes 13ms in my environment, then running them sequentially takes 1,000,000 × 13ms = 13,000,000ms, almost 3.6 hours. I could of course use a lot of goroutines to shorten that, but it costs a lot of CPU and many Redis connections.

I think we should provide a BatchEnqueue method so users can enqueue a lot of tasks at once. For the Redis broker, we can use pipelining to reduce network and CPU overhead.

Percivalll avatar Sep 02 '22 04:09 Percivalll

Related discussions: https://github.com/hibiken/asynq/issues/339#issuecomment-985507125 https://github.com/hibiken/asynq/issues/352

Percivalll avatar Sep 02 '22 04:09 Percivalll

I need that too :) I have several million tasks to enqueue in my workflow. Overall I really love the lib; I can handle 7 million tasks in 44 minutes (with some computation and database requests).

KillianH avatar Sep 06 '22 23:09 KillianH

Thank you @Serinalice for creating this feature request!

This feature makes a lot of sense and the package should support this use case. We should probably discuss the API first (What should it look like? How should we handle partial errors?)

hibiken avatar Sep 10 '22 14:09 hibiken

No problem, I'll describe my preliminary ideas!

Percivalll avatar Sep 11 '22 04:09 Percivalll

How about this:

func (c *Client) EnqueueBatch(tasks []*Task, opts ...Option) ([]*TaskInfo, error)
func (c *Client) EnqueueBatchContext(ctx context.Context, tasks []*Task, opts ...Option) ([]*TaskInfo, error)

If the error is nil, all tasks have been enqueued successfully. If not, the returned slice contains the info for the tasks that were enqueued before the failure.

Percivalll avatar Sep 13 '22 05:09 Percivalll
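The proposed contract (nil error means everything was enqueued; otherwise the slice holds the successes so far) could look like this in use. `TaskInfo`, `enqueueOne`, and `enqueueBatch` here are stdlib-only stand-ins for illustration, not the real asynq API:

```go
package main

import "fmt"

// TaskInfo is a stand-in for asynq.TaskInfo.
type TaskInfo struct{ ID int }

// enqueueOne is a hypothetical single-task enqueue that fails on task 3.
func enqueueOne(id int) (*TaskInfo, error) {
	if id == 3 {
		return nil, fmt.Errorf("enqueue task %d: connection reset", id)
	}
	return &TaskInfo{ID: id}, nil
}

// enqueueBatch follows the proposed contract: on error, the returned
// slice holds the tasks that were successfully enqueued before the failure.
func enqueueBatch(ids []int) ([]*TaskInfo, error) {
	infos := make([]*TaskInfo, 0, len(ids))
	for _, id := range ids {
		info, err := enqueueOne(id)
		if err != nil {
			return infos, err // partial success: infos covers ids[:len(infos)]
		}
		infos = append(infos, info)
	}
	return infos, nil
}

func main() {
	infos, err := enqueueBatch([]int{1, 2, 3, 4})
	fmt.Println(len(infos), err != nil) // 2 true
}
```

The caller can tell exactly which prefix of the batch succeeded and retry only the remainder.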

How to configure/default batch size?

https://redis.io/docs/manual/pipelining/

IMPORTANT NOTE: While the client sends commands using pipelining, the server will be forced to queue the replies, using memory. So if you need to send a lot of commands with pipelining, it is better to send them as batches each containing a reasonable number, for instance 10k commands, read the replies, and then send another 10k commands again, and so forth. The speed will be nearly the same, but the additional memory used will be at most the amount needed to queue the replies for these 10k commands.

xuyang2 avatar Sep 13 '22 08:09 xuyang2
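The 10k-per-pipeline guidance quoted above could be applied internally with a small chunking helper; `chunk` is a hypothetical name, stdlib only:

```go
package main

import "fmt"

// chunk splits items into consecutive slices of at most size elements,
// mirroring the "send 10k commands, read replies, repeat" advice.
func chunk(items []int, size int) [][]int {
	var out [][]int
	for start := 0; start < len(items); start += size {
		end := start + size
		if end > len(items) {
			end = len(items)
		}
		out = append(out, items[start:end])
	}
	return out
}

func main() {
	items := make([]int, 25)
	fmt.Println(len(chunk(items, 10))) // 3 batches: 10 + 10 + 5
}
```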

How to configure/default batch size?

It would be determined by the length of the tasks slice the caller passes in.

Percivalll avatar Sep 13 '22 08:09 Percivalll

Would this new batch API pipeline the existing EVALSHA enqueue scripts, or would it use a new Lua script that takes the batch of tasks and enqueues them all at once?

yousifh avatar Sep 14 '22 23:09 yousifh

Are there any new developments on this issue?

5idu avatar Nov 08 '22 01:11 5idu

Should this API support a combination of initial task states between "aggregating", "pending", and "scheduled"?

developersam1995 avatar Mar 05 '23 08:03 developersam1995

any update?

thanhps42 avatar May 25 '24 01:05 thanhps42