streamz
unique on dask fails to gather results properly
It seems that when gathering after a unique on a DaskStream, the results are not converted back from futures to concrete values. This appears to be limited to unique on Dask streams: unique on a standard Stream works, and so does a DaskStream pipeline without unique.
Working example:
In [1]: from dask.distributed import Client
...: client = Client()
...:
In [2]: from streamz import Stream
In [3]: s = Stream()
In [4]: b = s.unique()
In [5]: c = s.scatter().unique().buffer(10).gather()
In [6]: l = b.sink_to_list()
In [8]: ll = c.sink_to_list()
In [9]: for i in range(10):
   ...:     s.emit(i)
In [10]: l
Out[10]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
In [11]: ll
Out[11]:
[<Future: status: finished, type: int, key: int-5c8a950061aa331153f4a172bbcbfd1b>,
<Future: status: finished, type: int, key: int-c0a8a20f903a4915b94db8de3ea63195>,
<Future: status: finished, type: int, key: int-58e78e1b34eb49a68c65b54815d1b158>,
<Future: status: finished, type: int, key: int-d3395e15f605bc35ab1bac6341a285e2>,
<Future: status: finished, type: int, key: int-5cd9541ea58b401f115b751e79eabbff>,
<Future: status: finished, type: int, key: int-ce9a05dd6ec76c6a6d171b0c055f3127>,
<Future: status: finished, type: int, key: int-7ec5d3339274cee5cb507a4e4d28e791>,
<Future: status: finished, type: int, key: int-06e5a71c9839bd98760be56f629b24cc>,
<Future: status: finished, type: int, key: int-ea1fa36eb048f89cc9b6b045a2a731d2>,
<Future: status: finished, type: int, key: int-c56e7bae3484c9b6750417fbf89d6509>]
In [12]: d = s.scatter().buffer(10).gather()
In [13]: lll = d.sink_to_list()
In [14]: for i in range(10):
    ...:     s.emit(i)
    ...:
In [15]: lll
Out[15]: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
This was produced with Python 3.5.5 | packaged by conda-forge | (default, Feb 13 2018, 05:02:37) and latest master Streamz.
This can be fixed by adding the following to streamz/dask.py. However, it doesn't actually appear to deduplicate; I think the futures hash to different values even when they have the same key. One could work around that with .unique(key=lambda x: x.key).
@DaskStream.register_api()
class unique(DaskStream, core.unique):
    pass
I am not sure whether there should be a more general way to decide which core stream methods also apply to DaskStreams.
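To illustrate why I suspect the key kwarg is needed, here is a minimal sketch that does not use streamz or dask at all. FakeFuture and this standalone unique function are hypothetical stand-ins written for this comment; the sketch assumes, as guessed above, that futures fall back to identity-based hashing, so two futures wrapping the same data count as different items unless they are compared by key.

```python
class FakeFuture:
    """Hypothetical stand-in for a distributed Future (an assumption, not the
    real class): holds a key and a value, with default identity hashing."""

    def __init__(self, key, value):
        self.key = key
        self.value = value


def unique(items, key=lambda x: x):
    """Yield each item whose key(item) has not been seen before, in order."""
    seen = set()
    for item in items:
        k = key(item)
        if k not in seen:
            seen.add(k)
            yield item


# Separate objects for repeated values, each pair sharing the same key.
futures = [FakeFuture("int-%d" % i, i) for i in (0, 1, 1, 2, 0)]

# Default key: objects are compared by identity, so nothing is dropped.
by_identity = [f.value for f in unique(futures)]

# Comparing by the .key attribute removes the repeats.
by_key = [f.value for f in unique(futures, key=lambda f: f.key)]

print(by_identity)  # [0, 1, 1, 2, 0]
print(by_key)       # [0, 1, 2]
```

If real futures behave like FakeFuture here, that would explain why registering unique on DaskStream alone does not deduplicate, while passing key=lambda x: x.key does.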
That all seems to work, great!
It would be nice not to need the key kwarg here, though, since deduplicating by key seems like the expected default behavior?
@CJ-Wright, agreed, but I (personally) wouldn't want to put in specific code for that without some more general consideration of how Stream and DaskStream methods should interact.