
[Q] stream asyncio dataframes to_dataframe usage

Open hofab opened this issue 7 years ago • 0 comments

Hey guys,

Great work here. I just have a problem understanding how to translate a stream of dataframes so that I can use tail() etc.

I have a coroutine that emits dataframes to my stream:

import pandas as pd
from streamz import Stream
from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def f():
    source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
    example = pd.DataFrame({'x': []})
    sdf = source.to_dataframe(example=example)
    for x in range(10):
        yield gen.sleep(0.1)
        yield source.emit(readField())
    sdf.tail()
    sdf.cumsum()
    print(sdf)

IOLoop().run_sync(f)

with:

@asyncio.async
def readField():
    """ 
    asynchronously - Blocking IO operation
    """
    df = pd.DataFrame({'x': [SomeValues]})
    return df

As output I get:

DataFrame - elements like:
Empty DataFrame
Columns: [x]
Index: []

I am not sure if that's the way to_dataframe is meant to be used.

I basically want to send my dataframes to a stream, combine them into bigger dataframes (window etc.) and print the resulting frames dynamically. But somehow it seems that my data is not transmitted to my stream.

Any idea/help would be appreciated.

hofab · May 10 '18 18:05
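
One thing that may contribute here, assuming readField really is a coroutine function: source.emit(readField()) would hand the stream the coroutine object itself, not the DataFrame it eventually produces. The stdlib-only sketch below illustrates that difference. The name read_field and the dict payload are hypothetical stand-ins, not streamz or pandas code.

```python
import asyncio


async def read_field():
    """Hypothetical stand-in for readField(): pretend to do slow I/O."""
    await asyncio.sleep(0.01)
    return {"x": [1, 2, 3]}


async def main():
    emitted_wrong = []  # what a stream would receive without awaiting
    emitted_right = []  # what it receives when the result is awaited first

    coro = read_field()               # calling only creates a coroutine object
    emitted_wrong.append(coro)        # this is not the data
    emitted_right.append(await coro)  # awaiting yields the actual payload

    return emitted_wrong, emitted_right


wrong, right = asyncio.run(main())
print(type(wrong[0]).__name__)
print(right[0])
```

In the snippet from the question, the analogous change would be to resolve the result first (something like df = yield readField()) and only then yield source.emit(df). Separately, print(sdf) appears to show only the example frame passed to to_dataframe, so results are probably better observed by attaching a sink to the derived stream before emitting, for instance sdf.stream.sink(print) as in the streamz documentation.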