manifold
BlockingQueueSource should be closable
```clojure
(require '[manifold.stream :as s])

(def a (s/->source (java.util.concurrent.LinkedBlockingQueue. [1 2 3])))
(def b (s/batch 100 1 a))
(s/close! a)
(s/description a)
;; {:type "java.util.concurrent.LinkedBlockingQueue", :buffer-size 0, :source? true}
(s/description b)
;; {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true}
```
I guess this is because the queue is not closable. Would be nice to have the downstreams disconnect in this case though.
~~Unless overridden, the default impl of `.close` does nothing, and it's not currently overridden for `BlockingQueueSource`. I agree the "source-wrapped" version should be closable, though, and prevent further takes.~~
EDIT 2023-2-28: Oops, closing should prevent puts, not takes. Takes can happen until the stream is drained.
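To make the puts-versus-takes distinction concrete, here is a minimal sketch using a regular buffered stream rather than a `BlockingQueueSource`. The function name `close-then-drain` and the `::drained` sentinel are made up for illustration; the behavior shown is what I'd expect from Manifold's documented stream semantics, not something specific to this issue.

```clojure
(require '[manifold.stream :as s])

(defn close-then-drain
  "Closes a buffered stream that still holds messages, then reports what
  puts and takes do afterwards. Illustrative helper, not part of Manifold."
  []
  (let [a      (s/stream 4)
        _      @(s/put-all! a [1 2 3])       ; three messages fit in the buffer
        _      (s/close! a)
        put-ok @(s/put! a 4)                 ; a put after close should be rejected
        taken  (mapv (fn [_] @(s/take! a)) (range 3)) ; buffered messages are still takeable
        after  @(s/take! a ::drained)]       ; nothing left, so the default value comes back
    {:put-after-close put-ok
     :taken           taken
     :after-drain     after
     :drained?        (s/drained? a)}))
```

Calling `(close-then-drain)` should show the put failing while the three buffered messages are still delivered, with the stream reporting drained only after they are gone.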
This doesn't seem to be a particular issue with `BlockingQueueSource`, honestly.
```clojure
(defn foo []
  (let [a (s/stream 4)
        b (s/batch 100 1 a)]
    (s/put-all! a [1 2 3 4])
    (s/close! a)
    (println (s/description b))))
```
This produces exactly the same behavior, using a regular stream.
However, if you remove the `(s/put-all! a [1 2 3 4])` call, `b` is closed when `a` is closed. I believe that's because, when there's nothing to take from `a`, the pending takes implied by `batch` end up being stored in `a`'s consumers, and the `close!` call then resolves those consumers with their default values, which allows the graph to close downstream (in this case, `b`). This doesn't happen when the `take!`s are resolved by `put!`s, since the consumers are properly resolved and not reported to the graph.
There's a bit of a race condition here, though. Behind the scenes, multiple threads are being used, so it's completely possible for the description of b to be printed out before the closing of a has fully propagated its effects to b. I ran that code snippet multiple times, and got different results.
In general, ordering around closing can get a little weird. Unfortunately, the current default `close!` behavior doesn't return a deferred we could wait on... should probably fix that.
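In the meantime, one possible workaround for the race is to register a callback on the downstream stream and block on a deferred instead of printing the description right away. This is only a sketch: `close-and-wait` and its `timeout-ms` parameter are hypothetical names, and it covers the case with no puts on `a`, where `b` was observed to close. The timeout is there because the close may never propagate in the other case.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn close-and-wait
  "Closes `a` without putting anything on it, then blocks until `b` reports
  closed or `timeout-ms` elapses. Returns :closed or :timeout.
  Illustrative helper, not part of Manifold."
  [timeout-ms]
  (let [a      (s/stream 4)
        b      (s/batch 100 1 a)
        closed (d/deferred)]
    ;; on-closed takes a zero-argument callback; we use it to resolve our own deferred
    (s/on-closed b #(d/success! closed :closed))
    (s/close! a)
    ;; block with a timeout instead of assuming the close has already propagated
    (deref closed timeout-ms :timeout)))
```

With something like `(close-and-wait 1000)`, inspecting `(s/description b)` only after a `:closed` result should give a stable answer, whereas a `:timeout` result means the close never reached `b` within the window.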