swimmer
swimmer copied to clipboard
TypeScript port
Your lib is awesome! Because I use TypeScript, I ported the code to TypeScript. I'm not an expert, but it works pretty well. Maybe you'd like to support TypeScript?
// code based on JS version at https://github.com/tannerlinsley/swimmer/blob/5c4fceb83e233bbc8850fb55337526c07ec3a18a/src/index.js
/** A deferred unit of work: a zero-argument function that returns a promise. */
type TThunk<T> = () => Promise<T>;

/** Options accepted by `createPool`. */
type TPoolConfig<T> = {
  /** Maximum number of thunks allowed to be in flight at once. */
  concurrency: number;
  /** Whether the pool processes work immediately or waits for `start()`. */
  started: boolean;
  /** Thunks that seed the pending queue. */
  tasks: TThunk<T>[];
};

/** Listener invoked when the pool has neither active nor pending thunks. */
type TSettledCb = () => void;
/** Listener invoked with the rejection reason and the thunk that failed. */
type TErrorCb<T, E> = (error: E, task: TThunk<T>) => void;
/** Listener invoked with the fulfilled value and the thunk that produced it. */
type TSuccessCb<T> = (result: T, task: TThunk<T>) => void;

/** Fallback values for any option the caller leaves out. */
const defaultConfig: TPoolConfig<any> = {
  concurrency: 5,
  started: true,
  tasks: [],
};

// Settle callbacks of the promise handed out for each queued thunk, keyed by
// the thunk itself (the JS original stored them as `.resolve` / `.reject`
// properties on the thunk). The maps are shared by thunks of every result
// type, so the value parameter has to stay `any`; explicit signatures are used
// instead of the unchecked `Function` type so calls are at least arity-checked.
const resolves = new WeakMap<TThunk<any>, (value: any) => void>();
const rejects = new WeakMap<TThunk<any>, (reason?: unknown) => void>();
/**
 * Creates a pool that runs promise-returning thunks while keeping at most
 * `concurrency` of them in flight at once.
 *
 * Thunks passed through `config.tasks` seed the pending queue but are not
 * launched by this function itself; they are picked up the next time `add()`
 * or `start()` triggers the scheduler.
 *
 * @param config Partial options; any missing or `undefined` option falls back
 *   to `defaultConfig`.
 * @returns The pool API (`add`, `throttle`, `onSettled`, `onError`,
 *   `onSuccess`, `stop`, `start`, `clear` and the inspection getters).
 */
