parallel processing

Open leonoel opened this issue 3 years ago • 2 comments

Goal

Find the right pattern to parallelize processing on some part of a flow pipeline.

Solution 1 : emulate channels

Using rdv and dfv, we can build a channel-like primitive and use ap + ?= to run a single producer concurrently with an arbitrary number of consumers. The producer feeds values into the channel, closes it on termination, and emits nothing. Each consumer reads values from the channel, passes them through a user-provided pipeline, and emits the resulting values.

cf POC

Problem : the pattern may be too complex to be implemented manually in user space.

Solution 2 : ?= + sem

via Panel on slack

(let [sem (m/sem 2)]
  (m/ap
   (let [batch (m/?= (->> (fetch-ids)
                          (m/eduction (partition-all 20))))]
     (m/holding sem (->> (fetch-projects batch)
                         (m/reduce conj [])
                         m/?)))))

Problem : while the semaphore effectively ensures no more than 2 fetch-projects instances run concurrently, the memory footprint is unbounded because ?= pulls input and spawns new branches as soon as possible. If fetch-ids is infinite and able to produce batches faster than the maximal processing throughput (in this case, 2 divided by the average delay of fetch-projects), then the semaphore queue grows steadily.

Solution 3 : fix ?=

Add an optional argument to ?= to specify an upper bound on the number of concurrent branches. When this number is reached, pulling from the input pauses, and it resumes when a branch terminates.

(m/ap
  (let [batch (m/?= 2 (->> (fetch-ids)
                        (m/eduction (partition-all 20))))]
    (->> (fetch-projects batch)
      (m/reduce conj [])
      m/?)))

Prior art : ReactiveX - Flowable/flatMap

leonoel Jan 19 '22 14:01

I see zero downside to solution 3 so far, it seems to be the way. Moreover, assuming a mechanism to limit concurrency, ?> and ?= actually unify because they're special cases for bounds 1 and ##Inf respectively.

In fact it's unclear to me if there's any valid use case for unbounded concurrent forking. If the input size has a known upper bound then you can specify it, otherwise you don't want to pull values indefinitely so you have to choose a bound anyway.

leonoel Jan 24 '22 12:01

So here's a plan :

  • ?> now accepts an optional argument before the flow, the concurrency bound. It must be a positive integer and defaults to 1, so the previous behavior is preserved.
  • ?= desugars to ?> with an infinite bound. It is marked as deprecated and will eventually be removed, because it's a dangerous pattern that can blow up memory if not carefully used.
  • amb= desugars to ?> with a bound equal to the form count.
  • amb> is renamed to amb (both flavors of amb rely on ?> so the angle bracket has no meaning here).
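
A rough sketch of how the bounded form from this plan might look in use, assuming a missionary release that includes the change (b.27 or later, per the end of this thread). `process-batch`, the `in-flight` and `peak` counters, and the `(range 100)` input are hypothetical stand-ins for fetch-projects and fetch-ids, added only to make the concurrency limit observable:

```clojure
(require '[missionary.core :as m])

;; Hypothetical instrumentation: how many branches are running right now,
;; and the highest value that count ever reached.
(def in-flight (atom 0))
(def peak (atom 0))

;; Hypothetical stand-in for fetch-projects: a task that simulates 50 ms of
;; work, then returns the size of the batch it was given.
(defn process-batch [batch]
  (m/sp
    (swap! peak max (swap! in-flight inc))
    (m/? (m/sleep 50))
    (swap! in-flight dec)
    (count batch)))

;; Fork on each batch of 20 ids, with at most 2 branches alive at once.
;; Input is only pulled again when one of the running branches terminates.
(def bounded
  (m/ap
    (let [batch (m/?> 2 (m/eduction (partition-all 20) (m/seed (range 100))))]
      (m/? (process-batch batch)))))

;; Run the flow to completion (blocks the calling thread on the JVM).
(def result (m/? (m/reduce conj [] bounded)))
```

With a bound of 1 this should behave like the previous ?>, and with ##Inf like ?=, which is the unification described above.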

The bad news is, we have a naming problem again. ReactiveX tricked me into thinking there were 3 forking operators, and I thought it was a good idea to steal their naming scheme - cf #44. It turns out there are only 2, one giving priority to the parent and the other giving priority to the children.

leonoel Jan 25 '22 09:01

Released in b.27

leonoel Feb 13 '23 13:02