streamz
[Q] stream asyncio dataframes to_dataframe usage
Hey guys,
Great work here. I am having trouble understanding how to turn a stream of dataframes into a streaming dataframe so that we can use tail() etc.
I have a coroutine that emits dataframes to my stream.
import pandas as pd
from streamz import Stream
from tornado import gen
from tornado.ioloop import IOLoop

@gen.coroutine
def f():
    source = Stream(asynchronous=True)  # tell the stream we're working asynchronously
    example = pd.DataFrame({'x': []})
    sdf = source.to_dataframe(example=example)
    for x in range(10):
        yield gen.sleep(0.1)
        yield source.emit(readField())
    sdf.tail()
    sdf.cumsum()
    print(sdf)

IOLoop().run_sync(f)
with readField defined as:
@asyncio.async
def readField():
    """
    asynchronously - Blocking IO operation
    """
    df = pd.DataFrame({'x': [SomeValues]})
    return df
As output I get:
DataFrame - elements like:
Empty DataFrame
Columns: [x]
Index: []
I am not sure if that's the way to_dataframe is meant to be used.
I basically want to send my dataframes to a stream, combine them into bigger dataframes (windows etc.) and do some dynamic printing of the resulting frames. But it seems that my data is not reaching the stream.
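One thing I am unsure about is whether the result of readField() has to be awaited before it is emitted, since calling a coroutine function only hands back a coroutine object, not the data. Here is a minimal sketch of the pattern I suspect is needed, using only asyncio. The names read_field and emitted are hypothetical stand-ins (a plain list takes the place of the stream and a dict takes the place of the DataFrame), so this is not streamz API:

```python
import asyncio


async def read_field(i):
    """Hypothetical stand-in for readField(): pretend to do async I/O."""
    await asyncio.sleep(0.01)
    return {"x": [float(i)]}  # would be a pandas DataFrame in the real code


async def main():
    emitted = []  # stand-in for the stream; the real code would emit to it
    for i in range(3):
        coro = read_field(i)
        # Calling the function gives a coroutine object, not the data yet.
        assert asyncio.iscoroutine(coro)
        record = await coro  # awaiting it produces the actual value
        emitted.append(record)
    return emitted


emitted = asyncio.run(main())
print(emitted)
```

If the same holds for my code above, the value passed to source.emit() would need to be the awaited result rather than the return value of the bare call.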
Any idea/help would be appreciated.