Flatten doesn't work with DaskStream

Open bigr opened this issue 6 years ago • 5 comments

This code:

from __future__ import division, print_function

from time import sleep
from streamz import Stream
from dask.distributed import Client

client = Client()

def callback(datas):
    print(':',datas)
    return datas

source = Stream().scatter()
stream = source.partition(5)
stream = stream.map(callback)
stream = stream.flatten()

stream.buffer(15).gather().sink(print)

for i in range(30):
    source.emit(i)

returns:

Traceback (most recent call last):
  File "test.py", line 21, in <module>
: (0, 1, 2, 3, 4)
    source.emit(i)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 332, in emit
    sync(self.loop, _)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 1277, in sync
    six.reraise(*error[0])
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 1262, in f
    result[0] = yield future
  File "/home/klinger/rossum/local/lib/python2.7/site-packages/tornado/gen.py", line 1099, in run
    value = future.result()
  File "/home/klinger/rossum/local/lib/python2.7/site-packages/tornado/concurrent.py", line 260, in result
    raise_exc_info(self._exc_info)
  File "/home/klinger/rossum/local/lib/python2.7/site-packages/tornado/gen.py", line 315, in wrapper
    yielded = next(result)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 327, in _
    result = yield self._emit(x)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 298, in _emit
    r = downstream.update(x, who=self)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 734, in update
    return self._emit(tuple(result))
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 298, in _emit
    r = downstream.update(x, who=self)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/dask.py", line 58, in update
    return self._emit(result)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 298, in _emit
    r = downstream.update(x, who=self)
  File "/home/klinger/years/2014/projects/devel/streamz/streamz/core.py", line 1020, in update
    for item in x:
TypeError: 'Future' object is not iterable

Without the Dask interface (that is, after removing scatter and gather), everything works well.

bigr avatar Nov 13 '18 13:11 bigr

Problems:

  • DaskStream has no flatten() of its own, so the core Stream.flatten() is used. It produces core streams instead of Dask streams.
  • Even if we attach flatten to DaskStream (the same way as for partition), it may receive two different kinds of value:
    • a tuple of Futures of values - e.g. from a partition before it - which it can process well
    • a Future of a tuple of values - it cannot iterate over a Future, so we need to yield the future first
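
The two cases above can be sketched with the standard library alone. This is only an illustration under stated assumptions, not streamz code: `concurrent.futures.Future` stands in for a Dask future, and `flatten_items` is a hypothetical helper that resolves a future by blocking on `result()` before iterating.

```python
from concurrent.futures import Future, ThreadPoolExecutor


def flatten_items(x):
    """Hypothetical helper: return the items of x as a list.

    If x is itself a Future, wait for its result first, because a
    Future cannot be iterated over directly.
    """
    if isinstance(x, Future):
        x = x.result()
    return list(x)


with ThreadPoolExecutor(max_workers=2) as pool:
    # Case 1: a tuple of futures, as produced by partition().
    tuple_of_futures = tuple(pool.submit(lambda v=v: v) for v in range(5))

    # Case 2: a future of a tuple, as produced by map() after partition().
    future_of_tuple = pool.submit(lambda: tuple(range(5)))

    # Case 1 iterates fine; each item is still a future to be resolved.
    case1 = [f.result() for f in flatten_items(tuple_of_futures)]

    # Case 2 only works because the helper resolves the future first.
    case2 = flatten_items(future_of_tuple)

    # Iterating the future directly fails, as in the traceback above.
    try:
        iter(future_of_tuple)
        iterable = True
    except TypeError:
        iterable = False
```

In a real DaskStream node the future would more likely be resolved without blocking the event loop; the blocking `result()` call here is only for brevity.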

bzamecnik avatar Nov 14 '18 13:11 bzamecnik

Hi guys, I am having an issue with stream filtering. It seems to work okay on numerical columns but fails on categorical columns with the error: TypeError: invalid type comparison.

NyakioMureithi avatar Jul 08 '19 20:07 NyakioMureithi

Is there a way of converting a streamz.dataframe.core.DataFrame to a pandas.core.frame.DataFrame?

NyakioMureithi avatar Jul 08 '19 20:07 NyakioMureithi

There is, I think, a little confusion about what a streaming dataframe is. Perhaps this needs a clarification in the docs. A streaming dataframe has no value, and maintains no internal state. It is a container into which dataframe pieces are passed, which can then be passed to various downstream actions or aggregations. There are many ways in which you can get the "current" (i.e., most recent) value of the dataframe. For example,

# A little work to set up an event loop. In a notebook this may not be
# necessary, and some sources do this automatically.
import tornado.ioloop
import threading
loop = tornado.ioloop.IOLoop.current()
th = threading.Thread(target=loop.start)
th.start()

import streamz.dataframe
current = [None]
r = streamz.dataframe.Random()
r.stream.sink(lambda x: current.__setitem__(0, x))

Now the variable current always contains the latest dataframe piece that was emitted. This is probably not what you meant, though! Perhaps it would be better to describe the workflow you are after in a new issue.

martindurant avatar Jul 09 '19 19:07 martindurant

Thanks a lot, this was very helpful.

NyakioMureithi avatar Jul 11 '19 12:07 NyakioMureithi