swimmer icon indicating copy to clipboard operation
swimmer copied to clipboard

TypeScript port

Open ssured opened this issue 7 years ago • 1 comments

Your lib is awesome! Because I use TypeScript, I ported the code to TypeScript. I'm not an expert, but it works pretty well. Maybe you'd want to support TypeScript?

// code based on JS version at https://github.com/tannerlinsley/swimmer/blob/5c4fceb83e233bbc8850fb55337526c07ec3a18a/src/index.js
/** A deferred unit of work: calling it starts the task and returns its promise. */
type TThunk<T> = () => Promise<T>;

/** Options accepted by `createPool`. */
type TPoolConfig<T> = {
    /** Initial upper bound on how many tasks may be active at the same time. */
    concurrency: number;
    /** Initial running state of the pool. */
    started: boolean;
    /** Tasks placed in the pending queue when the pool is created. */
    tasks: TThunk<T>[];
};

/** Listener invoked when a running pool has neither pending nor active tasks. */
type TSettledCb = () => void;
/** Listener invoked with the error and the task whenever a task rejects. */
type TErrorCb<T, E> = (error: E, task: TThunk<T>) => void;
/** Listener invoked with the result and the task whenever a task fulfils. */
type TSuccessCb<T> = (result: T, task: TThunk<T>) => void;

/** Values used for every option the caller of `createPool` does not supply. */
const defaultConfig: TPoolConfig<any> = {
    concurrency: 5,
    started: true,
    tasks: [],
};

// Settlement callbacks of the promise handed out for each queued task, keyed
// by the task function itself. WeakMaps are used so that no extra properties
// have to be attached to the caller's function, and so an entry does not keep
// a task alive once nothing else references it.
const resolves = new WeakMap<TThunk<any>, (value: any) => void>();
const rejects = new WeakMap<TThunk<any>, (reason: any) => void>();

/**
 * Creates a task pool that runs promise-returning thunks while keeping the
 * number of simultaneously active tasks at or below a concurrency limit.
 *
 * Whenever the pool ticks (on `add`, on `start`, and after each task
 * finishes) and is running, tasks are taken from the front of the pending
 * queue until the limit is reached. When a tick finds no pending and no
 * active tasks, the `onSettled` listeners are invoked.
 *
 * Notes:
 * - Tasks passed via `config.tasks` are only queued; nothing runs them until
 *   `add` or `start` triggers a tick. They have no promise attached, so their
 *   outcome is reported only through `onSuccess` / `onError`.
 * - The promise returned by `add` is looked up by the identity of the task
 *   function, so adding the same function object again replaces the earlier
 *   registration. Pass a distinct function per `add` call.
 *
 * @param config Optional overrides merged over `defaultConfig`.
 * @returns The pool API. `add` queues a task (at the front when `priority`
 *   is set) and returns a promise for its outcome; `onSettled`, `onError` and
 *   `onSuccess` register a listener and return a function that removes it.
 */
