streamz
Make RX like graphs
RX has some nice tutorials with great graphs showing how streams get combined, e.g. combine latest. It would be nice if we could have these too, especially as we have extended some of the operators beyond what their tutorials cover. Even better, it would be awesome if we could generate these from the streams themselves.
One potential way to do this:
- Attach sinks to all the nodes in question which keep track of the time when the data came through the nodes
- Use each stream's timestamps to generate a timeline of the execution
- Make a pretty timeline? (RX's graphs are interactive, which is great, but I don't know if we need that functionality.) I just don't know how to do this: TikZ, matplotlib, something else?
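As a rough illustration of the last two steps, here is a minimal sketch that turns recorded `(timestamp, label)` pairs into a plain-text marble diagram, with no plotting library at all. The function name `render_marbles`, its `width` parameter, and the shape of the `events` dict are all made up for this example; nothing here is part of streamz, and the timestamps are hand-written stand-ins for what the timing sinks would collect.

```python
def render_marbles(events, width=40):
    """Render {stream name: [(timestamp, label), ...]} as text rows.

    Hypothetical helper, not a streamz API. Each stream becomes one row,
    and each label is placed at a column proportional to its timestamp.
    """
    points = [(t, str(label)) for series in events.values() for t, label in series]
    if not points:
        return ""
    t_min = min(t for t, _ in points)
    t_max = max(t for t, _ in points)
    span = (t_max - t_min) or 1.0  # avoid dividing by zero for a single instant
    longest = max(len(label) for _, label in points)
    last_col = max(width - longest, 0)  # leave room so the last label fits
    pad = max(len(name) for name in events)

    rows = []
    for name, series in events.items():
        line = ["-"] * width
        for t, label in series:
            col = round((t - t_min) / span * last_col)
            for k, ch in enumerate(str(label)):
                if col + k < width:
                    line[col + k] = ch
        rows.append("{} |{}>".format(name.rjust(pad), "".join(line)))
    return "\n".join(rows)


# Hand-written example data standing in for the recorded times and labels.
events = {
    "a": [(0, "0"), (2, "1")],
    "b": [(1, "A"), (3, "B")],
    "zip": [(1, "0A"), (3, "1B")],
}
print(render_marbles(events, width=8))
```

Something like this could be checked into the docs as text and diffed between builds, though events that land very close together will overwrite each other on a row.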
Ideally we would create a whole bunch of these examples so we could put them in the docs. Then every time the docs are generated we could recreate them, in case we introduce a new execution model or change the nodes (or even to check that the nodes behave as we expect).
@ordirules (since we were talking about this on #48)
import functools as ft
import time
from operator import add

import matplotlib.pyplot as plt
from streamz import Stream


def star(f):
    """Take tuple and unpack it into args"""
    @ft.wraps(f)
    def wraps(args):
        return f(*args)
    return wraps


a = Stream()
b = Stream()
c = a.zip(b)
# Make the combo data
d = c.map(star(add))


def timer(*args):
    """Ignore the data and return the wall-clock time it arrived"""
    return time.time()


# Get the times
Ls = [s.map(timer).sink_to_list() for s in [a, b, c]]
# Get the labels
lab = [s.sink_to_list() for s in [a, b, d]]

# do the specific time trial(s)
t0 = time.time()
for i in range(5):
    a.emit(str(i))
for j in ['A', 'B', 'C', 'D', 'E']:
    b.emit(j)

# One horizontal line per stream, with each label drawn at its arrival time
for j, (L, M) in enumerate(zip(Ls, lab)):
    for l, m in zip(L, M):
        plt.scatter(l - t0, (3-j) * .01, marker=r"$ {} $".format(m), s=1000)
    plt.axhline((3-j) * .01)
fLs = [item - t0 for sublist in Ls for item in sublist]
plt.xlim(min(fLs) * .9, max(fLs) * 1.1)
plt.show()
This almost works, except that on my machine the timings aren't quite right. Is there any way I can slow each node down?
@mrocklin is there a way to have the map function delay asynchronously? It seems that the map (and its timer sink) is actually executed before the b timer sink.
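One possible workaround, which sidesteps the question rather than answering it, is to stop using the wall clock and count events instead. The sketch below is only an assumption about what might help: `make_logical_timer` is a made-up helper, not a streamz feature, and the three calls at the bottom are stand-ins for nodes firing rather than real streams.

```python
import itertools


def make_logical_timer():
    """Return a timer(*args) that counts calls instead of reading the clock.

    Hypothetical replacement for the wall-clock timer in the script above:
    every call gets the next integer, so markers are evenly spaced and
    their order shows the order in which the nodes actually ran.
    """
    ticks = itertools.count()

    def timer(*args):
        return next(ticks)

    return timer


timer = make_logical_timer()
# Stand-ins for three nodes firing one after another.
stamps = [timer("0"), timer("A"), timer("0A")]
print(stamps)
```

If a single `timer` created this way were shared by all the `s.map(timer)` calls, with `t0 = 0`, the plot would show execution order rather than elapsed time. It would not make the zip output line up under the b event, since each call still gets its own tick.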
FYI after discussion with @CJ-Wright we may try to follow Rx (the figures are interactive) http://rxmarbles.com/
combine_latest
here