streamz icon indicating copy to clipboard operation
streamz copied to clipboard

Collect does not allow awaitable sinks

Open arunaruljothi opened this issue 1 year ago • 0 comments

The collect class does not allow for awaitable sinks.

Small example:

async def sink_to_something(x):
    print(x)
    return await asyncio.sleep(1)

source = streamz.Source()
collector = source.collect()
collector.sink(sink_to_something)

for i in range(10):
    source.emit(i)
    collector.flush()

Changing def flush in the collect class from:

@Stream.register_api()
class collect(Stream):
    ...
    def flush(self, _=None):
        out = tuple(self.cache)
        metadata = list(self.metadata_cache)
        self._emit(out, metadata)
        ...

To:

@Stream.register_api()
class collect(Stream):
    ...
    def flush(self, _=None):
        out = tuple(self.cache)
        metadata = list(self.metadata_cache)
        # change self._emit to self.emit (self.emit waits for awaitable results from downstream)
        self.emit(out, metadata=metadata)
        ...

Fixed this problem, but I'm not sure if this has any drawbacks.

arunaruljothi avatar Apr 04 '23 16:04 arunaruljothi