manifold
Thread pool for consumption of a stream (aka work queues)
Hello,
thank you very, very much for this incredible project and the effort you put into it.
I am wondering whether I understood the execution model correctly. I am trying to build a work queue, where data is put irregularly onto a stream via manifold.stream/put!. My naïve idea was to manifold.stream/map the (blocking) worker function over the stream, with multiple workers (a manifold.executor/fixed-thread-executor pool assigned via manifold.stream/onto), and to consume the results with manifold.stream/consume. But this resulted in serial processing of the input. I found #126, but am unsure whether that is the proper way.
On Slack, @dm3 was super helpful, patiently went through this with me, and came up with the following proposal:
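For concreteness, a minimal sketch of that naïve pipeline. The worker slow-inc, the pool size of 4, and the names in and processed are hypothetical and only serve the illustration; the aliases s and ex are assumed to refer to manifold.stream and manifold.executor.

```clojure
(require '[manifold.stream :as s]
         '[manifold.executor :as ex])

;; Hypothetical blocking worker; stands in for the real job.
(defn slow-inc [x]
  (Thread/sleep 100)
  (inc x))

(def pool (ex/fixed-thread-executor 4)) ; pool size is an arbitrary choice
(def in (s/stream))
(def processed (atom []))

;; Map the worker over the stream on `pool`, then consume the results.
(->> in
     (s/onto pool)
     (s/map slow-inc)
     (s/consume #(swap! processed conj %)))

;; Data arrives irregularly via put!.
(doseq [x (range 3)]
  (s/put! in x))
```

As described above, this processed the inputs one after another despite the four threads in the pool.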
(defn fork
  "Takes an `src` stream and returns `n` forked streams which will receive
  messages from `src` on the first-come, first-served basis.
  Once the `src` stream is exhausted, the forked streams will be closed as
  well.
  Takes a map of options as the third argument:
  * `pool` - the executor where the execution will happen. Uses the
    Manifold `execute-pool` by default.
  * `generator` - function which produces the forked streams. Unbuffered
    `stream` by default."
  ([src n] (fork src n {:pool (ex/execute-pool), :generator s/stream}))
  ([src n {:keys [pool generator]}]
   (let [src' (s/stream)
         dsts (take n (repeatedly generator))
         ^java.util.concurrent.BlockingQueue ready
         (doto (java.util.concurrent.ArrayBlockingQueue. n)
           (.addAll dsts))
         free-up! #(.offer ready %)
         next! #(.take ready)
         send! #(-> (s/put! %1 %2)
                    (d/chain
                      (fn [result]
                        (if result
                          (free-up! %1)
                          (s/close! %1)))))]
     ;; in case anyone else wants to consume `src`
     (s/connect src src')
     (d/loop [dsts (.take ready)]
       (-> (s/take! src' ::none)
           (d/chain
             (fn [result]
               (if (= result ::none)
                 (doseq [d dsts]
                   (s/close! d))
                 (do (d/future-with pool
                       (send! dsts result))
                     (d/chain (next!)
                       #(d/recur %))))))))
     (map s/source-only dsts))))
What is your valued opinion on realizing concurrent producer/consumer queues with manifold?
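@hansenpansen I just noticed that the code above is incorrect. The dsts binding in the d/loop and the dsts argument in (send! dsts result) should both be renamed to dst; otherwise the loop binding shadows the outer dsts collection that the final doseq is meant to close.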
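For reference, a sketch of fork with that rename applied, followed by a small usage example. The aliases s, d and ex are assumed to refer to manifold.stream, manifold.deferred and manifold.executor; src, results, the two forks and the 50 ms sleep are hypothetical and only illustrate the idea. Everything else is taken unchanged from the proposal above.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d]
         '[manifold.executor :as ex])

(defn fork
  "`fork` as proposed above, with the `d/loop` binding renamed to `dst`."
  ([src n] (fork src n {:pool (ex/execute-pool), :generator s/stream}))
  ([src n {:keys [pool generator]}]
   (let [src' (s/stream)
         dsts (take n (repeatedly generator))
         ^java.util.concurrent.BlockingQueue ready
         (doto (java.util.concurrent.ArrayBlockingQueue. n)
           (.addAll dsts))
         free-up! #(.offer ready %)
         next! #(.take ready)
         send! #(-> (s/put! %1 %2)
                    (d/chain
                      (fn [result]
                        (if result
                          (free-up! %1)
                          (s/close! %1)))))]
     (s/connect src src')
     ;; `dst` is the single fork that receives the next message;
     ;; `dsts` from the `let` stays visible for the final close.
     (d/loop [dst (.take ready)]
       (-> (s/take! src' ::none)
           (d/chain
             (fn [result]
               (if (= result ::none)
                 (doseq [d dsts]
                   (s/close! d))
                 (do (d/future-with pool
                       (send! dst result))
                     (d/chain (next!)
                       #(d/recur %))))))))
     (map s/source-only dsts))))

;; Usage sketch: two forks, each drained by a blocking worker.
(def src (s/stream))
(def results (atom #{}))

(doseq [out (fork src 2)]
  (s/consume (fn [x]
               (Thread/sleep 50) ; stand-in for blocking work
               (swap! results conj (inc x)))
             out))

(doseq [x (range 4)]
  (s/put! src x))
```

One caveat, as far as I can tell from reading the code: the forks are closed as soon as src is exhausted, so closing src while a send! is still queued on the pool may drop that message. Waiting for the results before closing src sidesteps this in the sketch.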