p-prefetch or p-iterable?
I have created a module that seems to fit well here, as it was initially based off of p-map. The module is itself an async/sync iterable and it takes the same inputs as pMap() with an additional maxUnread option.
If you configure it with concurrency 4 and maxUnread 8, it will spin up 4 runners, push their mapper result or wrapped exception into a queue, then start another mapper. When the number of mappers running + queue depth == maxUnread then the runners will exit.
When the caller then iterates an item out of the unread result queue it will start another runner, and so on.
This essentially allows the consumer to "prefetch" results of promises with back pressure. An example would be when you're reading 10,000 files from, say, S3, but you don't want to fill up your disk or spend all your time fetching files that you will never get to: the prefetcher can be configured to always have, say, no more than 20 files ready and can use up to 20 mappers to fetch more when needed.
There is some overlap with p-queue in that the starting/stopping of runners is similar to what p-queue is doing to adjust concurrency when the queue drains / hits empty / refills. I think it could be possible to implement this iterable with back pressure using p-queue under the hood with the event function passed to p-queue being a function that writes into the unread queue and awaits a Promise that indicates that an item has been read from the queue when the queue is full (thus preventing further items from being iterated if maxUnread has been hit).
Is there interest in including another module here with these capabilities? The module is currently written in TypeScript (500 lines) and has 1,000 lines of TS Jest tests. The TypeScript can be easily converted to plain JS and the tests have been easy to convert back and forth from Jest to Ava, so that should not be a problem if it's desired to use JS/Ava.
Related to https://github.com/sindresorhus/p-map/issues/20?
@Richienb - No, sindresorhus/p-map#20 was for allowing the input to be an AsyncIterable. The proposal in this issue is for a new version of pMap that is itself exposing an AsyncIterable interface to allow the consumer to read items as they are completed (and to stop resolving items if the max # of items waiting to be read hits an optional limit). As a side note, I have a PR open for p-map#20 :)
So you mean when input is an async iterable?
So you mean when
inputis an async iterable?
Here is an abbreviation of the interface to perhaps make it clearer what I'm proposing:
class pMapIterable<Element, NewElement> implements AsyncIterable<NewElement> {
constructor(iterable: AsyncIterable<Element> | Iterable<Element>, mapper: Mapper<Element, NewElement>, { concurrency, stopOnMapperError, maxUnread }?: Options);
/**
* Creates an iterator instance
* Note: the iterators for this class are destructive so only one can be created per object lifetime.
*/
[Symbol.asyncIterator](): AsyncIterator<NewElement>;
/**
* Used by the iterator returned from [Symbol.asyncIterator]
* Called every time an item is needed
* Note: there can be multiple pending calls to `.next()` that will all resolve to `result.done === true` if the end of the source iterator is reached and there are not enough items to give to all of the pending `.next()` calls.
* @returns Iterator result
*/
next(): Promise<IteratorResult<NewElement>>;
}
Key differences with pMap and pQueue:
- How mapped items are returned to the caller:
- pMap: the caller must wait for all input items to be mapped before gaining access to all of them via a returned array - this stalls pipelines as they must wait for all items to be completed before the next stage can start
- pMapIterable: the caller is returned an
AsyncIterableinterface that they can begin looping through immediately upon class construction. Initially there will be no items available so the mapped item iteration will wait for a mapped item to be ready. When a mapped item is ready it will beyielded through theAsyncIterableinterface and the consumer will see that item.
- Peak Approximate Memory usage:
- pMap: When
inputis anAsyncIterablethat does not actually store all values in memory then it is possible to limit memory usage to the total memory needed to store all the mapped items in an array. If each mapped item and input item is 1 MB and there are 100 of them, memory usage would peak at[item count] * 1 MB = 100 * 1 MB = 100 MB. - pMapIterable: When
inputis anAsyncIterableandmaxUnreadis set to, say, 4, and concurrency 2, with each mapped item and input item being 1 MB, 100 input items total, then peak memory usage would be[maxUnread] * 1 MB = 4 * 1 MB = 4 MB
- pMap: When
- Number of Runners / Concurrency
- pMap: Concurrency is static - a specified number of
runnersare started on startup and that number stays static until the input iterable runs out of items. - pQueue: Concurrency is dynamic - as the queue runs out of items the runners are stopped until more items arrive, at which point they are started again.
- pMapIterable: Concurrency is dynamic - as the queue of unread items approaches
maxUnreadthe runners are stopped so that there can never be enough runners to actually overfill the queue of unread items. When an item is read from a full unread queue by the consumer then another runner will be started to fill that slot.
- pMap: Concurrency is static - a specified number of
I hope that helps clear up the differences.
@sindresorhus do you have a preference on creating this new library as part of the p-suite or releasing it outside of the p-suite but named in a way that indicates it is related (but not conflicting)?
Not sure why there isn't a PR in p-map already I'll quickly make one, I guess
@Richienb - It's crazy difficult... unless I made a major mistake.
I think the PR above is similar to my first implementation: it allows iterating the results as they come in, but it lacks back pressure, so the producer will produce far more items and take resources from the consumer, so ultimately the producer can (in some cases) process all of the input (think hundreds of millions of items) and take up memory that does not need to be used.
So I don't think I would merge the PR above... it seems useful at first glance but it has drawbacks that will not be readily apparent to all consumers.
it lacks back pressure
What should be the limit of items in back pressure before it continues iterating?
it lacks back pressure
What should be the limit of items in back pressure before it continues iterating?
I made mine configurable but defaulted it to 8 I think.
There were a few edge cases around how to get it to stop and start again with exactly the amount of unread items waiting to be iterated.
@Richienb - I'm waiting on an update to our npmjs scope to publish the package, but take a look at this and you'll see what I was talking about... I think it's an interesting and useful / difficult to implement mix of what p-map and p-queue do.
https://github.com/shutterstock/p-map-iterable
1000 lines? 💀 Could you give me a rundown of how you did it?
Here's what the me from March did in the pr:
- All values start in the "pending" queue
- Values are moved to the "waiting" queue as spaces open for more invokations (limited by "waiting" queue size < concurrency and "send" queue size + "waiting" queue size < backpressure)
- Values are moved to the "send" queue when they are done
- Values stay in the "send" queue until the earliest possible value from "pending" has finished processing
Oh... interesting... what did we end up with here?
Is the new pMapIterable something like the one I made where you can iterate the results as they become available?
Are the test cases approximately the same or completely different?
Is the new
pMapIterablesomething like the one I made where you can iterate the results as they become available?
Yes indeed.
At first glance, it looks like the main differences between @shutterstock/p-map-iterable vs pMapIterable are:
-
maxUnreadvsbackpressurenaming - Exposes a mutable queue vs accepts an async iterable
-
stopOnMapperErroroption vs throws - Skipping handled by consumer vs skipping handled by producer