streamz
Collect does not allow awaitable sinks
The collect class does not allow awaitable sinks.
Small example:
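import asyncio
import streamz

async def sink_to_something(x):
    print(x)
    return await asyncio.sleep(1)

source = streamz.Source()
collector = source.collect()
collector.sink(sink_to_something)
for i in range(10):
    source.emit(i)
# flush() pushes the collected tuple downstream, but the awaitable sink is not awaited
collector.flush()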
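To illustrate the underlying behaviour, here is a minimal stand-alone sketch that uses only asyncio and no streamz code. The names flush_without_waiting and flush_and_wait are hypothetical and do not mirror streamz internals. It only shows that calling an async function without awaiting the result creates a coroutine object whose body never runs.

```python
import asyncio

results = []

async def sink_to_something(x):
    # Async sink: records the value after a short pause.
    await asyncio.sleep(0.01)
    results.append(x)

def flush_without_waiting(batch):
    # Calls the sink but never awaits the coroutine it returns,
    # so the body of sink_to_something never runs.
    coro = sink_to_something(batch)
    coro.close()  # closed explicitly to avoid a "never awaited" warning

async def flush_and_wait(batch):
    # Awaits the coroutine, so the sink runs to completion.
    await sink_to_something(batch)

batch = tuple(range(10))

flush_without_waiting(batch)
dropped = list(results)      # nothing was recorded

asyncio.run(flush_and_wait(batch))
delivered = list(results)    # the batch was recorded once
```

In this sketch the first call leaves results empty, while the awaited call delivers the whole batch. Whether streamz handles the awaitable depends on which emit path the node uses.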
I changed the flush method in the collect class from:
@Stream.register_api()
class collect(Stream):
    ...
    def flush(self, _=None):
        out = tuple(self.cache)
        metadata = list(self.metadata_cache)
        self._emit(out, metadata)
        ...
To:
@Stream.register_api()
class collect(Stream):
    ...
    def flush(self, _=None):
        out = tuple(self.cache)
        metadata = list(self.metadata_cache)
        # change self._emit to self.emit (self.emit waits for awaitable results from downstream)
        self.emit(out, metadata=metadata)
        ...
This fixed the problem, but I'm not sure whether it has any drawbacks.