streamz
Real-time stream processing for Python
I get a RuntimeError when trying to filter a dataframe in my websocket client. The dataframe works without filtering. Has anyone run into this issue before? ``` class Socket(threading.Thread): def __init__(self, symbol=None): super(Socket,...
It might be nice to have warnings or errors raised when the signature of the function passed to map et al. does not match the number of incoming nodes. This won't work in all cases, since...
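A minimal sketch of the kind of check this issue describes, using only the standard library. The helper name `accepts_n_positional` is hypothetical and is not part of streamz; how the expected argument count would be derived from the upstream nodes is left as an assumption.

```python
import inspect


def accepts_n_positional(func, n):
    """Return True if func can be called with n positional arguments,
    False if it cannot, and None if the signature cannot be inspected
    (hypothetical helper, not a streamz API)."""
    try:
        sig = inspect.signature(func)
    except (TypeError, ValueError):
        # Some callables do not expose an inspectable signature.
        return None
    try:
        # bind() applies the normal call rules without calling func.
        sig.bind(*([None] * n))
    except TypeError:
        return False
    return True


def add(x, y):
    return x + y


def identity(x):
    return x


def collect(*args):
    return args


ok_two = accepts_n_positional(add, 2)        # matches two inputs
bad_two = accepts_n_positional(identity, 2)  # one parameter, two inputs
ok_any = accepts_n_positional(collect, 3)    # *args accepts any count
```

Returning `None` for uninspectable callables is one way to reflect the issue's caveat that such a check cannot work in all cases; a caller could emit a warning only on an explicit `False`.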
This code: ``` from __future__ import division, print_function from time import sleep from streamz import Stream from dask.distributed import Client client = Client() def callback(datas): print(':',datas) return datas source =...
In `core.py`, the set `_global_sinks` captures all constructed sink objects. What was it used for? It should either be removed or replaced with a `WeakSet`.
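To illustrate the difference a `WeakSet` would make, here is a small standalone sketch. The `Sink` class and `_registry` attribute are hypothetical stand-ins, not the streamz implementation; whether the strong references in `_global_sinks` are intentional is exactly what the issue asks.

```python
import gc
import weakref


class Sink:
    """Hypothetical stand-in for a sink node (not the streamz class)."""

    # Weak references: being registered does not keep a sink alive.
    _registry = weakref.WeakSet()

    def __init__(self, name):
        self.name = name
        Sink._registry.add(self)


def live_sink_names():
    """Names of the sinks that are still referenced somewhere else."""
    return sorted(s.name for s in Sink._registry)


a = Sink("a")
b = Sink("b")
names_before = live_sink_names()

# Drop the only strong reference to b; the WeakSet entry disappears
# once the object is collected.
del b
gc.collect()
names_after = live_sink_names()
```

With a plain `set`, `b` would stay registered (and alive) after `del b`, which is the leak-like behavior the issue is concerned about.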
I've been using streamz to consume events from Kafka. For that I built a pipeline which I run in a daemon process, managed by the Python daemon package (https://pypi.org/project/python-daemon/). The...
I have run two simple pipelines (non-Dask and Dask) to read a stream of messages from Kafka using the streamz FromKafkaBatched method. I seem to get similar throughput for both pipelines. **Non-Dask...
I would like to propose caching consumers in the FromKafkaBatched class and reusing them to fetch the next batch. The current FromKafkaBatched class creates a new Kafka consumer...
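The caching idea could look roughly like the sketch below. Everything here is an assumption for illustration: `ConsumerCache`, the `factory` callable, and `FakeConsumer` are hypothetical names, and no real Kafka client or streamz internals are used.

```python
class ConsumerCache:
    """Create one consumer per key and reuse it for later batches
    (hypothetical helper, not part of streamz)."""

    def __init__(self, factory):
        self._factory = factory   # callable that builds a consumer for a key
        self._consumers = {}

    def get(self, key):
        consumer = self._consumers.get(key)
        if consumer is None:
            consumer = self._factory(key)
            self._consumers[key] = consumer
        return consumer

    def close_all(self):
        for consumer in self._consumers.values():
            consumer.close()
        self._consumers.clear()


class FakeConsumer:
    """Stand-in for a Kafka consumer, used only to count constructions."""

    created = 0

    def __init__(self, key):
        FakeConsumer.created += 1
        self.key = key
        self.closed = False

    def close(self):
        self.closed = True


cache = ConsumerCache(FakeConsumer)
first = cache.get(("topic", 0))
second = cache.get(("topic", 0))   # reused, no new consumer is built
other = cache.get(("topic", 1))    # different key, new consumer
cache.close_all()
```

Keying by topic and partition is one possible choice; a real implementation would also need to decide when cached consumers are closed or invalidated.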
I am trying to profile the simple streamz pipeline below, which uses the from_kafka_batched method: ``` from streamz import Stream from time import time, sleep from tornado import gen def increment(x): return...
Would it be possible to have a `filter` `DaskStream`? I'm happy to put in a PR but I don't know where to start.
NetworkX provides many great tools for inspection, analysis, and manipulation of graphs. It might be nice to be able to use these tools when working with streamz. Use Cases: UC1:...