streamz
Flatten doesn't work with DaskStream
This code:
from __future__ import division, print_function
from time import sleep
from streamz import Stream
from dask.distributed import Client

client = Client()

def callback(datas):
    print(':', datas)
    return datas

source = Stream().scatter()
stream = source.partition(5)
stream = stream.map(callback)
stream = stream.flatten()
stream.buffer(15).gather().sink(print)

for i in range(30):
    source.emit(i)
produces:
: (0, 1, 2, 3, 4)
Traceback (most recent call last):
  File "test.py", line 21, in <module>
    source.emit(i)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 332, in emit
    sync(self.loop, _)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 1277, in sync
    six.reraise(*error[0])
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 1262, in f
    result[0] = yield future
  File "/home/klinger/rossum/local/lib/python2.7/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/klinger/rossum/local/lib/python2.7/site-packages/tornado/concurrent.py", line 260, in result
    raise_exc_info(self._exc_info)
  File "/home/klinger/rossum/local/lib/python2.7/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 327, in _
    result = yield self._emit(x)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 298, in _emit
    r = downstream.update(x, who=self)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 734, in update
    return self._emit(tuple(result))
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 298, in _emit
    r = downstream.update(x, who=self)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/dask.py", line 58, in update
    return self._emit(result)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 298, in _emit
    r = downstream.update(x, who=self)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 1020, in update
    for item in x:
TypeError: 'Future' object is not iterable
Without the dask interface (i.e. removing scatter and gather), everything works as expected.
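For comparison, the core-only data flow can be modelled in plain Python. This is a minimal sketch: the generator functions `partition` and `flatten` below are hypothetical stand-ins for the stream nodes, not streamz APIs. They show that flatten simply iterates over each incoming tuple, which is exactly the `for item in x:` loop that fails in the traceback once `x` is a dask Future instead of a tuple.

```python
def partition(items, n):
    """Model of Stream.partition(n): collect n consecutive items, emit them as one tuple."""
    buffer = []
    for x in items:
        buffer.append(x)
        if len(buffer) == n:
            yield tuple(buffer)
            buffer = []
    # In this model an incomplete final batch is never emitted


def flatten(batches):
    """Model of Stream.flatten(): emit every element of each incoming iterable."""
    for batch in batches:
        for item in batch:  # raises TypeError if `batch` is a Future rather than a tuple
            yield item


def callback(datas):
    print(':', datas)
    return datas


batches = list(partition(range(30), 5))  # six tuples of five items each
flat = list(flatten(map(callback, batches)))
```

With plain tuples the round trip through partition and flatten gives back the original items in order.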
Problems:
- DaskStream is missing a specific flatten(), so the core Stream.flatten() is used. It produces core streams instead of dask streams.
- Even if we attach flatten to DaskStream (the same way as for partition), it may receive different kinds of values:
  - a tuple of Futures of values (e.g. from partition before it): it can process this fine;
  - a Future of a tuple of values: it cannot iterate over a Future, so we need to yield the future first.
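The two cases above can be illustrated with a minimal sketch. It uses the standard library's `concurrent.futures.Future` as a stand-in for a dask Future (they are different classes), and `flatten_items` is a hypothetical helper, not part of streamz. It blocks on `.result()` for simplicity; a real DaskStream flatten would instead need to check for the dask Future type and yield it on the event loop.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def flatten_items(x):
    """Return the elements of x, resolving x first if the container itself is a Future.

    - tuple of Futures: iterate directly; the elements stay Futures
    - Future of a tuple: wait for the tuple, then iterate over it
    """
    if isinstance(x, Future):
        x = x.result()  # blocking wait; an async stream node would yield instead
    return list(x)


with ThreadPoolExecutor(max_workers=2) as pool:
    # Case 1: what partition produces after scatter, a tuple whose elements are Futures
    tuple_of_futures = tuple(pool.submit(pow, i, 1) for i in range(5))
    items = flatten_items(tuple_of_futures)
    print([f.result() for f in items])

    # Case 2: what a dask-side map produces, a single Future wrapping the whole tuple
    future_of_tuple = pool.submit(tuple, range(5))
    print(flatten_items(future_of_tuple))
```

Without the `isinstance` check, the second case fails the same way as in the traceback, because a Future is not iterable.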
Hi guys, I am having an issue with stream filtering. It seems to work okay on numerical columns but fails on categorical columns with the error: TypeError: invalid type comparison.
Is there a way of converting a streamz.dataframe.core.DataFrame to a pandas.core.frame.DataFrame?
There is, I think, a little confusion about what a streaming dataframe is. Perhaps the docs need to clarify this. A streaming dataframe has no value and maintains no internal state. It is a container into which dataframe pieces are passed, which can then be passed to various downstream actions or aggregations. There are many ways in which you can get the "current" (i.e., most recent) value of a dataframe. For example,
# a little work to set up an event loop; in a notebook, this may not be necessary. Some sources
# do this automatically
import tornado.ioloop
import threading
loop = tornado.ioloop.IOLoop.current()
th = threading.Thread(target=loop.start)
th.start()
import streamz.dataframe
current = [None]
r = streamz.dataframe.Random()
r.stream.sink(lambda x: current.__setitem__(0, x))
Now current[0] always contains the latest dataframe piece that was emitted.
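If what you want is one pandas DataFrame built from everything that has arrived so far, the same sink idea can keep every piece and concatenate on demand. This is a minimal sketch: `PieceCollector` is a hypothetical helper, not part of streamz, and it assumes each emitted piece is an ordinary pandas DataFrame. The `maxlen` bound is there because a stream never ends, so keeping every piece would grow memory without limit.

```python
from collections import deque

import pandas as pd


class PieceCollector:
    """Hypothetical helper: store emitted dataframe pieces so they can be read as plain pandas.

    Pass ``collector.update`` to ``stream.sink(...)``. ``maxlen`` keeps only the most
    recent pieces; ``None`` keeps everything.
    """

    def __init__(self, maxlen=None):
        self.pieces = deque(maxlen=maxlen)

    def update(self, df):
        # Called once per emitted piece
        self.pieces.append(df)

    @property
    def latest(self):
        # Most recent piece (like current[0] above), or None before anything arrives
        return self.pieces[-1] if self.pieces else None

    def to_pandas(self):
        # Copy to a list, then concatenate the pieces in arrival order
        pieces = list(self.pieces)
        if not pieces:
            return pd.DataFrame()
        return pd.concat(pieces)


# With the Random example above (not run here):
#     collector = PieceCollector(maxlen=100)
#     r.stream.sink(collector.update)
#     df = collector.to_pandas()  # an ordinary pandas.DataFrame

# Offline check, feeding pieces by hand; maxlen=2 keeps only the last two
c = PieceCollector(maxlen=2)
for i in range(3):
    c.update(pd.DataFrame({"x": [i]}))
print(c.to_pandas())
```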
This is probably not what you meant, though! Perhaps it would be better to describe the workflow you are after in a new issue.
Thanks a lot, this was very helpful.