streamz icon indicating copy to clipboard operation
streamz copied to clipboard

'cannot reuse already awaited coroutine' with timed_window on Python 3.7.5

Open balajirrao opened this issue 5 years ago • 0 comments

Using timed_window in Python 3.7.5 can sometimes lead to a coroutine being awaited more than once.

Here's a sample program:

async def my_sink(x):
    print(x)
    await sleep(2)
    # raise Exception("Blah!")


async def main():
    source = Stream(asynchronous=True)
    source.timed_window(interval=1).sink(my_sink)

    for x in range(100):
        await source.emit(x)
        await sleep(0.2)


if __name__ == "__main__":
    run(main(), debug=True)

I narrowed down the reason to the same coroutine being possibly returned more than once in https://github.com/python-streamz/streamz/blob/master/streamz/core.py#L917.

A way to work around this issue is to create a task that runs the coroutine. Modifying the update method of the Sink class to return a task fixes the issue. The tests in tests/core.py continue pass.

diff --git a/streamz/core.py b/streamz/core.py
index fe588ed..6916f19 100644
--- a/streamz/core.py
+++ b/streamz/core.py
@@ -535,7 +535,7 @@ class sink(Stream):
     def update(self, x, who=None):
         result = self.func(x, *self.args, **self.kwargs)
         if gen.isawaitable(result):
-            return result
+            return gen.convert_yielded(result)
         else:
             return []

I'm quite sure this can break something else. What am I missing?

Also, how can we go about writing a test case that demonstrates the issue?

balajirrao avatar Nov 29 '19 12:11 balajirrao