p-map
p-map copied to clipboard
`pMapIterable` `preserveOrder` option
Summary
Adds a preserveOrder
option to pMapIterable
, which indicates
Whether the output iterable should produce the results of the
mapper
on elements of theinput
iterable in the same order as the elements were produced. Iffalse
,mapper
results will be produced in the order they are available, which may not match the order themapper
inputs were produced by theinput
iterable, but may improve throughput. Type:boolean
Default:true
Fixes #72. Fixes #76.
This implementation a bit more complicated than I expected, and I discovered several edge cases and race conditions along the way, so I encourage careful review and the suggestion/addition of further appropriate test cases.
Implementation considerations
In order to
- (a) treat
promises
as an unordered pool for use withPromise.race
and - (b) avoid an
O(min(concurrency, backpressure))
promises.indexOf
to know whichpromise
to remove frompromises
,
the promise itself must return some identifying information. The promise's inputIndex
is good for this purpose as it is a stable id that will not change when shuffling promises
around. However, inputIndex
alone is not enough to determine which member of promises
has returned, so we also introduce extra bookkeeping promisesIndexFromInputIndex
information. In order to correctly record promisesIndexFromInputIndex
during mapNext
, we "reserve" a spot in the promises
array during trySpawn
(from recursive trySpawn
calls in particular, before we would've had a chance to promises.push(mapNext())
, since the inner mapNext()
expression executes additional trySpawn
s before we can perform the outer .push
expression) by incrementing its length, so that we can use promises.length
to determine promisesIndex
.
Then, to avoid an O(min(concurrency, backpressure))
promises.splice
to remove the resolved promise, we instead swap the resolved promise with promises[promises.length - 1]
. We could also overwrite the resolved member of promises
in-place, but it seemed slightly cleaner to do it this way so we could reuse this logic in the pMapSkip + preserveOrder: true
case where we currently indexOf
+ splice
. In order to make the aforementioned swap, though, we need to update our promisesIndexFromInputIndex
ledger: however, we cannot know promises[promises.length - 1]
's input index without extra information, so we introduce an additional inputIndexFromPromisesIndex
record.
Speaking of which, to unify the preserveOrder: false
logic with the preserveOrder: true
case, for the latter we still treat the promises
array in the same pool-fashion, where the result of mapping any inputIndex
might end up anywhere in promises
, but bookkeep an additional outputIndex
datum to use in conjunction with promisesIndexFromInputIndex
to determine which promise is next in sequence (and so await
and process). As mentioned earlier, this pool-based pattern also allows us to handle the pMapSkip
case in an O(1)
manner, compared to the existing indexOf
+ splice
strategy.
Further, pMapSkip
is now unconditionally handled within a mapNext
(a helper that is roughly equivalent to the previous IIFE) promise via popPromise
. This preserves the existing behavior of avoiding counting toward backpressure
by occupying a position in promises
, in case the main while
loop does not await
and popPromise
this promise for some time (for example, perhaps preserveOrder: true
and this promise is deep in the queue). To accomplish this unconditionality
- (a) the main
while
loop now checksvalue === pMapSkip
andcontinue
s beforepopPromise
ortrySpawn
, since these were already called whenmapNext
observed thepMapSkip
value - (b) since
pMapSkip
results in apopPromise
, it can leave holes in thepromisesIndexFromInputIndex
ledger: to compensate, in thepreserveOrder: true
nextPromise
, we skipoutputIndex
es wherepromisesIndexFromInputIndex[outputIndex] === undefined
- (c) given the optimization of
await
ing only when necessary (see last ¶), whentrySpawn
executesmapNext
, thismapNext
can end up synchronously observing apMapSkip
value and callingpopPromise
as a result, if there are no promises involved (the input iterable is sync and produces a non-promise which maps to a non-promise-wrappedpMapSkip
). This can cause thepromises[promisesIndex]
assignment intrySpawn
both to undopopPromise
's work by writing a promise topromises
that was supposed to be removed already, and further, to clobber another element ofpromises
that was position in thepromises
array it no longer manages. To account for this, we perform the assignment if and only ifpromises[promisesIndex] === undefined
, since if we have been popped, ourpromises
position will already be taken by thetrySpawn
that follows thepMapSkip
check (there is necessarily headroom inconcurrency
andbackpressure
since we have just decrementedrunningMappersCount
andpromises.length
(the latter viapopPromise
) (even if we are the last element of the input iterable,trySpawn
will still try to iterate further and receive adone
result).
As a last piece of miscellany, when preserveOrder: false && (await nextPromise()).result.done === true
, stopping the async iterable is no longer safe, since other promises
may still be pending in the array. So, we continue
in this case.
Finally, I made other small optimizations where I spotted an opportunity, like only await
ing when necessary and trySpawn
ing before await
ing the input iterable
's .next
response, in case we can start another promise chugging, too.