
Connectable observable sends events multiple times

Open igordertigor opened this issue 3 years ago • 2 comments

I'm using rxpy for a realtime audio processing tool. The tool receives two event streams, one that contains audio chunks and one that contains small text like annotation snippets. The flow looks similar to this:

audio ----> .publish() ----> rec ----> p1 ---> p2 ---> p3 ----> p4 ---> merge ---> output
                                        \              /               /
                                          ---> p5 ----                /
annotations --------------------------------------------------> p6 ---

Here, the p<x> nodes are processing nodes (implemented in numpy/pytorch) and the rec node is a recorder that writes its input to disk and otherwise passes it on unchanged. I'm using the .publish() call to support the branching that comes after the rec node.

When I open the audio file written by the rec node, every chunk has been written 3 times, which implies that the rec node received every chunk 3 times. Is this intended behaviour? How can I avoid this? I'm worried that downstream nodes (p1-p5) might receive multiple repetitions of the same chunk as well and therefore might not operate as intended. However, the pipeline as a whole seems to work correctly.

I tried a number of variations:

  1. No .publish() call: Events in the first pipeline get stuck right before p3. p4 never receives any events.
  2. Introduce a separate step before the rec node that drops events if they have the same md5 sum as the previous event (either using a combination of scan and filter or a filter with a class). This makes the audio file look ok, but the overall pipeline becomes prohibitively slow and is essentially broken.
  3. Move the .publish() call to a later stage: The results are essentially the same as 1.

Thank you for your help.

igordertigor, Dec 03 '21 11:12

Hello @igordertigor,

How is the branch/merge at p1 and p3 done? Your issue may come from that part of the pipeline.

MainRo, Dec 07 '21 22:12

Hi @MainRo, thanks for your reply. Unfortunately it got lost in my GitHub notifications. Regarding your question: I'm calling .publish() on the audio node. Then I store the output up to p1 in a variable that I use in both p2 and p5, i.e. conceptually like this:

inp = audio.publish()
out_p1 = inp.pipe(map(rec), map(p1))
out_p2 = out_p1.pipe(map(p2))
out_p5 = out_p1.pipe(map(p5))
out_p3 = out_p2.pipe(merge(out_p5), ...)

Once the whole pipeline is set up, I call audio.connect(). Thanks for your help.

igordertigor, Jan 10 '22 16:01