
Stream pipeline correctly connected but take! hangs

Open lxsameer opened this issue 8 years ago • 6 comments

Hi. First of all, kudos. I have several components, each of which has an input stream and an output stream. I connected the inputs and outputs of my components like this:

The input of component A is connected to the output of component A. The output of A is connected to the input of B. The input of B is connected to the output of B, the output of B to the input of C, and finally the input of C to the output of C.

I confirmed this by walking the pipeline using the downstream function. Here is the output of the walk:

STREAM: manifold.stream.default.Stream@2db12c5b  ->  manifold.stream.default.Stream@5572f9d1

STREAM: manifold.stream.default.Stream@5572f9d1  ->  manifold.stream.default.Stream@656ed510

STREAM: manifold.stream.default.Stream@656ed510  ->  manifold.stream.default.Stream@8d5cdb0

STREAM: manifold.stream.default.Stream@8d5cdb0  ->  manifold.stream.default.Stream@46a7703a

STREAM: manifold.stream.default.Stream@46a7703a  ->  manifold.stream.default.Stream@60399d13
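A walk like the one above could be sketched roughly as follows. This is not the original poster's code: `walk-edges` is a hypothetical helper, and it assumes that `manifold.stream/downstream` returns a sequence of `[description sink]` pairs and that every sink reached is itself a stream.

```clojure
(require '[manifold.stream :as s])

(defn walk-edges
  "Hypothetical helper: returns [source sink] pairs reachable from root.
   Assumes (s/downstream x) yields [description sink] tuples."
  [root]
  (loop [todo [root], seen #{}, edges []]
    (if-let [src (peek todo)]
      (if (seen src)
        ;; already visited, skip to avoid looping on cyclic graphs
        (recur (pop todo) seen edges)
        (let [sinks (map second (s/downstream src))]
          (recur (into (pop todo) sinks)
                 (conj seen src)
                 (into edges (map (fn [sink] [src sink]) sinks)))))
      edges)))

(def a (s/stream))
(def b (s/stream))
(def c (s/stream))

(s/connect a b)
(s/connect b c)

;; Print each edge in the same shape as the output above.
(doseq [[src sink] (walk-edges a)]
  (println "STREAM:" (str src) " -> " (str sink)))
```

A walk of this kind only shows that the connections exist. It says nothing about where a message can be taken from.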

But the problem is that when I put! something into any of these streams and then try to take! or try-take! the value from either the stream itself or a downstream one, take! hangs and try-take! returns the timeout value, even though the put! return value derefs to true.

My component functions are fairly simple:

  (fn [component]
    ;; component is a map.
    (let [input  (hcomp/input component)
          output (hcomp/output component)]
      (println "INPUT: " (str input))
      (println "OUTPUT: " (str output))
      (stream/connect input output)
      component))

If I remove the call to connect in this function (that is, don't create a pipeline), I can take a value from the same stream I put it into.

NOTE: I tried to debug this issue in the latest manifold version. I found that the producers collection at https://github.com/ztellman/manifold/blob/master/src/manifold/stream/default.clj#L204 is empty for the downstream stream. My guess is that the stream's put method returns true but does not actually put the value in the producers, but I haven't confirmed this.

lxsameer, Oct 09 '17 23:10

I just tried to reproduce your issue based on your description:

> (def a (s/stream))

> (def b (s/stream))

> (def c (s/stream))

> (s/put-all! a [1 2 3])
<< … >>

> (s/connect a b)
nil

> (s/connect b c)
nil

> (->> c s/stream->seq (take 3))
(1 2 3)

This seems to work as expected. Can you expand on how my example differs from what you're doing?

ztellman, Oct 10 '17 00:10

Thanks for the quick response. A friend in Clojurians pointed out to me that once I have connected several streams together, I can only consume messages from the final output. So in my example I can only consume from C's output, which works just fine. So it was my bad and my misunderstanding :pray:. But it would be a good idea to update the docs with a note about this.
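For anyone landing here later, the behaviour described in this thread can be sketched as follows. This is a minimal illustration, not the original component code: the three plain streams stand in for the component inputs and outputs, and the 100 ms timeout and the `:drained` / `:timeout` marker values are arbitrary choices.

```clojure
(require '[manifold.stream :as s])

(def a (s/stream))
(def b (s/stream))
(def c (s/stream))

(s/connect a b)
(s/connect b c)

;; The connection consumes the message from a, so put! succeeds.
(def put-result @(s/put! a :msg))

;; The message has already moved downstream, so taking from a times out.
(def from-a @(s/try-take! a :drained 100 :timeout))

;; The message is waiting at the end of the pipeline.
(def from-c @(s/try-take! c :drained 100 :timeout))

(println put-result from-a from-c)
```

As reported above, the expectation is that `put-result` is true, `from-a` is the timeout value, and `from-c` is the message.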

lxsameer, Oct 10 '17 07:10

Hey, I was the one who answered the question.

@lxsameer - it would be interesting to hear how you thought the streams worked and what your initial design was, based on your mental model. You probably aren't the only one making the same assumptions. It will be easier to clarify the documentation after we clear this up.

dm3, Oct 10 '17 11:10

I read the docs on aleph.io many times, and my assumption was that when you connect a series of streams to each other, you can still inspect each stream along the way. For example, if I connect A -> B -> C, then after putting some value in A I should be able to take it from B as well without affecting the pipeline (I should still be able to get it from C too). I assumed this because in my use case I connect the components' inputs and outputs dynamically, based on a workflow graph.

lxsameer, Oct 10 '17 11:10

If you connect B to multiple streams, the messages will be sent to both, but all operators in manifold.stream operate via side effects. If I'm understanding what you're saying, you seem to be expecting them to work like Clojure's seqs, which don't disappear when transformed elsewhere.

If things did work this way, then each stream in your software would effectively hold onto every message forever, eventually leading to a memory leak. This happens fairly often with large lazy sequences, where you mistakenly hold onto the head of the sequence. This is why Manifold mimics the semantics of core.async, Java queues, and other similar abstractions by removing messages once they're consumed.
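A rough sketch of the fan-out case mentioned above, for anyone who wants to observe messages mid-pipeline: connect the intermediate stream to an extra sink instead of taking from it directly. The `b-tap` stream and its buffer size of 16 are hypothetical choices for illustration, not something the library prescribes.

```clojure
(require '[manifold.stream :as s])

(def a (s/stream))
(def b (s/stream))
(def c (s/stream))

;; Hypothetical inspection stream; buffered so a slow reader
;; does not immediately stall the main pipeline.
(def b-tap (s/stream 16))

(s/connect a b)
(s/connect b c)
(s/connect b b-tap)   ; b now has two downstream sinks

@(s/put! a :msg)

;; Each downstream sink should receive its own copy of the message.
(def from-tap @(s/try-take! b-tap :drained 100 :timeout))
(def from-c   @(s/try-take! c :drained 100 :timeout))

(println from-tap from-c)
```

Messages delivered to the tap are still removed once consumed, so nothing is retained indefinitely, but a tap that is never read will eventually fill its buffer and apply backpressure upstream.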

ztellman, Oct 10 '17 17:10

Thanks @ztellman @dm3. I totally understand the flow now. But I suggest adding a small note to the docs to help future users understand the concept better and avoid making the same mistake I did.

lxsameer, Oct 10 '17 18:10