p-map

`pMapIterable` `preserveOrder` option

Open tgfisher4 opened this issue 8 months ago • 6 comments

Summary

Adds a preserveOrder option to pMapIterable, which indicates:

Whether the output iterable should produce the results of the mapper on elements of the input iterable in the same order as the elements were produced. If false, mapper results will be produced in the order they are available, which may not match the order the mapper inputs were produced by the input iterable, but may improve throughput.
Type: boolean
Default: true

Fixes #72. Fixes #76.

This implementation is a bit more complicated than I expected, and I discovered several edge cases and race conditions along the way, so I encourage careful review and the suggestion/addition of further appropriate test cases.

Implementation considerations

In order to

  • (a) treat promises as an unordered pool for use with Promise.race and
  • (b) avoid an O(min(concurrency, backpressure)) promises.indexOf to know which promise to remove from promises,

the promise itself must return some identifying information. The promise's inputIndex is good for this purpose, as it is a stable id that will not change when shuffling promises around. However, inputIndex alone is not enough to determine which member of promises has returned, so we also introduce extra bookkeeping in the form of a promisesIndexFromInputIndex ledger. To record promisesIndexFromInputIndex correctly during mapNext, we "reserve" a spot in the promises array during trySpawn by incrementing its length, so that promises.length can be used to determine promisesIndex. The reservation matters for recursive trySpawn calls in particular: in promises.push(mapNext()), the inner mapNext() expression executes additional trySpawns before the outer .push can run.
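
The idea of a promise that identifies itself can be sketched in isolation. This is a minimal illustration rather than the PR's code: tagWithInputIndex and drainInCompletionOrder are hypothetical names, and a Map keyed by inputIndex stands in for the promises array and its ledgers.

```javascript
// Hypothetical helper: make a promise resolve with the input index it was
// spawned for, alongside the mapped value.
function tagWithInputIndex(inputIndex, promise) {
	return Promise.resolve(promise).then(value => ({inputIndex, value}));
}

// Drain a pool of tagged promises in completion order.
// Assumption: a Map stands in for the PR's array-plus-ledger bookkeeping.
async function drainInCompletionOrder(tasks) {
	const pool = new Map();
	for (const [inputIndex, task] of tasks.entries()) {
		pool.set(inputIndex, tagWithInputIndex(inputIndex, task()));
	}

	const results = [];
	while (pool.size > 0) {
		// Whichever promise settles first tells us which entry to remove.
		const settled = await Promise.race(pool.values());
		pool.delete(settled.inputIndex); // No indexOf scan needed
		results.push(settled);
	}

	return results;
}
```

Because the settled value carries its own inputIndex, removal never has to search the pool for the promise that won the race.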

Then, to avoid an O(min(concurrency, backpressure)) promises.splice to remove the resolved promise, we instead swap the resolved promise with promises[promises.length - 1]. We could also overwrite the resolved member of promises in-place, but it seemed slightly cleaner to do it this way so we could reuse this logic in the pMapSkip + preserveOrder: true case where we currently indexOf + splice. In order to make the aforementioned swap, though, we need to update our promisesIndexFromInputIndex ledger: however, we cannot know promises[promises.length - 1]'s input index without extra information, so we introduce an additional inputIndexFromPromisesIndex record.
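
The swap-with-last removal and the two ledgers it needs can be sketched as a small standalone structure. This is an illustrative sketch under stated assumptions, not the PR's implementation: the SwapRemovePool class and its add/pop methods are hypothetical, and a Map is used for promisesIndexFromInputIndex purely for convenience.

```javascript
// Hypothetical pool illustrating O(1) removal by swapping with the last slot.
class SwapRemovePool {
	constructor() {
		this.promises = [];
		this.promisesIndexFromInputIndex = new Map(); // inputIndex -> slot
		this.inputIndexFromPromisesIndex = []; // slot -> inputIndex
	}

	add(inputIndex, promise) {
		const promisesIndex = this.promises.length;
		this.promises.push(promise);
		this.inputIndexFromPromisesIndex.push(inputIndex);
		this.promisesIndexFromInputIndex.set(inputIndex, promisesIndex);
	}

	pop(inputIndex) {
		const promisesIndex = this.promisesIndexFromInputIndex.get(inputIndex);
		const lastIndex = this.promises.length - 1;
		const lastInputIndex = this.inputIndexFromPromisesIndex[lastIndex];
		const removed = this.promises[promisesIndex];

		// Move the last entry into the vacated slot and update both ledgers.
		this.promises[promisesIndex] = this.promises[lastIndex];
		this.inputIndexFromPromisesIndex[promisesIndex] = lastInputIndex;
		this.promisesIndexFromInputIndex.set(lastInputIndex, promisesIndex);

		// Shrink the pool; deleting last also covers removing the final slot itself.
		this.promises.pop();
		this.inputIndexFromPromisesIndex.pop();
		this.promisesIndexFromInputIndex.delete(inputIndex);
		return removed;
	}
}
```

Without inputIndexFromPromisesIndex, pop would have no way of knowing which ledger entry to rewrite for the element that gets moved into the vacated slot.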

Speaking of which, to unify the preserveOrder: false logic with the preserveOrder: true case, for the latter we still treat the promises array in the same pool fashion, where the result of mapping any inputIndex might end up anywhere in promises, but bookkeep an additional outputIndex datum to use in conjunction with promisesIndexFromInputIndex to determine which promise is next in sequence (and so which one to await and process). As mentioned earlier, this pool-based pattern also allows us to handle the pMapSkip case in an O(1) manner, compared to the existing indexOf + splice strategy.

Further, pMapSkip is now unconditionally handled within a mapNext promise (mapNext being a helper that is roughly equivalent to the previous IIFE) via popPromise. This preserves the existing behavior in which a skipped value does not count toward backpressure by occupying a position in promises, even when the main while loop does not await and popPromise this promise for some time (for example, when preserveOrder: true and this promise is deep in the queue). To make this handling unconditional:

  • (a) the main while loop now checks value === pMapSkip and continues before popPromise or trySpawn, since these were already called when mapNext observed the pMapSkip value
  • (b) since pMapSkip results in a popPromise, it can leave holes in the promisesIndexFromInputIndex ledger: to compensate, in the preserveOrder: true nextPromise, we skip outputIndexes where promisesIndexFromInputIndex[outputIndex] === undefined
  • (c) given the optimization of awaiting only when necessary (see the last paragraph), when trySpawn executes mapNext, this mapNext can end up synchronously observing a pMapSkip value and calling popPromise as a result, if there are no promises involved (the input iterable is sync and produces a non-promise which maps to a non-promise-wrapped pMapSkip). This can cause the promises[promisesIndex] assignment in trySpawn both to undo popPromise's work, by writing a promise to promises that was supposed to be removed already, and to clobber another element of promises, by writing to a position in the promises array that it no longer manages. To account for this, we perform the assignment if and only if promises[promisesIndex] === undefined, since if we have been popped, our promises position will already be taken by the trySpawn that follows the pMapSkip check. There is necessarily headroom in concurrency and backpressure at that point, since we have just decremented runningMappersCount and promises.length (the latter via popPromise); even if we are the last element of the input iterable, trySpawn will still try to iterate further and receive a done result.
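
The hole-skipping lookup from item (b) can be sketched as follows. This is a simplified illustration, not the PR's nextPromise: the state object, the spawnedCount bound, and the function name nextPromiseInOrder are assumptions introduced here so the loop has a clear stopping point.

```javascript
// Hypothetical sketch of picking the next promise in input order.
// `state` is an assumed container with:
//   promises                     - the unordered pool
//   promisesIndexFromInputIndex  - ledger; undefined marks a hole left by a skip
//   outputIndex                  - next input index whose result should be emitted
//   spawnedCount                 - how many input indexes have been handed out
function nextPromiseInOrder(state) {
	// Advance past input indexes that were already popped (for example, skipped).
	while (
		state.outputIndex < state.spawnedCount
		&& state.promisesIndexFromInputIndex[state.outputIndex] === undefined
	) {
		state.outputIndex++;
	}

	if (state.outputIndex >= state.spawnedCount) {
		return undefined; // Nothing left to wait for
	}

	return state.promises[state.promisesIndexFromInputIndex[state.outputIndex]];
}
```

The pool itself stays unordered; only outputIndex and the ledger determine which member is awaited next.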

As a last piece of miscellany, when preserveOrder: false && (await nextPromise()).result.done === true, stopping the async iterable is no longer safe, since other promises may still be pending in the array. So, we continue in this case.
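
A stripped-down unordered mapper shows why a done result cannot end iteration on its own. This is a sketch under simplifying assumptions, not the PR's code: mapUnordered is a hypothetical name, a Map stands in for the promises pool, and backpressure, pMapSkip, and error handling are left out.

```javascript
// Hypothetical sketch: yield mapper results in completion order.
async function * mapUnordered(asyncIterable, mapper, concurrency) {
	const iterator = asyncIterable[Symbol.asyncIterator]();
	const pool = new Map();
	let nextId = 0;
	let inputDone = false;

	const spawn = () => {
		const id = nextId++;
		pool.set(id, (async () => {
			const next = await iterator.next();
			if (next.done) {
				return {id, done: true};
			}

			return {id, done: false, value: await mapper(next.value)};
		})());
	};

	const trySpawn = () => {
		while (!inputDone && pool.size < concurrency) {
			spawn();
		}
	};

	trySpawn();
	while (pool.size > 0) {
		const settled = await Promise.race(pool.values());
		pool.delete(settled.id);
		if (settled.done) {
			// The input is exhausted, but other mappers may still be pending,
			// so keep draining instead of returning.
			inputDone = true;
			continue;
		}

		trySpawn();
		yield settled.value;
	}
}
```

In this sketch, returning at the done branch would drop the result of any slow mapper that was still running when the input ran out.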

Finally, I made other small optimizations where I spotted an opportunity, like only awaiting when necessary and trySpawning before awaiting the input iterable's .next response, in case we can start another promise chugging, too.

tgfisher4 • May 24 '24 05:05