export function createPool<T = any, E = any>(config: Partial<TPoolConfig<T>> = defaultConfig) {
    const { concurrency, started, tasks } = {
        ...defaultConfig,
        ...config,
    };

    let onSettles: TSettledCb[] = [];
    let onErrors: TErrorCb<T, E>[] = [];
    let onSuccesses: TSuccessCb<T>[] = [];

    let running: boolean = started;
    let active: TThunk<T>[] = [];
    // Copy the initial tasks: the queue is mutated in place (shift / push /
    // unshift), so aliasing `defaultConfig.tasks` would make every pool created
    // without explicit tasks share one queue, and aliasing a caller-supplied
    // array would silently empty it.
    let pending: TThunk<T>[] = [...tasks];
    let currentConcurrency = concurrency;

    // Runs one task to completion, reports the outcome, then ticks the pool.
    const runTask = async (task: TThunk<T>): Promise<void> => {
        let outcome: { ok: true; value: T } | { ok: false; error: E };
        try {
            outcome = { ok: true, value: await task() };
        } catch (e) {
            outcome = { ok: false, error: e as E };
        }
        active = active.filter(d => d !== task);
        try {
            if (outcome.ok) {
                const { value } = outcome;
                // Tasks seeded through `config.tasks` have no registered
                // promise, so the settle callback may legitimately be absent.
                const resolve = resolves.get(task);
                if (resolve) {
                    resolve(value);
                }
                onSuccesses.forEach(cb => cb(value, task));
            } else {
                const { error } = outcome;
                const reject = rejects.get(task);
                if (reject) {
                    reject(error);
                }
                onErrors.forEach(cb => cb(error, task));
            }
        } finally {
            // Keep the pool moving even if a listener throws; otherwise the
            // remaining pending tasks would never be started.
            tick();
        }
    };

    const tick = () => {
        if (!running) {
            return;
        }
        if (!pending.length && !active.length) {
            onSettles.forEach(cb => cb());
            return;
        }
        while (active.length < currentConcurrency && pending.length) {
            const task = pending.shift();
            if (task === undefined) {
                break;
            }
            active.push(task);
            // The task function is invoked synchronously inside runTask; the
            // returned promise is intentionally not awaited here.
            void runTask(task);
        }
    };

    const api = {
        add: (fn: TThunk<T>, { priority }: { priority?: boolean } = {}) =>
            new Promise<T>((resolve, reject) => {
                resolves.set(fn, resolve);
                rejects.set(fn, reject);
                if (priority) {
                    pending.unshift(fn);
                } else {
                    pending.push(fn);
                }
                tick();
            }),
        // Changes the limit used by subsequent ticks; does not tick by itself.
        throttle: (n: number) => {
            currentConcurrency = n;
        },
        onSettled: (cb: TSettledCb) => {
            onSettles.push(cb);
            return () => {
                onSettles = onSettles.filter(d => d !== cb);
            };
        },
        onError: (cb: TErrorCb<T, E>) => {
            onErrors.push(cb);
            return () => {
                onErrors = onErrors.filter(d => d !== cb);
            };
        },
        onSuccess: (cb: TSuccessCb<T>) => {
            onSuccesses.push(cb);
            return () => {
                onSuccesses = onSuccesses.filter(d => d !== cb);
            };
        },
        // Prevents further tasks from being started; active tasks keep running.
        stop: () => {
            running = false;
        },
        start: () => {
            running = true;
            tick();
        },
        // Drops every task that has not been started yet.
        clear: () => {
            pending = [];
        },
        getActive: () => active,
        getPending: () => pending,
        getAll: () => [...active, ...pending],
        isRunning: () => running,
        // True only when the pool is stopped and holds no active or pending tasks.
        isSettled: () => !running && !active.length && !pending.length,
    };

    return api;
}

/**
 * Runs every task through a dedicated pool with the given concurrency limit
 * and collects the results by input position.
 *
 * @param tasks Thunks to run; the result of `tasks[i]` is stored at index `i`.
 * @param concurrency Maximum number of tasks running at the same time.
 * @returns A promise that resolves with the results array once the pool
 *   settles, or rejects with the first task error reported by the pool.
 *   Nothing stops the pool on failure, so remaining tasks still run.
 */
export function poolAll<T>(tasks: TThunk<T>[], concurrency: number): Promise<T[]> {
    return new Promise<T[]>((resolve, reject) => {
        const pool = createPool<T>({
            concurrency,
            // Give this pool its own queue instead of the shared default array,
            // so concurrent poolAll calls cannot pick up each other's tasks.
            tasks: [],
        });
        const results: T[] = [];
        pool.onSettled(() => {
            resolve(results);
        });
        pool.onError(err => {
            reject(err);
        });
        tasks.forEach((task, i) => {
            pool
                .add(async () => {
                    const res = await task();
                    results[i] = res;
                    return res;
                })
                .catch(() => {
                    // The failure is already delivered through onError above;
                    // this handler only keeps the per-task promise from
                    // surfacing as an unhandled rejection.
                });
        });
        pool.start();
    });
}

ssured avatar Sep 04 '18 15:09 ssured

Oh, one thing: I added a WeakMap to replace the .resolve and .reject props you added on the thunk. It needs a browser that supports WeakMap (any evergreen browser).

ssured avatar Sep 04 '18 15:09 ssured