export function createPool<T = any, E = any>(config: Partial<TPoolConfig<T>> = defaultConfig) {
  // Resolve each option with `??` rather than object spread, so an explicit
  // `undefined` (e.g. `{ concurrency: undefined }`) falls back to the default
  // instead of overriding it and leaving the pool unable to start anything.
  const concurrency = config.concurrency ?? defaultConfig.concurrency;
  const started = config.started ?? defaultConfig.started;
  const tasks: TThunk<T>[] = config.tasks ?? defaultConfig.tasks;

  let onSettles: TSettledCb[] = [];
  let onErrors: TErrorCb<T, E>[] = [];
  let onSuccesses: TSuccessCb<T>[] = [];
  let running = started;
  let active: TThunk<T>[] = [];
  // Copy the seed list: the queue is mutated in place (shift/push/unshift), and
  // aliasing `defaultConfig.tasks` would make every pool created without an
  // explicit `tasks` option share one and the same queue.
  let pending: TThunk<T>[] = [...tasks];
  let currentConcurrency = concurrency;

  /** Runs one thunk to completion, reports its outcome, then reschedules. */
  async function run(task: TThunk<T>): Promise<void> {
    let outcome: { ok: true; value: T } | { ok: false; error: E };
    try {
      outcome = { ok: true, value: await task() };
    } catch (e) {
      outcome = { ok: false, error: e as E };
    }
    active = active.filter(d => d !== task);
    try {
      // Thunks seeded through `config.tasks` never went through `add()`, so
      // they have no settle callbacks registered; hence the optional calls.
      if (outcome.ok) {
        const { value } = outcome;
        resolves.get(task)?.(value);
        onSuccesses.forEach(cb => cb(value, task));
      } else {
        const { error } = outcome;
        rejects.get(task)?.(error);
        onErrors.forEach(cb => cb(error, task));
      }
    } finally {
      // Always reschedule, even if a listener threw; otherwise the remaining
      // queue would never be processed.
      tick();
    }
  }

  /** Scheduler: fires settle listeners when idle, otherwise fills free slots. */
  function tick(): void {
    if (!running) {
      return;
    }
    if (!pending.length && !active.length) {
      onSettles.forEach(cb => cb());
      return;
    }
    while (active.length < currentConcurrency) {
      const next = pending.shift();
      if (!next) {
        break;
      }
      active.push(next);
      // Fire and forget: the outcome is delivered through the settle callbacks
      // and the listeners inside `run`.
      void run(next);
    }
  }

  const api = {
    /**
     * Queues a thunk (at the front when `priority` is set) and returns a
     * promise that settles with the thunk's own outcome.
     */
    add: (fn: TThunk<T>, { priority }: { priority?: boolean } = {}): Promise<T> =>
      new Promise<T>((resolve, reject) => {
        if (priority) {
          pending.unshift(fn);
        } else {
          pending.push(fn);
        }
        resolves.set(fn, resolve);
        rejects.set(fn, reject);
        tick();
      }),
    /** Changes the concurrency limit; applied the next time the scheduler runs. */
    throttle: (n: number) => {
      currentConcurrency = n;
    },
    /** Registers a settle listener and returns a function that removes it. */
    onSettled: (cb: TSettledCb) => {
      onSettles.push(cb);
      return () => {
        onSettles = onSettles.filter(d => d !== cb);
      };
    },
    /** Registers an error listener and returns a function that removes it. */
    onError: (cb: TErrorCb<T, E>) => {
      onErrors.push(cb);
      return () => {
        onErrors = onErrors.filter(d => d !== cb);
      };
    },
    /** Registers a success listener and returns a function that removes it. */
    onSuccess: (cb: TSuccessCb<T>) => {
      onSuccesses.push(cb);
      return () => {
        onSuccesses = onSuccesses.filter(d => d !== cb);
      };
    },
    /** Stops launching queued thunks; thunks already in flight keep running. */
    stop: () => {
      running = false;
    },
    /** Resumes scheduling and immediately runs the scheduler once. */
    start: () => {
      running = true;
      tick();
    },
    /**
     * Drops every thunk that has not been launched yet. Promises returned by
     * `add()` for the dropped thunks are never settled.
     */
    clear: () => {
      pending = [];
    },
    getActive: () => active,
    getPending: () => pending,
    getAll: () => [...active, ...pending],
    isRunning: () => running,
    /** True only when the pool is stopped and has no active or pending thunks. */
    isSettled: () => !running && !active.length && !pending.length,
  };
  return api;
}
/**
 * Runs every thunk through a fresh pool limited to `concurrency` simultaneous
 * tasks and collects the results in input order.
 *
 * @param tasks Thunks to execute.
 * @param concurrency Maximum number of thunks in flight at once.
 * @returns A promise that resolves with the results (index `i` holds the
 *   result of `tasks[i]`) once the pool settles, or rejects with the first
 *   error reported by the pool. Thunks still queued when an error occurs are
 *   not cancelled.
 */
export function poolAll<T>(tasks: TThunk<T>[], concurrency: number): Promise<T[]> {
  return new Promise<T[]>((resolve, reject) => {
    const pool = createPool<T>({
      concurrency,
    });
    const results: T[] = [];
    pool.onSettled(() => {
      resolve(results);
    });
    pool.onError(err => {
      reject(err);
    });
    tasks.forEach((task, i) => {
      pool
        .add(async () => {
          const res = await task();
          results[i] = res;
          return res;
        })
        .catch(() => {
          // The failure is already surfaced through `onError` above. Without
          // this handler the per-task promise returned by `add()` would reject
          // unobserved and raise an unhandled-rejection error.
        });
    });
    pool.start();
  });
}
Oh, one thing: I used two WeakMaps to replace the `.resolve` and `.reject` props you attached to each thunk. This requires a runtime with WeakMap support (any evergreen browser).