xforms
Infinite loop with stateful transducer on core.async channel
When running a stateful transducer on a core.async channel, I get unexpected behavior:
(go (println
      (<! (let [c (async/chan 1 xforms/count)]
            (async/onto-chan! c (range 5))
            (async/into []
                        (async/take 10 c))))))
Outputs:
[5 5 5 5 5 5 5 5 5 5]
Expected output:
[5]
When running the same transducer on a sequence:
(into [] xforms/count (range 5))
I get the expected outcome ([5]).
I have posted a question on ask.clojure.org (https://ask.clojure.org/index.php/9529/infinite-loop-with-stateful-transducer-core-async-channel). A fix has been provided there, but I don't understand what is going on. I am not sure if this is a bug in xforms or expected behavior. Maybe you could provide some insight?
I'm having the same problem, but mine came up using sort-by. I'd really like to be able to use things like by-key and others in my async pipelines.
Weird. I'm going to look into it.
I wonder if it shouldn't be reclassified as a core.async bug: a closed transducing channel doesn't remember having already called the completing arity of its rf, so it calls it again on every take:
user=> (def c (a/chan 1 (fn [rf] (fn ([] (rf)) ([acc] (rf (unreduced (rf acc [:completed (System/currentTimeMillis)])))) ([acc _] acc)))))
#'user/c
user=> (a/close! c)
nil
user=> (a/<!! c)
[:completed 1603187902345]
user=> (a/<!! c)
[:completed 1603187908246]
user=> (a/<!! c)
[:completed 1603187909392]
user=> (a/<!! c)
[:completed 1603187910475]
user=> (a/<!! c)
[:completed 1603187911584]
This has a broad impact: every transducer that flushes on completion would have to keep track of having already completed.
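To illustrate what that tracking could look like, here is a minimal sketch that needs only clojure.core. The names `flush-count`, `complete-once` and `complete-twice` are hypothetical and are not part of xforms or core.async. `flush-count` stands in for a flush-on-completion transducer such as xforms/count, and `complete-twice` imitates a caller that invokes the completing arity more than once, as the closed channel above appears to do.

```clojure
;; Stand-in for a transducer that emits its result only on completion.
(def flush-count
  (fn [rf]
    (let [n (volatile! 0)]
      (fn
        ([] (rf))
        ([acc] (rf (unreduced (rf acc @n))))   ; flush the count, then complete
        ([acc _] (vswap! n inc) acc)))))

;; Hypothetical guard: delegate the completing arity at most once.
(defn complete-once [xform]
  (fn [rf]
    (let [xrf   (xform rf)
          done? (volatile! false)]
      (fn
        ([] (xrf))
        ([acc] (if @done?
                 acc                            ; already completed: do nothing
                 (do (vreset! done? true)
                     (xrf acc))))
        ([acc x] (xrf acc x))))))

;; Imitates a caller that invokes the completing arity twice.
(defn complete-twice [xform coll]
  (let [f   (xform conj)
        acc (reduce f [] coll)]
    (f (f acc))))

(complete-twice flush-count (range 5))                  ; flushes twice
(complete-twice (complete-once flush-count) (range 5))  ; flushes once
```

Wrapping the transducer passed to the channel in such a guard is one possible workaround under these assumptions. Whether the guard belongs in each transducer or in core.async itself is the open question here.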