dealing with blocking iterators
This is highly specific to my codebase so feel free to decline any help.
I have a pipeline implementation that weaves a bunch of java.util.Iterators together. It's just a bunch of functions composed together, where each one takes the previous Iterator plus some options and returns a new Iterator. So something like
(-> (read-from-db nil {:connection "..."})
    (transform {:fn some-fn})
    (buffer {:size 32})
    (write-to-db {:connection "..."}))
is a composition of a bunch of functions that each return a new Iterator. To run it you need to walk (drain) the final Iterator.
This is purely sequential, so there's a lot of stalling happening, i.e. the next read is waiting for the previous write to finish. I'd like to inject a prefetch function into the pipeline, just before the write-to-db call, so that the next value in the pipeline is eagerly pulled from upstream.
Here's an implementation of that strategy, previously named fork, without missionary:
(defn fork [in opts]
  (let [n (:n opts (.availableProcessors (Runtime/getRuntime)))
        tq (u/q n), q (u/q n), stop (volatile! false)
        _th (u/thread nil "pg-fork-coordinator"
              (try
                (loop []
                  (when-not @stop
                    (u/take tq) ; wait for a token before prefetching one more value
                    (if (it/next? in)
                      (do (u/put q [:ok (it/next in)]) (recur))
                      (vreset! stop true))))
                (catch Throwable e (u/put q [:ex e]))))
        ini (volatile! nil)]
    (reify Iterator
      (hasNext [_]
        (try (when (nil? @ini) (vreset! ini true) (dotimes [_ n] (u/put tq ::go)))
             ;; TODO possible race while not stopped and q is empty but tq is not and we're waiting for a value
             (boolean (or (not @stop) (u/has? q)))
             (catch Throwable e (vreset! stop true) (throw e))))
      (next [_]
        (try (let [[st vl] (u/take q)]
               (if (= :ok st) (do (u/put tq ::go) vl) (throw vl)))
             (catch Throwable e (vreset! stop true) (throw e)))))))
The other functions in that definition come from the namespace aliased as it
(defn next [it] (.next ^Iterator it))
(defn next? [it] (.hasNext ^Iterator it))
and from u
(defn q ([] (LinkedTransferQueue.)) ([n] (LinkedBlockingQueue. (int n))))
(defn put [q v] (if (instance? TransferQueue q) (.transfer ^TransferQueue q v) (.put ^BlockingQueue q v)) v)
(defn take [q] (.take ^BlockingQueue q))
(defn has? [q] (not (.isEmpty ^java.util.Collection q)))
(defmacro with {:style/indent 1} [[s v] & body] `(let [rt# ~v ~s rt#] ~@body rt#))
(defmacro thread {:style/indent 2} [^ThreadGroup tg nm & code]
`(with [t# (Thread. (or ~tg (.getThreadGroup (Thread/currentThread))) (fn [] ~@code) ~nm)] (.start t#)))
It basically forks a thread and uses 2 queues, 1 "token queue" that unblocks the loop to prefetch another value and 1 result queue where the computed values get put. There's also 2 volatiles to coordinate the initialization and stopping. As it stands it's pretty tricky code that took some time to get right.
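To make the mechanism easier to follow, here is a stripped-down sketch of the same idea using plain Java interop. It is not the implementation above: `prefetch` is a hypothetical name, there is no token queue (the producer starts pulling as soon as the iterator is created), and a single bounded queue provides the backpressure. Treat it as an illustration under those assumptions.

```clojure
(import '(java.util Iterator NoSuchElementException)
        '(java.util.concurrent LinkedBlockingQueue))

(defn prefetch
  "Hypothetical helper: wraps iterator `in` and lets a background thread
  pull items ahead of the consumer, with at most `n` of them queued."
  [^Iterator in n]
  (let [q     (LinkedBlockingQueue. (int n))
        cur   (volatile! nil)                  ; message fetched but not yet consumed
        pump  (fn []
                (try
                  (while (.hasNext in)
                    (.put q [:ok (.next in)])) ; blocks while the queue is full
                  (.put q [:done])
                  (catch Throwable e
                    (.put q [:ex e]))))
        peek! (fn []
                (when (nil? @cur) (vreset! cur (.take q)))
                @cur)]
    (doto (Thread. ^Runnable pump "prefetch-pump")
      (.setDaemon true)
      (.start))
    (reify Iterator
      (hasNext [_]
        (let [[st v] (peek!)]
          (case st
            :ok   true
            :done false
            :ex   (throw v))))
      (next [_]
        (let [[st v] (peek!)]
          (case st
            :ok   (do (vreset! cur nil) v)     ; consume the message
            :done (throw (NoSuchElementException.))
            :ex   (throw v)))))))

;; Usage: drain a small in-memory iterator through the prefetcher.
(def drained
  (vec (iterator-seq (prefetch (.iterator ^Iterable [1 2 3 4 5]) 2))))
```

The bounded queue is what caps memory here: the producer thread blocks in `.put` once `n` items are waiting, which plays the part of the token queue in the version above.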
I wonder if a missionary solution would look cleaner? Do you have an idea of how you would go about implementing the function? I haven't written any missionary code yet so I'm having a hard time deciding which primitives would be useful. I guess a similar solution could be built with a mailbox and calculating in blk
I had to fix some edge cases, the current version:
(defn fork [in opts]
  (let [n (opts :n 1), f (opts :fn identity), tq (u/q n), q (u/q n)
        ;; cancel queued futures; q holds [status value] pairs and only :ok entries carry a future
        cl! #(doseq [[st v] q] (when (= :ok st) (future-cancel v)))
        _th (u/thread nil "pg-fork-coordinator"
              (try (loop []
                     (u/take tq)
                     (if (it/next? in)
                       (let [v (it/next in)] (u/put q [:ok (future (f v))]) (recur))
                       (u/put q [:no])))
                   (catch Throwable e (cl!) (u/put q [:ex e]))))
        ini (volatile! nil), cur (volatile! :none)]
    (reify Iterator
      (hasNext [_]
        ;; (print (if (future? @cur) @@cur "nil")) (print " ") (pr (vec tq)) (print " ") (prn (mapv first q))
        (when (nil? @ini) (vreset! ini true) (dotimes [_ n] (u/put tq :go)))
        (if (nil? @cur)
          false ; upstream already reported :no, nothing left to take
          (let [[st vl] (u/take q)]
            (case st
              :ok (do (u/put tq :go) (vreset! cur vl) true)
              :no (do (vreset! cur nil) false)
              :ex (throw vl)))))
      (next [_]
        (let [v @cur]
          (if (nil? v)
            (it/bad!)
            (try @v (catch Throwable e (cl!) (throw e)))))))))
I need to spawn a separate thread to do the processing (that's the whole point), which brings me to sending messages between the threads, Erlang-style. I don't see how to get rid of that though. If I could swap the futures out for tasks that depend on one another I could get rid of the cl! function, but I don't know how to model that. Apart from that little piece I have no clue how to rewrite the rest into missionary.
If you need to interop with a blocking java.util.Iterator you can define a flow consuming its values, like so :
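(defn iterator-consumer [^java.util.Iterator iterator]
  (m/ap
    ;; fork on [false true]: false leaves the loop and emits the next item,
    ;; true goes around again; an exhausted iterator forks on nothing and ends the flow
    (while (m/?? (m/enumerate (when (m/? (m/via m/blk (.hasNext iterator))) [false true]))))
    (.next iterator)))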
Then you can use missionary's operators to build your pipeline, each stage defines its own logical process so prefetching becomes a non-issue.
Turning a flow into a blocking iterator is a bit trickier, here's a possible implementation :
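(deftype FlowIterator [^:unsynchronized-mutable iterator
                       ^:unsynchronized-mutable pending?]
  clojure.lang.IFn
  (invoke [this]
    ;; spawn the flow with a notifier and a terminator callback
    (set! iterator
          (iterator (partial this false)
                    (partial this true)))
    this) ; assumed: flow->iterable needs the iterator back (line lost in the source)
  (invoke [this done?]
    (locking this
      (set! pending? false)
      (when done? (set! iterator nil))
      (.notify this)))
  java.util.Iterator
  (hasNext [this]
    (locking this
      (while pending?
        (try (.wait this)
             (catch InterruptedException _
               (iterator)))) ; assumed: cancel the flow (handler body lost in the source)
      (some? iterator)))
  (next [this]
    (locking this
      (set! pending? true)
      @iterator))) ; transfer the current value, as referenced in the discussion

(defn flow->iterable [flow]
  (reify Iterable
    (iterator [_]
      ((->FlowIterator flow true)))))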
It took me a day to understand that while loop :) This forking with ?? takes a while to internalize. I have no clue what's happening in the second snippet since I don't understand what (iterator (partial this false) (partial this true)) and @iterator do. If I'm reading this correctly iterator is bound to a flow, so I would need to look up their IFn and IDeref implementations respectively.
> each stage defines its own logical process so prefetching becomes a non-issue.
What do you mean by that? Right now, with the fork function I posted above, I have complete control over how many pages reside in memory: each fork adds at most 1 page. Since some of my jobs are handling large amounts of data in bulk I need to really understand the upper bound of each pipeline memory-wise. Since missionary doesn't give out anything willy-nilly to some thread pool I understand I can retain that same level of control, but I'm not sure how I would go about implementing it. I wanted to try and define the function with missionary, which as you noted means reading from a possibly blocking Java Iterator and returning one in the end.
Thank you for your reply! I am closing this since this isn't an issue per se.
TBH I'm not sure it's the best way to write that. It could be a case where an amb operator would help, e.g :
(defmacro amb [& forms]
  `(case (m/?? (m/enumerate (range ~(count forms))))
     ~@(interleave (range) forms)))

(defn iterator-consumer [^java.util.Iterator iterator]
  (m/ap
    (loop []
      (if (m/? (m/via m/blk (.hasNext iterator)))
        (amb (.next iterator) (recur))
        (amb))))) ; assumed ending (the source is cut off here): an empty amb ends the flow
> I have no clue what's happening in the second snippet since I don't understand what (iterator (partial this false) (partial this true)) and @iterator do. If I'm reading this correctly iterator is bound to a flow, so I would need to look up their IFn and IDeref implementations respectively.
That's fine, this code relies on the low-level details of the flow protocol, you don't have to understand how it works unless you're writing a library. I'm still considering adding a flow->iterable operator to missionary.core, so if you find it useful feel free to elaborate.
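For readers who do want to see what the flow protocol looks like, here is a minimal REPL sketch. It assumes a recent missionary release where `m/seed` is available (the thread uses the older name `m/enumerate`), and the `events` atom is only there for illustration. It is the same shape as the `(iterator (partial this false) (partial this true))` and `@iterator` calls in `FlowIterator`.

```clojure
(require '[missionary.core :as m])

(def events (atom []))

;; A flow is a function of two zero-argument callbacks. Calling it spawns the
;; process and returns an object that is both IDeref and IFn.
(def it
  ((m/seed [1 2 3])
   #(swap! events conj :ready)   ; notifier: a value can be transferred
   #(swap! events conj :done)))  ; terminator: no more values will come

;; Dereferencing transfers the value that was announced by the notifier.
(def first-value @it)

;; Calling the object with no arguments, (it), would cancel the process.
```

In `FlowIterator` the two callbacks are `(partial this false)` and `(partial this true)`, which is how the iterator learns that a value is ready or that the flow is finished.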
> What do you mean by that? Right now with the fork function I posted above I have complete control over how many pages reside in memory, each fork adds max 1 page. Since some of my jobs are handling large amounts of data in bulk I need to really understand the upper bound of each pipeline memory-wise. Since missionary doesn't give out anything willy-nilly to some thread pool I understand I can retain that same level of control, but not sure how would I go about implementing it. I wanted to try and define the function with missionary, which as you noted means reading from a possibly blocking java Iterator and returning one in the end.
With blocking iterators, each thread is associated with a pipeline stage. You can add more stages with fork, which increases parallelism and also the memory footprint, because stage synchronization requires buffering at least 1 item, as you said. With missionary, each flow transformation creates a new pipeline stage; stages are decoupled from threads, but the parallelism/buffering tradeoff is pretty much the same.
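As a rough illustration of that tradeoff, the sketch below puts an explicit bounded buffer between a reading stage and a writing stage. `read-pages` and `write-pages` are hypothetical stand-ins for read-from-db and write-to-db, the sleeps simulate blocking I/O, and the code assumes a recent missionary release (`m/seed` and `m/?>` are the newer names for the `m/enumerate` and `m/??` used in this thread).

```clojure
(require '[missionary.core :as m])

;; Hypothetical source: emits n pages, each needing ~20 ms of blocking work.
(defn read-pages [n]
  (m/ap
    (let [i (m/?> (m/seed (range n)))]
      (m/? (m/via m/blk (Thread/sleep 20) {:page i})))))

;; Hypothetical sink: "writes" each page with blocking work and collects
;; the page numbers in the order they were written.
(defn write-pages [pages]
  (m/reduce conj []
    (m/ap
      (let [p (m/?> pages)]
        (m/? (m/via m/blk (Thread/sleep 20) (:page p)))))))

;; m/buffer accumulates upstream overflow up to the given capacity, so the
;; reader can run at most one page ahead of the slot the writer consumes from.
(def written
  (m/? (write-pages (m/buffer 1 (read-pages 4)))))
```

The capacity passed to `m/buffer` is the knob that corresponds to `:n` in fork: raising it allows more read-ahead at the cost of holding more pages in memory.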
After understanding your example I would have probably come up with
(m/ap (loop []
        (when (m/? (m/via m/blk (.hasNext it)))
          (if (m/?? (m/enumerate [true false]))
            (.next it)
            (recur)))))
With that written I see a direct resemblance with unix forking and would have come up with:
(defmacro fork [] `(m/?? (m/enumerate [true false])))
(m/ap (loop []
        (when (m/? (m/via m/blk (.hasNext it)))
          (if (fork) (.next it) (recur)))))
Since Lisp has macros, the if can be nicely hidden, leading to:
(defmacro fork [a b] `(if (m/?? (m/enumerate [true false])) ~a ~b))
(m/ap (loop []
        (when (m/? (m/via m/blk (.hasNext it)))
          (fork (.next it) (recur)))))
At this point I can see amb as a generalization of the unix fork.
Right, and the other way round. BTW to fully emulate unix fork, ?? should be replaced by ?= to allow both branches to run concurrently.
> That's fine, this code relies on the low-level details of the flow protocol, you don't have to understand how it works unless you're writing a library. I'm still considering adding a flow->iterable operator to missionary.core, so if you find it useful feel free to elaborate.
I like to understand the mechanics of a library to get a feeling for the abstractions, the tradeoffs, the performance etc. For now missionary remains a black box. I managed to understand how cloroutine works, so I guess that's 1 step in the right direction :)
> BTW to fully emulate unix fork, ?? should be replaced by ?= to allow both branches to run concurrently.
This also raises the question: how does ?= run code concurrently without using a threadpool? Is it parking the forked tasks at safepoints? What are those safepoints?
> I like to understand the mechanics of a library to get a feeling for the abstractions, the tradeoffs, the performance etc.
I suggest you take some time to understand task and flow. They're basically callback-based protocols wrapped in constructor functions, so you can easily see what happens at the REPL.
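As a small REPL illustration of the task side of that suggestion: a task is a function of a success callback and a failure callback, and calling it starts the process and returns a canceller. The `result` promise below is only a convenience for observing the callback from the REPL thread.

```clojure
(require '[missionary.core :as m])

(def result (promise))

;; Run a task by hand: pass it a success callback and a failure callback.
;; The returned function cancels the task when called with no arguments.
(def cancel
  ((m/sp (+ 1 (m/? (m/sleep 100 41))))
   #(deliver result [:ok %])
   #(deliver result [:ko %])))

;; Block the REPL thread until one of the callbacks has fired.
(def outcome (deref result 2000 :timeout))
```

The `sp` body parks on `m/sleep` and is resumed when the sleep completes, at which point the success callback is called with the final value.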
> Is it parking the forked tasks at safepoints?
Parking happens when the computation requires a value that is not immediately available. It doesn't use a threadpool; the code is run synchronously by the thread responsible for unparking.
I'll keep this issue open until the blocking iterator case is settled, please open a new one if you want to discuss the execution model further.
The object returned by this function could reasonably implement java.lang.Iterable, clojure.lang.IReduceInit and clojure.lang.Sequential, and a possible name could be educe to match clojure.core/eduction's semantics (lazy but not memoized).
The ClojureScript version would hardly be useful because we can't block, but we can still provide a degraded version of the same function that throws an exception when a result is not immediately available.
(ns dustingetz.scratch
  (:require [missionary.core :as m]
            [hyperfiddle.rcf :refer [tests]]))

(defn iterator-consumer "blocking iterable pattern"
  [^java.util.Iterator it]
  ; why not one thread tied to the iterator extent?
  ; (future (while (.hasNext it) (! (.next it))))
  (m/ap
    (loop []
      (if (m/? (m/via m/blk (.hasNext it)))
        (m/amb (m/? (m/via m/blk (.next it))) (recur))
        (m/amb))))) ; empty amb terminates the flow

(defn seq-consumer [xs]
  (m/ap
    (loop [xs xs]
      (if (m/? (m/via m/blk (seq xs)))
        (m/amb (m/? (m/via m/blk (first xs))) (recur (rest xs)))
        (m/amb)))))

(tests
  (def !it (.iterator (.keySet (java.lang.System/getProperties))))
  (->> (iterator-consumer !it)
       (m/eduction (take 3))
       (m/reduce conj []) m/?)
  := ["java.specification.version" "sun.jnu.encoding" "java.class.path"]

  ; careful, Java iterator is stateful
  (def xs (iterator-seq (.iterator (.keySet (java.lang.System/getProperties)))))
  (take 3 xs) := ["java.specification.version" "sun.jnu.encoding" "java.class.path"]

  (->> (seq-consumer xs)
       (m/eduction (take 3))
       (m/reduce conj []) m/?)
  := ["java.specification.version" "sun.jnu.encoding" "java.class.path"])