p-queue

How to add to queue asynchronously?

Open rightaway opened this issue 7 years ago • 10 comments

I have a concurrency of 10, and each task checks queue.size to see whether the queue has fallen below a certain number. As soon as a task sees that, it should asynchronously add 20 more tasks to the queue. But how can I do this without the other 9 running tasks also trying to add those 20 new tasks?

I'm looking for a way to keep the size of the queue above a certain minimum (20 in this case) by adding new items asynchronously, for example by calling an API or a database to determine what the next 20 items should be.

rightaway • Aug 21 '18 15:08
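One way to approach this question without any new p-queue feature is a "single-flight" guard: the first task that sees the queue below the threshold starts the refill and stores its promise, and every other task that checks while that promise is pending reuses it instead of starting a second refill. A minimal sketch, assuming only that the queue exposes size and add(fn) (both exist in p-queue); createRefiller and getMoreItems are hypothetical names:

```javascript
// Sketch of a "single-flight" refill guard. Assumes `queue` exposes `size`
// (number of waiting items) and `add(fn)`, as p-queue does. `createRefiller`
// and `getMoreItems` are hypothetical names, not part of p-queue.
function createRefiller(queue, { minimumSize, getMoreItems }) {
    let inFlight = null; // promise of the refill currently running, if any

    return function maybeRefill() {
        if (inFlight) {
            return inFlight; // another task already started a refill: share it
        }
        if (queue.size >= minimumSize) {
            return Promise.resolve(); // enough work queued, nothing to do
        }
        // The check above and this assignment run synchronously, so no other
        // task can slip in between them.
        inFlight = (async () => {
            const tasks = await getMoreItems();
            for (const task of tasks) {
                queue.add(task); // with p-queue, handle the returned promise in real code
            }
        })().finally(() => {
            inFlight = null; // allow the next refill once this one settles
        });
        return inFlight;
    };
}
```

With p-queue, each task could call maybeRefill() once its own work is done; the other tasks that check at the same moment simply receive the refill that is already running instead of fetching another batch.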

Do you think this would make a good feature request? I'm thinking of a minimumSize option (20 in my example) and a getMoreQueueItems option, a Promise-returning or async function that fetches the elements to add to the queue. p-queue would then coordinate the refill with the concurrency so that it only runs once at a time.

rightaway • Aug 21 '18 18:08

The ability to add elements asynchronously is already present.

minimumSize option

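// Keep the queue paused until enough items are waiting.
queue.pause();

// Wrap the original add() so every call can check the threshold.
queue._add = queue.add;
queue.add = (...args) => {
	const result = queue._add(...args);

	// Once 20 items have accumulated, let the queue start processing.
	if (queue.isPaused && queue.size >= 20) {
		queue.start();
	}

	return result;
};

// Pause again when the queue drains. Note that this promise settles only
// once, so it pauses the queue only the first time it becomes idle.
queue.onIdle().then(() => queue.pause());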

getMoreQueueItems

Wut? You are responsible for placing items into the queue :smiley:

szmarczak • Aug 29 '18 10:08

Maybe I haven't explained it well, but I mean that getMoreQueueItems would be a callback that returns the next batch of queue items to add, so that p-queue can automatically add the next batch when the queue length drops below a certain size. Some kind of locking would be needed, though, so that not every running task that sees the queue length below that size calls the callback.

rightaway • Aug 30 '18 08:08

Like I said, you are responsible for adding items to the queue. getMoreQueueItems is a bad example.

If p-queue were an EventEmitter, we could attach a listener that fires after every finished job. It would look something like:

queue.pause();
// Proposed event: fires after each job finishes
queue.on('finishedJob', () => {
    if (queue.size < 20) {
        queue.pause();
        addMoreItems();
    }
});
// Proposed event: fires whenever an item is added
queue.on('add', () => {
    if (queue.size >= 20) {
        queue.start();
    }
});

I'll make a PR for events :)

szmarczak • Aug 30 '18 08:08
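p-queue was not an EventEmitter when this was proposed, so the events above are only a proposal. To make it concrete, here is a minimal, self-contained queue built on Node's EventEmitter that emits the proposed 'add' and 'finishedJob' events. It is a hypothetical sketch (EventQueue is not p-queue's API), with job errors swallowed for brevity:

```javascript
const { EventEmitter } = require('node:events');

// Hypothetical sketch of the proposal, NOT p-queue's API: a tiny
// concurrency-limited queue that emits 'add' whenever a job is enqueued and
// 'finishedJob' whenever a job settles.
class EventQueue extends EventEmitter {
    constructor({ concurrency = 1 } = {}) {
        super();
        this.concurrency = concurrency;
        this.waiting = []; // jobs that have not started yet
        this.pending = 0; // jobs currently running
        this.isPaused = false;
    }

    get size() {
        return this.waiting.length;
    }

    add(fn) {
        this.waiting.push(fn);
        this.emit('add');
        this._next();
    }

    pause() {
        this.isPaused = true;
    }

    start() {
        this.isPaused = false;
        this._next();
    }

    _next() {
        // Start jobs until paused, at the concurrency limit, or out of work.
        while (!this.isPaused && this.pending < this.concurrency && this.waiting.length > 0) {
            const fn = this.waiting.shift();
            this.pending += 1;
            Promise.resolve()
                .then(fn)
                .catch(() => {}) // errors are ignored to keep the sketch short
                .finally(() => {
                    this.pending -= 1;
                    this.emit('finishedJob');
                    this._next();
                });
        }
    }
}
```

The listeners from the proposal only use size, pause(), start() and on(), so they could be attached to this class as written.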

In that example, would the finishedJob handler run at most once at a time? Or would some kind of locking be needed to make sure addMoreItems isn't run more than once at the same time by multiple jobs that have finished?

rightaway • Sep 01 '18 14:09

Note: This is just a proposal.

finishedJob would fire after each job (item in the queue). We could synchronize these events using setImmediate (so there's only one event at a time), or we could leave that to the user. Something like:

queue.on('finishedJob', () => {
    // Defer the check to the event loop's check phase (after I/O callbacks)
    setImmediate(() => {
        if (queue.size < 20) {
            queue.pause();
            addMoreItems();
        }
    });
});

szmarczak • Sep 01 '18 21:09
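One way to read "synchronize these events using setImmediate" is to coalesce all finishedJob events fired in the same turn of the event loop into a single size check. A minimal sketch of that idea; createScheduler is a hypothetical helper, not part of p-queue or Node:

```javascript
// Hypothetical helper: however many times scheduleCheck() is called before
// the event loop reaches its check phase, `check` runs only once per batch.
function createScheduler(check) {
    let scheduled = false;
    return function scheduleCheck() {
        if (scheduled) return; // a check is already queued for this batch
        scheduled = true;
        setImmediate(() => {
            scheduled = false; // let the next batch schedule a new check
            check();
        });
    };
}

let checks = 0;
const scheduleCheck = createScheduler(() => {
    checks += 1;
    // In the thread's example, `queue.size < 20` would be tested here.
});

// Ten jobs finishing in the same tick trigger a single check.
for (let i = 0; i < 10; i += 1) scheduleCheck();
```

This coalesces the synchronous check only; if the check awaits an async refill, a later batch can still start a second refill while the first one is pending.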

Wouldn't this synchronization only work if both the setImmediate callback and addMoreItems are synchronous? If either one is asynchronous, the callbacks wouldn't be synchronized, so several jobs could inadvertently add items at the same time.

rightaway • Sep 02 '18 16:09

setImmediate schedules the function to run after I/O event callbacks. It doesn't matter whether addMoreItems is synchronous or not. You can always do:

setImmediate(async () => {
    if (queue.size < 20) {
        queue.pause();
        await addMoreItems();
    }
});

szmarczak • Sep 02 '18 16:09

While one setImmediate callback is running, could another setImmediate callback start while the system is waiting for the async addMoreItems to complete? If so, the same locking issue applies; if not, this would be the solution.

And if it's the latter, I guess setImmediate removes the need for locking in its callback, since only one callback could ever run at a time. Have I understood that right?

rightaway • Sep 02 '18 19:09

There can be only one setImmediate callback running at a time.

https://nodejs.org/api/timers.html#timers_setimmediate_callback_args

https://nodejs.org/en/docs/guides/event-loop-timers-and-nexttick/

szmarczak • Sep 02 '18 19:09
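Whether a second setImmediate callback can start while the first is still awaiting addMoreItems can be checked directly in Node. Each callback does run on its own, but an async callback hands control back to the event loop at its first await, so the next queued callback can start at that point. The sketch assumes a plain array standing in for the queue and a hypothetical addMoreItems that fakes a slow API call:

```javascript
// Experiment: do two async setImmediate callbacks overlap at `await`?
// `waiting` stands in for the queue; `addMoreItems` is a hypothetical
// stand-in that simulates a slow API call and counts how often it runs.
let refillCalls = 0;
const waiting = [];
const log = [];

async function addMoreItems() {
    refillCalls += 1;
    await new Promise((resolve) => setTimeout(resolve, 10)); // fake I/O
    for (let i = 0; i < 20; i += 1) waiting.push(i);
}

function onFinishedJob(name) {
    setImmediate(async () => {
        log.push(`${name} checks size ${waiting.length}`);
        if (waiting.length < 20) {
            await addMoreItems(); // control returns to the event loop here
            log.push(`${name} done`);
        }
    });
}

// Two jobs finish in the same turn of the event loop.
onFinishedJob('A');
onFinishedJob('B');
```

In this run both callbacks see an empty queue before either refill finishes, so addMoreItems runs twice and 40 items end up queued. An async refill therefore still needs its own guard, for instance a flag or a shared in-flight promise that is checked and set before the first await.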