streamz
streamz copied to clipboard
'cannot reuse already awaited coroutine' with timed_window on Python 3.7.5
Using timed_window
in Python 3.7.5 can sometimes lead to a coroutine being awaited more than once.
Here's a sample program:
async def my_sink(x):
print(x)
await sleep(2)
# raise Exception("Blah!")
async def main():
source = Stream(asynchronous=True)
source.timed_window(interval=1).sink(my_sink)
for x in range(100):
await source.emit(x)
await sleep(0.2)
if __name__ == "__main__":
run(main(), debug=True)
I narrowed down the reason to the same coroutine being possibly returned more than once in https://github.com/python-streamz/streamz/blob/master/streamz/core.py#L917.
A way to work around this issue is to create a task that runs the coroutine. Modifying the update
method of the Sink
class to return a task fixes the issue. The tests in tests/core.py
continue pass.
diff --git a/streamz/core.py b/streamz/core.py
index fe588ed..6916f19 100644
--- a/streamz/core.py
+++ b/streamz/core.py
@@ -535,7 +535,7 @@ class sink(Stream):
def update(self, x, who=None):
result = self.func(x, *self.args, **self.kwargs)
if gen.isawaitable(result):
- return result
+ return gen.convert_yielded(result)
else:
return []
I'm quite sure this can break something else. What am I missing?
Also, how can we go about writing a test case that demonstrates the issue?