async-sema
async-sema copied to clipboard
Some nice way of async iterating?
I'm trying to limit uploads to 10 simultaneously, and I'm doing it like this:
const uploadSema = new Sema(10)
for (const upload of uploads) {
await uploadSema.acquire()
uploadOne(upload).finally(() => uploadSema.release())
}
await uploadSema.drain()
It's reasonably nice, but I was wondering if there wasn't a way to make this nicer.
I moved the logic to this helper function
export const queuedWork = async (items, fn, workers = 10) => {
const sema = new Sema(workers)
let threw = null
for (const item of items) {
if (threw) break
await sema.acquire()
// eslint-disable-next-line promise/catch-or-return
Promise.resolve(fn(item))
.catch(err => {
threw = err
})
.finally(() => sema.release())
}
await sema.drain()
if (threw) throw threw
}
Is this a good way of going about it? Is there maybe a more elegant way?
(wrote these tests too)
test('queuedWork async', async () => {
const out = []
await queuedWork([1, 5, 7, 89, 2], async n => out.push(n), 2)
expect(out).toEqual([1, 5, 7, 89, 2])
})
test('queuedWork sync', async () => {
const out = []
await queuedWork([1, 5, 7, 89, 2], n => out.push(n), 2)
expect(out).toEqual([1, 5, 7, 89, 2])
})
test('queuedWork throws', async () => {
const out = []
await expect(
queuedWork(
[1, 2, 3, 4, 5],
async n => {
if (n === 2) throw new Error('meep')
else out.push(n)
},
2
)
).rejects.toThrow('meep')
expect(out).toEqual([1, 3])
})
test('queuedWork throws sync', async () => {
const out = []
await expect(
queuedWork(
[1, 2, 3, 4, 5],
n => {
if (n === 2) throw new Error('meep')
else out.push(n)
},
2
)
).rejects.toThrow('meep')
expect(out).toEqual([1])
})
What I have been doing is something like this:
await Promise.all(files.map(async (file) => {
await sema.acquire();
try {
await upload(file);
} finally {
sema.release();
}
}));
@wmertens Actually, for your particular problem, you should look at this module instead: https://github.com/sindresorhus/p-queue
p-queue has some queue-based operations, event-emitters, and start/pause functionality.
async-sema has its uses of course, for e.g. if we are concerned about rate limiting.
@OlliV That's nice too, with the difference that you create all the promises at once. So for <1000 items that's easier to reason about, but beyond that I'm worried about memory consumption.
@cardin hmmm p-queue seems to only work by adding functions to a queue, not by passing an array and having it be processed. So it doesn't help in this case, or am I mistaken?
@wmertens The example code that OlliV gave shouldn't run into memory issues, since even 10,000 pending promises shouldn't be too much of an issue if they're not allowed to start.
p-queue
has an .addAll() that accepts an array of functions. Like queue.add(() => uploadOne(upload))
. As for why functions instead of promises, the author's rationale is to have more control over when the queue begins -> functions only execute the async operation when called, unlike promises which is uncontrollable.
@cardin but in both cases it does use O(n) memory, and at some point that becomes too much.
Processing an array with "workers" uses only O(nWorkers) memory.
V8 is pretty efficient, but if small changes (amount of code is similar) can use less memory, that's a good thing, right? Less memory trashing => less GC => faster, even apart from host constraints
@wmertens oh right. I have seen that happening (OOM). I have used array-split for splitting a long array and processing it in chunks.
Maybe I could add it to the examples.