BlockingQueueSource should be closable

Open SevereOverfl0w opened this issue 5 years ago • 3 comments

```clojure
(require '[manifold.stream :as s])

(def a (s/->source (java.util.concurrent.LinkedBlockingQueue. [1 2 3])))
(def b (s/batch 100 1 a))

(s/close! a)
(s/description a)
;; {:type "java.util.concurrent.LinkedBlockingQueue", :buffer-size 0, :source? true}
(s/description b)
;; {:pending-puts 1, :drained? false, :buffer-size 0, :permanent? false, :type "manifold", :sink? true, :closed? false, :pending-takes 0, :buffer-capacity 0, :source? true}
```

I guess this is because the queue is not closable. Would be nice to have the downstreams disconnect in this case though.

SevereOverfl0w avatar Jun 13 '19 15:06 SevereOverfl0w

~~Unless overridden, the default impl of .close does nothing, and it's not currently overridden for BlockingQueueSource. I agree the "source-wrapped" version should be closable, though, and prevent further takes.~~

EDIT 2023-02-28: Oops, closing should prevent puts, not takes. Takes can still happen until the stream is drained.

KingMob avatar Jul 04 '21 19:07 KingMob

Honestly, this doesn't seem to be specific to BlockingQueueSource.

```clojure
(defn foo []
  (let [a (s/stream 4)
        b (s/batch 100 1 a)]

    (s/put-all! a [1 2 3 4])

    (s/close! a)

    (println (s/description b))))
```

This produces exactly the same behavior, using a regular stream.

However, if the (s/put-all! a [1 2 3 4]) call is removed, b is closed when a is closed. I believe this is because, when there is nothing to take from a, the pending takes issued by batch end up stored in a's consumers. The close! call then resolves those consumers with their default values, which lets the graph close the downstream stream, in this case b. This doesn't happen when the take!s are resolved by put!s, since the consumers are then resolved normally and nothing is reported to the graph.
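For comparison, here is a minimal sketch of the variant described above, with the put-all! call removed. The function name `foo-without-puts` and the 100 ms pause are illustrative assumptions, not something from the library. Closing propagates asynchronously, so the pause only makes it likely, not certain, that b has reacted before it is inspected.

```clojure
(require '[manifold.stream :as s])

(defn foo-without-puts
  "Variant of foo with the put-all! call removed (illustrative name).
  Returns the description map of the downstream batch stream."
  []
  (let [a (s/stream 4)
        b (s/batch 100 1 a)]
    (s/close! a)
    ;; Arbitrary pause to give the close a chance to propagate to b.
    ;; This is a best-effort wait, not a guarantee.
    (Thread/sleep 100)
    (s/description b)))

(println (foo-without-puts))
```

Comparing the `:closed?` and `:drained?` keys of this output with those printed by foo should show whether the difference reproduces locally.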

cosineblast avatar Feb 28 '23 11:02 cosineblast

There's a bit of a race condition here, though. Behind the scenes, multiple threads are being used, so it's completely possible for the description of b to be printed out before the closing of a has fully propagated its effects to b. I ran that code snippet multiple times, and got different results.

In general, ordering around closing can get a little weird. Unfortunately, the current default close! behavior doesn't return a deferred we could wait on...should probably fix that.
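Until close! returns something to wait on, one possible workaround is to build a deferred from a callback. The sketch below assumes `s/on-drained` is an acceptable signal. It fires once the source is closed and all of its messages have been consumed, which is stricter than "closed". The helper name `drained-deferred` and the one-second timeout are hypothetical.

```clojure
(require '[manifold.stream :as s]
         '[manifold.deferred :as d])

(defn drained-deferred
  "Hypothetical helper: returns a deferred that yields true once
  `source` has been drained (closed and fully consumed)."
  [source]
  (let [result (d/deferred)]
    (s/on-drained source #(d/success! result true))
    result))

(let [a         (s/stream 4)
      b         (s/batch 100 1 a)
      b-drained (drained-deferred b)]
  (s/close! a)
  ;; Block for at most one second instead of racing the propagation.
  ;; Yields :timed-out if b was not drained in that time.
  (println (deref b-drained 1000 :timed-out))
  (println (s/description b)))
```

Registering the callback before calling close! avoids depending on how callbacks behave when they are added to a source that is already drained.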

KingMob avatar Feb 28 '23 18:02 KingMob