xforms icon indicating copy to clipboard operation
xforms copied to clipboard

Infinite loop with stateful transducer on core.async channel

Open pukkamustard opened this issue 4 years ago • 3 comments

When running a stateful transducer on a core.async channel I get an unexpected behavior:

(go (println
     (<! (let [c (async/chan 1 xforms/count)]

           (async/onto-chan! c (range 5))

           (async/into []
                       (async/take 10 c))))))

Outputs:

[5 5 5 5 5 5 5 5 5 5]

Expected output:

[5]

When running the same transducer on a sequence:

(into [] xforms/count (range 5))

I get the expected outcome ([5).

I have posted a question on ask.clojure.org (https://ask.clojure.org/index.php/9529/infinite-loop-with-stateful-transducer-core-async-channel). A fix has been provided there but I don't understand what is going. I am not sure if this is a bug in xforms or expected behavior. Maybe you could provide some insight?

pukkamustard avatar Aug 09 '20 05:08 pukkamustard

I'm having the same problem, but mine came up using sort-by. I'd really like to be able to use things like by-key and others in my async pipelines

otfrom avatar Oct 20 '20 08:10 otfrom

Weird. I'm going to look into it.

cgrand avatar Oct 20 '20 08:10 cgrand

I wonder if it shouldn't be requalified as a core.async bug: a closed transducing channel doesn't remember having called the completing arity of its rf:

user=>  (def c (a/chan 1 (fn [rf] (fn ([] (rf)) ([acc] (rf (unreduced (rf acc [:completed (System/currentTimeMillis)])))) ([acc _] acc)))))
#'user/c
user=> (a/close! c)
nil
user=> (a/<!! c)
[:completed 1603187902345]
user=> (a/<!! c)
[:completed 1603187908246]
user=> (a/<!! c)
[:completed 1603187909392]
user=> (a/<!! c)
[:completed 1603187910475]
user=> (a/<!! c)
[:completed 1603187911584]

It has a broad impact: all transducers that flush on completion should keep track of having completed.

cgrand avatar Oct 20 '20 09:10 cgrand