
Make Rx-like graphs

Open CJ-Wright opened this issue 7 years ago • 3 comments

Rx has some nice tutorials with great graphs showing how streams get combined, e.g. combine latest. It would be nice if we could have these too, especially as we have extended some of the operators beyond what their tutorials cover. Even better, it would be awesome if we could generate these from the streams themselves.

One potential way to do this:

  1. Attach sinks to all the nodes in question which keep track of the time when the data came through the nodes
  2. Use each stream's timestamps to generate a timeline of the execution
  3. Make a pretty timeline? (The Rx ones are interactive, which is great, but I don't know if we need that functionality.) I just don't know how to do this: TikZ, matplotlib, something else?
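
As a rough illustration of steps 2 and 3, here is a minimal sketch that turns recorded timestamps into a plain-text timeline. It assumes each stream's sink has already produced a list of `(timestamp, label)` pairs; `render_marbles` and the sample `rows` data are hypothetical names and values made up for this example, not part of any library.

```python
def render_marbles(rows, width=40):
    """Render {name: [(timestamp, label), ...]} as one text timeline per stream."""
    times = [t for events in rows.values() for t, _ in events]
    if not times:
        return ""
    t_min, t_max = min(times), max(times)
    span = (t_max - t_min) or 1.0  # avoid dividing by zero if all events coincide
    name_width = max(len(name) for name in rows)
    lines = []
    for name, events in rows.items():
        line = ["-"] * width
        for t, label in events:
            # Map the timestamp onto a column; only the first character is drawn
            col = round((t - t_min) / span * (width - 1))
            line[col] = str(label)[0]
        lines.append(f"{name.ljust(name_width)} |{''.join(line)}|")
    return "\n".join(lines)


# Hypothetical recorded data: (seconds since start, label)
rows = {
    "a": [(0.0, "0"), (1.0, "1")],
    "b": [(2.0, "A"), (3.0, "B")],
    "zip": [(2.0, "x"), (3.0, "y")],
}
diagram = render_marbles(rows, width=7)
print(diagram)
```

Events that map to the same column overwrite each other, so a real version would need a wider canvas or some collision handling.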

Ideally we would create a whole bunch of these examples so we could put them in the docs. Then every time the docs are generated we could recreate them, in case we introduce a new execution model or change the nodes (or even to check that the nodes behave as we expect).

@ordirules (since we were talking about this on #48)

CJ-Wright avatar Aug 10 '17 00:08 CJ-Wright

import functools as ft  # needed for ft.wraps in star() below
import time
from streams import Stream
from operator import add
import matplotlib.pyplot as plt


def star(f):
    """Take tuple and unpack it into args"""
    @ft.wraps(f)
    def wraps(args):
        return f(*args)

    return wraps

a = Stream()
b = Stream()
c = a.zip(b)

# Make the combo data
d = c.map(star(add))

def timer(*args):
    return time.time()

# Get the times
Ls = [s.map(timer).sink_to_list() for s in [a, b, c]]

# Get the labels
lab = [s.sink_to_list() for s in [a, b, d]]


# do the specific time trial(s)
t0 = time.time()
for i in range(5):
    a.emit(str(i))
for j in ['A', 'B', 'C', 'D', 'E']:
    b.emit(j)

for j, (L, M) in enumerate(zip(Ls, lab)):
    for l, m in zip(L, M):
        plt.scatter(l - t0, (3-j) * .01, marker=r"$ {} $".format(m), s=1000)
    plt.axhline((3-j) * .01)
fLs = [item - t0 for sublist in Ls for item in sublist]
plt.xlim(min(fLs) * .9, max(fLs) * 1.1)
plt.show()

This almost works, except that on my machine the timings aren't quite right. Is there any way I can slow each node down?
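
One possible workaround, which is an assumption on my part and not something tried in this thread, is to stop relying on wall-clock time and use a shared logical counter, so that every event lands on its own evenly spaced tick no matter how fast the machine is. `make_logical_timer` is a hypothetical helper; the `timer` it returns keeps the same `*args` signature as the `timer` above, so it could in principle be passed to `s.map(...)` in its place. The `order` list below is just a stand-in for the order in which the sinks fire.

```python
import itertools


def make_logical_timer():
    """Return a timer(*args) callable that yields 0, 1, 2, ... on successive calls."""
    counter = itertools.count()

    def timer(*args):
        # Ignore the data; only the order of the calls matters
        return next(counter)

    return timer


timer = make_logical_timer()

# Stand-in for the sequence of nodes whose timer sinks get called
order = ["a", "a", "b", "zip", "b", "zip"]
ticks = [(node, timer(node)) for node in order]
print(ticks)
```

This records the order of execution, not real delays, so the plot would show which node fired first without any sleeping.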

CJ-Wright avatar Aug 10 '17 01:08 CJ-Wright

@mrocklin is there a way to have the map function delay asynchronously? It seems that the map (and its timer sink) are actually executed before the b timer sink.

CJ-Wright avatar Aug 10 '17 02:08 CJ-Wright

FYI, after discussion with @CJ-Wright, we may try to follow Rx, whose figures are interactive: http://rxmarbles.com/

combine_latest here

jrmlhermitte avatar Aug 10 '17 04:08 jrmlhermitte