async-sema

Some nice way of async iterating?

Open wmertens opened this issue 4 years ago • 6 comments

I'm trying to limit uploads to 10 simultaneously, and I'm doing it like this:

	const uploadSema = new Sema(10)
	for (const upload of uploads) {
		await uploadSema.acquire()
		uploadOne(upload).finally(() => uploadSema.release())
	}
	await uploadSema.drain()

It's reasonably nice, but I was wondering if there wasn't a way to make this nicer.

I moved the logic to this helper function:

export const queuedWork = async (items, fn, workers = 10) => {
	const sema = new Sema(workers)
	let threw = null
	for (const item of items) {
		if (threw) break
		await sema.acquire()
		// eslint-disable-next-line promise/catch-or-return
		Promise.resolve(fn(item))
			.catch(err => {
				threw = err
			})
			.finally(() => sema.release())
	}
	await sema.drain()
	if (threw) throw threw
}

Is this a good way of going about it? Is there maybe a more elegant way?

(wrote these tests too)

test('queuedWork async', async () => {
	const out = []
	await queuedWork([1, 5, 7, 89, 2], async n => out.push(n), 2)
	expect(out).toEqual([1, 5, 7, 89, 2])
})

test('queuedWork sync', async () => {
	const out = []
	await queuedWork([1, 5, 7, 89, 2], n => out.push(n), 2)
	expect(out).toEqual([1, 5, 7, 89, 2])
})

test('queuedWork throws', async () => {
	const out = []
	await expect(
		queuedWork(
			[1, 2, 3, 4, 5],
			async n => {
				if (n === 2) throw new Error('meep')
				else out.push(n)
			},
			2
		)
	).rejects.toThrow('meep')
	expect(out).toEqual([1, 3])
})


test('queuedWork throws sync', async () => {
	const out = []
	await expect(
		queuedWork(
			[1, 2, 3, 4, 5],
			n => {
				if (n === 2) throw new Error('meep')
				else out.push(n)
			},
			2
		)
	).rejects.toThrow('meep')
	expect(out).toEqual([1])
})

wmertens, Sep 16 '19 20:09

What I have been doing is something like this:

await Promise.all(files.map(async (file) => {
  await sema.acquire();
  try {
    await upload(file);
  } finally {
    sema.release();
  }
}));

OlliV, Sep 18 '19 17:09

@wmertens Actually, for your particular problem, you should look at this module instead: https://github.com/sindresorhus/p-queue

p-queue has some queue-based operations, event emitters, and start/pause functionality.

async-sema has its uses of course, for example when rate limiting is the concern.

cardin, Sep 19 '19 02:09

@OlliV That's nice too, with the difference that you create all the promises at once. So for <1000 items that's easier to reason about, but beyond that I'm worried about memory consumption.

@cardin hmmm p-queue seems to only work by adding functions to a queue, not by passing an array and having it be processed. So it doesn't help in this case, or am I mistaken?

wmertens, Sep 19 '19 07:09

@wmertens The example code that OlliV gave shouldn't run into memory issues: even 10,000 pending promises shouldn't be much of a problem while they are just waiting to acquire the semaphore.

p-queue has an .addAll() that accepts an array of functions, much like calling queue.add(() => uploadOne(upload)) for each item. As for why it takes functions instead of promises, the author's rationale is to have more control over when the work begins: a function only executes the async operation when it is called, whereas a promise represents work that has already started and can no longer be controlled.
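
The difference between handing over promises and handing over functions can be shown in a minimal sketch. `startUpload` and the `started` log are hypothetical stand-ins, not part of p-queue or async-sema. The only point is that calling an async function starts its work immediately, while wrapping the call in a function defers it until something (such as a queue) invokes it.

```javascript
// Hypothetical stand-in for an upload; records the moment the work starts.
const started = [];
async function startUpload(name) {
  started.push(name); // runs synchronously, as soon as startUpload is called
  return name;
}

// Eager: mapping to promises starts every upload right away.
function asPromises(names) {
  return names.map((name) => startUpload(name));
}

// Lazy: mapping to functions starts nothing until a function is called.
function asTasks(names) {
  return names.map((name) => () => startUpload(name));
}
```

After `asPromises(['a', 'b'])` both names are already in `started`. After `asTasks(['c', 'd'])` nothing new has been recorded, and each upload begins only when its task is called.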

cardin, Sep 19 '19 07:09

@cardin but in both cases it does use O(n) memory, and at some point that becomes too much.

Processing an array with "workers" uses only O(nWorkers) memory.

V8 is pretty efficient, but if a small change (the amount of code is similar) uses less memory, that's a good thing, right? Less memory thrashing => less GC => faster, even apart from host constraints.
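
A rough sketch of the "workers" idea, without async-sema: a fixed number of workers pull items from one shared iterator, so only `workers` promises are pending at a time. The name `workerPool` is hypothetical, and this is one possible shape under those assumptions rather than a library API.

```javascript
// Sketch: process `items` with at most `workers` calls to `fn` in flight.
// Every worker pulls its next item from the same iterator, so the number of
// pending promises stays at `workers` however long `items` is.
async function workerPool(items, fn, workers = 10) {
  const iterator = items[Symbol.iterator]();
  let failed = false;

  async function worker() {
    // Stop pulling new items once any worker has failed.
    while (!failed) {
      const { value, done } = iterator.next();
      if (done) return;
      try {
        await fn(value); // works for sync and async `fn`
      } catch (err) {
        failed = true;
        throw err;
      }
    }
  }

  // Promise.all rejects as soon as one worker throws; workers that are
  // mid-item finish that item and then stop.
  await Promise.all(Array.from({ length: workers }, () => worker()));
}
```

Because it only needs an iterable, `items` could also be a generator that produces uploads on demand, which avoids holding the whole list in memory.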

wmertens, Sep 19 '19 07:09

@wmertens oh right. I have seen that happening (OOM). I have used array-split for splitting a long array and processing it in chunks.

Maybe I could add it to the examples.
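
As a sketch of the chunking approach in plain JavaScript (this is not the array-split package itself; `chunk` and `processInChunks` are hypothetical helpers):

```javascript
// Split `items` into consecutive chunks of at most `size` elements.
function chunk(items, size) {
  const chunks = [];
  for (let i = 0; i < items.length; i += size) {
    chunks.push(items.slice(i, i + size));
  }
  return chunks;
}

// Process one chunk at a time; within a chunk, all calls run concurrently.
// At most `size` promises are pending at any moment.
async function processInChunks(items, fn, size = 10) {
  for (const part of chunk(items, size)) {
    await Promise.all(part.map((item) => fn(item)));
  }
}
```

One trade-off: each chunk waits for its slowest item before the next chunk starts, so a semaphore or worker pool usually keeps more work in flight.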

OlliV, Sep 19 '19 19:09