missionary
parallel processing
Goal
Find the right pattern to parallelize processing on some part of a flow pipeline.
Solution 1 : emulate channels
Using rdv and dfv, we can build a channel-like primitive and use ap + ?= to run a single producer concurrently with an arbitrary number of consumers. The producer feeds values into the channel, closes it on termination and emits nothing. Each consumer reads values from the channel, passes them through a user-provided pipeline and emits the resulting values.
cf POC
Problem : the pattern may be too complex to be implemented manually in user space.
Solution 2 : ?= + sem
via Panel on slack
(let [sem (m/sem 2)]
  (m/ap
    (let [batch (m/?= (->> (fetch-ids)
                           (m/eduction (partition-all 20))))]
      (m/holding sem (->> (fetch-projects batch)
                          (m/reduce conj [])
                          m/?)))))
Problem : while the semaphore effectively ensures no more than 2 fetch-projects instances run concurrently, the memory footprint is unbounded because ?= pulls input and spawns new branches as soon as possible. If fetch-ids is infinite and able to produce batches faster than the maximal processing throughput (in this case, 2 divided by the average delay of fetch-projects), then the semaphore queue grows steadily.
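To put numbers on that growth, here is a back-of-the-envelope sketch in plain Clojure. The function name and its parameters are hypothetical, made up for illustration. It assumes a steady arrival rate in batches per second and a mean fetch-projects delay in seconds.

```clojure
;; Hypothetical helper, not part of missionary.
;; arrival-rate : batches per second produced upstream
;; permits      : number of semaphore permits (2 in the example above)
;; mean-delay   : average duration of one fetch-projects call, in seconds
(defn backlog-growth
  "Estimated number of branches piling up on the semaphore per second."
  [arrival-rate permits mean-delay]
  (let [throughput (/ permits mean-delay)] ; maximal batches processed per second
    (max 0 (- arrival-rate throughput))))

;; 2 permits and a 0.5 s delay give a throughput of 4 batches/s,
;; so a source producing 10 batches/s leaves 6 branches waiting every second.
(backlog-growth 10 2 0.5)
;; => 6.0
```

Under these assumptions the backlog is linear in time, so any sustained positive value eventually exhausts memory.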
Solution 3 : fix ?=
Add an optional argument to ?= to specify an upper bound on the number of concurrent branches. When this number is reached, values stop being pulled from input and resume again when a branch terminates.
(m/ap
  (let [batch (m/?= 2 (->> (fetch-ids)
                           (m/eduction (partition-all 20))))]
    (->> (fetch-projects batch)
         (m/reduce conj [])
         m/?)))
Prior art : ReactiveX - Flowable/flatMap
I see zero downside to solution 3 so far, it seems to be the way. Moreover, assuming a mechanism to limit concurrency, ?> and ?= actually unify because they're special cases for bounds 1 and ##Inf respectively.
In fact it's unclear to me if there's any valid use case for unbounded concurrent forking. If the input size has a known upper bound then you can specify it, otherwise you don't want to pull values indefinitely so you have to choose a bound anyway.
So here's a plan :
- ?> now accepts an optional argument before the flow, the concurrency bound. Must be a positive integer, the default value is 1, so the previous behavior is preserved.
- ?= desugars to ?> with an infinite bound. It is marked as deprecated and will eventually be removed, because it's a dangerous pattern that can blow up memory if not carefully used.
- amb= desugars to ?> with a bound equal to the form count.
- amb> is renamed to amb (both flavors of amb rely on ?> so the angle bracket has no meaning here).
The bad news is, we have a naming problem again. ReactiveX tricked me into thinking there were 3 forking operators, and I thought it was a good idea to steal their naming scheme - cf #44. It turns out there are only 2, one giving priority to the parent and the other giving priority to the children.
Released in b.27