
Thread pool for consumption of a stream (aka work queues)

Open · hansenpansen opened this issue 8 years ago · 1 comment

Hello,

Thank you very, very much for this incredible project and the effort you put into it.

I am wondering whether I understood the execution model correctly. I am trying to build a work queue, where data is put onto a stream at irregular intervals via `manifold.stream/put!`. My naïve idea was to `manifold.stream/map` the (blocking) worker function over the stream with multiple workers (a `manifold.executor/fixed-thread-executor` pool assigned via `manifold.stream/onto`) and consume the results with `manifold.stream/consume`. But this resulted in the input being processed serially. I found #126, but am unsure whether that is the proper approach.
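A minimal reconstruction of the naïve setup described above, assuming a hypothetical blocking `work` function and a 4-thread pool. `s/map` feeds messages through its function one at a time: it takes the next message only after the previous result has been put downstream, so the size of the pool passed to `s/onto` does not matter and at most one `work` call runs at once.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.executor :as ex]
         '[manifold.stream :as s])

;; bookkeeping to observe how many `work` calls overlap
(def in-flight (atom 0))
(def max-in-flight (atom 0))

(defn work
  "Hypothetical blocking worker: squares `x` after a short sleep."
  [x]
  (swap! max-in-flight max (swap! in-flight inc))
  (Thread/sleep 50)
  (swap! in-flight dec)
  (* x x))

(def pool (ex/fixed-thread-executor 4))
(def jobs (s/stream))
;; map the blocking worker over a view of `jobs` whose callbacks run on `pool`
(def squares (s/map work (s/onto pool jobs)))

(def results (atom []))
;; the deferred returned by `consume` yields true once `squares` is drained
(def done (s/consume #(swap! results conj %) squares))

;; feed the jobs, then close `jobs`; the close propagates to `squares`
@(d/chain (s/put-all! jobs (range 8))
          (fn [_] (s/close! jobs)))
@done
(.shutdown pool)

(println @max-in-flight) ;; prints 1: the jobs ran one after another
```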

On Slack, @dm3 was super helpful, patiently went through this with me, and came up with the following proposal:

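```clojure
(require '[manifold.deferred :as d]
         '[manifold.executor :as ex]
         '[manifold.stream :as s])

(defn fork
  "Takes an `src` stream and returns `n` forked streams which will receive
  messages from `src` on a first-come, first-served basis.

  Once the `src` stream is exhausted, the forked streams will be closed as
  well.

  Takes a map of options as the third argument:

    * `pool` - the executor where the execution will happen. Uses the
      Manifold `execute-pool` by default.
    * `generator` - function which produces the forked streams. Unbuffered
      `stream` by default."
  ([src n] (fork src n {}))
  ([src n {:keys [pool generator]
           :or   {pool (ex/execute-pool), generator s/stream}}]
   (let [src' (s/stream)
         ;; realize the forked streams eagerly so every use below sees the
         ;; same stream objects
         dsts (vec (repeatedly n generator))
         ;; forked streams that are currently free to accept a message
         ^java.util.concurrent.BlockingQueue ready
         (doto (java.util.concurrent.ArrayBlockingQueue. n)
           (.addAll dsts))

         free-up! #(.offer ready %)
         ;; blocks the calling thread until some forked stream is free
         next!    #(.take ready)
         ;; put `msg` onto `dst`; once it is accepted, mark `dst` as free
         ;; again, or close `dst` if the put was rejected
         send!    (fn [dst msg]
                    (d/chain (s/put! dst msg)
                             (fn [accepted?]
                               (if accepted?
                                 (free-up! dst)
                                 (s/close! dst)))))]

     ;; in case anyone else wants to consume `src`
     (s/connect src src')

     ;; `dst` is the single free forked stream for this iteration (the
     ;; original post bound it as `dsts`, shadowing the outer binding)
     (d/loop [dst (next!)]
       (d/chain (s/take! src' ::none)
                (fn [msg]
                  (if (= msg ::none)
                    ;; `src` is drained: close every forked stream
                    (doseq [d dsts]
                      (s/close! d))
                    (do (d/future-with pool (send! dst msg))
                        (d/chain (next!) #(d/recur %)))))))

     (map s/source-only dsts))))
```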
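For comparison, a minimal sketch of the same first-come, first-served distribution without intermediate streams. It is not from the thread; `start-workers`, `slow-square`, and the pool size of 4 are hypothetical. It relies on a Manifold stream handing each message to exactly one pending `take!`, so `n` independent take loops act as `n` competing workers, and each loop takes its next message only after its blocking job, run via `d/future-with` on `pool`, has finished.

```clojure
(require '[manifold.deferred :as d]
         '[manifold.executor :as ex]
         '[manifold.stream :as s])

(defn start-workers
  "Hypothetical helper: starts `n` loops that each take a message from `src`,
  run the blocking `work` function on `pool`, and pass the result to
  `on-result`. Returns a deferred realized once every loop sees `src` drained."
  [src n pool work on-result]
  (apply d/zip
         (for [_ (range n)]
           ;; each loop is one worker; it only takes its next message after
           ;; its current job has finished
           (d/loop []
             (d/chain (s/take! src ::drained)
                      (fn [msg]
                        (if (identical? msg ::drained)
                          :done
                          (d/chain (d/future-with pool (work msg))
                                   (fn [result]
                                     (on-result result)
                                     (d/recur))))))))))

;; usage: a hypothetical slow job, instrumented to record overlap
(def in-flight (atom 0))
(def max-in-flight (atom 0))

(defn slow-square [x]
  (swap! max-in-flight max (swap! in-flight inc))
  (Thread/sleep 100)
  (swap! in-flight dec)
  (* x x))

(def pool (ex/fixed-thread-executor 4))
(def src (s/stream))
(def results (atom #{}))
(def done (start-workers src 4 pool slow-square #(swap! results conj %)))

@(s/put-all! src (range 8)) ; returns once every job has been taken by a worker
(s/close! src)              ; pending takes now yield ::drained
@done
(.shutdown pool)
```

Unlike the `s/map` pipeline, results arrive in completion order rather than input order, which is why they are collected into a set here.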

What is your opinion on how best to implement concurrent producer/consumer queues with Manifold?

hansenpansen, Jun 08 '17 15:06

@hansenpansen I just noticed that the code as originally posted is incorrect: the `dsts` binding in the `d/loop` and the argument in `(send! dsts result)` should be renamed to `dst`.

dm3, Jul 15 '17 17:07