RxPY
Connectable observable sends events multiple times
I'm using RxPY for a real-time audio processing tool. The tool receives two event streams: one that contains audio chunks and one that contains small pieces of text, such as annotation snippets. The flow looks similar to this:
audio ----> .publish() ----> rec ----> p1 ---> p2 ---> p3 ---> p4 ---> merge ---> output
                                         \            /               /
                                          ---> p5 ----               /
annotations -------------------------------------------------> p6 ---
Here, the p<x> nodes are processing nodes (implemented in NumPy/PyTorch), and the rec node is a recorder that writes its input to disk and otherwise passes it on unchanged. I'm using the .publish() call to support the branching that comes after the rec node.
When I open the audio file written by the rec node, every chunk has been written 3 times, which implies that the rec node received every chunk 3 times. Is this intended behaviour? How can I avoid this? I'm worried that downstream nodes (p1-p5) might receive multiple repetitions of the same chunk as well and therefore might not operate as intended. However, the pipeline as a whole seems to work correctly.
I tried a number of variations:
- No .publish() call: Events in the first pipeline get stuck right before p3; p4 never receives any events.
- Introduce a separate step before the rec node that drops events if they have the same md5 sum as the previous event (using either a combination of scan and filter or a filter with a class). This makes the audio file look OK, but the overall pipeline becomes prohibitively slow and is essentially broken.
- Move the .publish() call to a later stage: The results are essentially the same as in the first variation.
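The second variation can be sketched as a stateful predicate that keeps a chunk only when its MD5 digest differs from that of the previous chunk. DropRepeatedChunks is a hypothetical helper, not part of RxPY, and chunks are assumed to be bytes; in an RxPY pipeline an instance of it would be passed to the filter operator.

```python
import hashlib
from typing import Optional


class DropRepeatedChunks:
    """Stateful filter predicate (hypothetical helper, not part of RxPY).

    Returns True for a chunk whose MD5 digest differs from the previous
    chunk's digest, and False for an immediate repeat.
    """

    def __init__(self) -> None:
        self._last_digest: Optional[bytes] = None

    def __call__(self, chunk: bytes) -> bool:
        digest = hashlib.md5(chunk).digest()
        keep = digest != self._last_digest
        self._last_digest = digest  # remember this chunk for the next call
        return keep


keep = DropRepeatedChunks()
chunks = [b"a", b"a", b"b", b"a", b"a"]
print([c for c in chunks if keep(c)])  # [b'a', b'b', b'a']
```

This removes repeats after they have been produced rather than preventing whatever produces them, it adds one hash computation per chunk, and it would also drop chunks that are legitimately identical, such as two consecutive all-zero (silent) chunks. RxPY also provides a distinct_until_changed operator that performs a similar consecutive-duplicate check.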
Thank you for your help.
Hello @igordertigor,
How are the branching at p1 and the merge at p3 implemented? Your issue may come from that part of the pipeline.
Hi @MainRo,
Thanks for your reply. Unfortunately, it got lost in my GitHub notifications. Regarding your question: I'm calling .publish() on the audio node. Then I store the output up to p1 in a variable that I use in both p2 and p5, i.e. conceptually like this:
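from rx.operators import map, merge

inp = audio.publish()
out_p1 = inp.pipe(map(rec), map(p1))      # rec writes each chunk to disk, then p1
out_p2 = out_p1.pipe(map(p2))             # first branch
out_p5 = out_p1.pipe(map(p5))             # second branch
out_p3 = out_p2.pipe(merge(out_p5), ...)  # branches rejoin at p3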
Once the whole pipeline is set up, I call audio.connect(). Thanks for your help.
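To illustrate why a side effect placed after .publish() can run more than once, here is a toy, stdlib-only model of the setup above. The classes Observable and Subject, the share() method and the build() helper are hypothetical illustrations, not RxPY's API; p1, p2 and p5 are identity placeholders, and rec appends to a list instead of writing a file. As with Rx operators in general, each subscription to an operator chain creates its own copy of that chain, so the merge at p3 subscribes to out_p1 twice (once through p2, once through p5) and rec runs once per subscription that reaches it. Sharing out_p1 makes both branches use a single upstream subscription.

```python
from typing import Callable, List, Tuple

OnNext = Callable[[str], None]


class Observable:
    """Toy observable (hypothetical illustration, not RxPY's implementation)."""

    def __init__(self, subscribe: Callable[[OnNext], None]) -> None:
        self._subscribe = subscribe

    def subscribe(self, on_next: OnNext) -> None:
        self._subscribe(on_next)

    def map(self, fn: Callable[[str], str]) -> "Observable":
        # Every downstream subscription creates its own upstream subscription,
        # so fn (including any side effect in it) runs once per subscriber.
        return Observable(lambda on_next: self.subscribe(lambda x: on_next(fn(x))))

    def merge(self, other: "Observable") -> "Observable":
        # A merge subscribes to both of its sources.
        def subscribe(on_next: OnNext) -> None:
            self.subscribe(on_next)
            other.subscribe(on_next)
        return Observable(subscribe)

    def share(self) -> "Observable":
        # Subscribe upstream only for the first subscriber, then fan out.
        observers: List[OnNext] = []

        def fan_out(x: str) -> None:
            for obs in list(observers):
                obs(x)

        def subscribe(on_next: OnNext) -> None:
            observers.append(on_next)
            if len(observers) == 1:
                self.subscribe(fan_out)
        return Observable(subscribe)


class Subject(Observable):
    """Toy hot source standing in for the published audio stream."""

    def __init__(self) -> None:
        self._observers: List[OnNext] = []
        super().__init__(self._observers.append)

    def on_next(self, x: str) -> None:
        for obs in list(self._observers):
            obs(x)


def build(share_after_p1: bool) -> Tuple[List[str], List[str]]:
    written: List[str] = []  # stands in for the file written by rec

    def rec(chunk: str) -> str:
        written.append(chunk)  # side effect: "write to disk"
        return chunk           # pass the chunk on unchanged

    def identity(chunk: str) -> str:  # placeholder for p1, p2 and p5
        return chunk

    audio = Subject()
    out_p1 = audio.map(rec).map(identity)
    if share_after_p1:
        out_p1 = out_p1.share()
    out_p2 = out_p1.map(identity)
    out_p5 = out_p1.map(identity)
    out_p3 = out_p2.merge(out_p5)

    received: List[str] = []
    out_p3.subscribe(received.append)
    for chunk in ("c0", "c1"):
        audio.on_next(chunk)
    return written, received


print(build(share_after_p1=False)[0])  # ['c0', 'c0', 'c1', 'c1']
print(build(share_after_p1=True)[0])   # ['c0', 'c1']
```

In both variants the merge at p3 still receives each chunk twice, once from each branch, because that is what merging two branches means; only the number of times the recorder's side effect runs changes. In RxPY itself, multicasting a derived stream is usually done with the share() operator or by applying publish() after the side-effecting stage; whether that also resolves the stalling described in the variations above is not established in this thread.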