[Question] Turn a "cold" stream to a "hot" one?
I went over the documentation but couldn't find any operators to turn a "cold" stream to a "hot" one. For example, given the following code:
```python
import asyncio
from aiostream import stream


async def main():
    xs = stream.count(interval=1)

    async def worker(nr):
        async for val in xs:
            print(f'w{nr}: {val}')

    t1 = asyncio.create_task(worker(1))
    await asyncio.sleep(1.5)
    t2 = asyncio.create_task(worker(2))
    await asyncio.gather(t1, t2)


asyncio.run(main())
```
Each task creates its own iterator, resulting in the following output:

```
w1: 0 # t=0
w1: 1 # t=1
w2: 0 # t=1.5
w1: 2 # t=2
w2: 1 # t=2.5
w1: 3 # t=3
```
However, I'd like the second task to hook into the same stream as the first one, resulting in the following output:

```
w1: 0 # t=0
w1: 1 # t=1
w1: 2 # t=2
w2: 2 # t=2
w1: 3 # t=3
w2: 3 # t=3
```
Does `aiostream` provide such facilities (similar to the rxjs `share` operator), and if not, what would be the best approach to implement one?
As a side note, thanks for this library! The documentation as well as the source code look very elegant.
Thanks @discosultan!
This is a tricky topic. I'd been thinking about this a while ago but I couldn't come up with anything satisfying. I can try to explain what my opinion is at the moment.
By design, asynchronous generators in python feature a back-pressure mechanism: the generator code only runs when the `__anext__` method is called, so the consumer has to finish processing the previous item before it can ask for a new one. It is quite naive but simple to maintain: no matter how many generators one chains, only a single part of the chain is running at a time. They're asynchronous in the sense that one can run two chains concurrently, but each chain itself runs sequentially.
This is described as the "pull model" by the reactive X introduction, which itself implements the so-called "push model". I'm not a reactive X expert but as far as I understand, the concept of hot observables fits well with the push model since there is already a context for running the observables. That is not the case for python asynchronous generators (and aiostream), so there would need to be an extra task specifically dedicated to the producing of items from the stream.
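To make the pull model concrete, here is a minimal stdlib-only sketch (illustrative, not `aiostream` code) showing that an async generator's body only advances when the consumer asks for the next item:

```python
import asyncio


async def numbers():
    # This loop body does not run at all until a consumer awaits
    # __anext__ (directly, or through an `async for` loop).
    for i in range(3):
        yield i


async def main():
    agen = numbers()
    first = await agen.__anext__()   # pull exactly one item
    rest = [x async for x in agen]   # pull the remaining items
    return [first, *rest]


items = asyncio.run(main())
```

Because each item is produced on demand, a slow consumer automatically slows the producer down, which is exactly the back-pressure behaviour described above.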
This could be wrapped and exposed via an async context manager, as shown in this implementation I came up with. I think it works fine but it's still quite different from regular aiostream operators. In the included example, the chain is:
```
count operator -> hotstream context -> print operator
```
This seems usable and acceptable, but I'm still a bit confused. For the moment, I can't really tell if it conceptually makes sense. What I like about it is that this hotstream context can be used as a hub, to connect different pipelines to a single source:
```
pipeline A -> hotstream context -> pipeline X
                                -> pipeline Y
```
@discosultan You seem to be more familiar with Rx than I am, so I'd be very curious to hear your opinion on the topic :)
Realise this might be a little off topic, but I took the example hotstream implementation and I'm using it to broadcast events to a web app. It's been working spot on, apart from one thing I've not been able to figure out: I want to guard the stream from an `asyncio.CancelledError`. I think I want something like `asyncio.shield`, but for this stream.

This happens because I'm using Sanic WebSockets, which are fed an `asyncio.CancelledError` on a disconnect. This causes my hot stream to go... cold... (sorry).
Here's what I have:
```python
from sanic import Sanic
from sanic.response import json
import asyncio

... <hotstreamer here> ...

app = Sanic()


@app.websocket("/")
async def test(request, ws):
    async for i in request.app.hot:
        await ws.send(str(i))


@app.listener('before_server_start')
async def before_server_start(app, loop):
    xs = stream.count(interval=1)
    app.hot = await hotstream(xs).stream().__aenter__()


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    await app.hot.__aexit__()


if __name__ == '__main__':
    app.run(host="0.0.0.0", port=8000)
```
Connecting to that websocket will produce a count of how long the server has been running, but on disconnect it will raise the `CancelledError` within the `xs` stream and kill it. I found this by reimplementing the `count` function as an async generator and catching the `CancelledError` exception.
I could suppress the `CancelledError` in the counter implementation, though that would also prevent the `await app.hot.__aexit__()` from being able to kill the generator.
My thought is to use something like `asyncio.shield`, though this doesn't appear to work on async generators. But here's an example of how I thought it might look:
```python
...

@app.listener('before_server_start')
async def before_server_start(app, loop):
    xs = stream.count(interval=1)
    # keep hold of the unshielded asyncgen for cleanup on shutdown
    app._hot = await hotstream(xs).stream().__aenter__()
    # shield the asyncgen from CancelledError for the lifetime of the server
    app.hot = asyncio.shield(app._hot)


@app.listener('after_server_stop')
async def after_server_stop(app, loop):
    # cleanup via the unshielded reference
    await app._hot.__aexit__()

...
```
Would appreciate any pointers on how to tackle this - been beating my head against it.
Thanks.
Hi @jamesstidard ,
> I took the example hotstream implementation and I'm using it to broadcast events to a web app
That's an interesting use case!
> This happens because I'm using Sanic WebSockets which are fed `asyncio.CancelledError` on a disconnect.
It turns out to be related to a flaw in my gist: the cancellation is bubbling up to the background task instead of simply cancelling the hot async iterator. This can be fixed by updating its `__anext__` method:
```python
class HotAsyncIterator:
    [...]

    async def __anext__(self):
        return await asyncio.shield(self.queue.popleft())
```
See the gist revisions.
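The effect of that `asyncio.shield` can be demonstrated with a small stdlib-only sketch (the names here are illustrative, not from the gist): cancelling a consumer that awaits a shared task through `shield` only cancels the shield wrapper, leaving the shared task running for everyone else:

```python
import asyncio


async def demo():
    # One producer task shared by several consumers, as in the hot
    # iterator: each consumer awaits the same pending task.
    shared = asyncio.ensure_future(asyncio.sleep(0.1, result="item"))

    async def consumer():
        # shield() wraps the shared task, so cancelling this consumer
        # only cancels the wrapper, not the shared task itself.
        return await asyncio.shield(shared)

    task = asyncio.create_task(consumer())
    await asyncio.sleep(0.01)
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass

    # The shared task survived and still delivers its item.
    assert not shared.cancelled()
    return await shared


result = asyncio.run(demo())
```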
@vxgmichel Ah, I was somewhat on the right tracks. Still not massively confident with asyncio mechanics.
Here's the actual use case, if you're interested. I'm running a redis server for publish and subscribe along with that kind of pubsub model for WebSocket clients to the server. I had implemented this in a kinda cheap way by making a new connection - and holding it open - for every subscription that every clients makes to redis. This quickly breaks the number of max concurrent connections (at least on Heroku's free tier). So, this allows me to have a single connection per server process which subscribes to all notifications and instead have all those client subscriptions filtered out via that hotstream. Basically just shifting the many open subscriptions for python to handle instead of redis.
Anyway, I very much appreciate the response. I'll give this a try this evening. Also thanks for the library, it has helped me produce much more readable code then I would have otherwise.
@jamesstidard
> Ah, I was somewhat on the right tracks. Still not massively confident with asyncio mechanics.
Well, this idea that being cancelled while awaiting a background task also cancels the background task is quite confusing in my opinion. I wonder if this has been discussed somewhere or if it is just a consequence of `asyncio.Task` inheriting from `asyncio.Future` :thinking:
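This surprising behaviour is easy to reproduce with plain stdlib asyncio (illustrative names, not library code): when a task is cancelled while suspended on another task, the cancellation is delivered by cancelling the future it is suspended on, which here is the background task itself:

```python
import asyncio


async def demo():
    background = asyncio.ensure_future(asyncio.sleep(0.1, result="done"))

    async def waiter():
        # No shield here: the waiter is suspended directly on the
        # background task's future.
        return await background

    task = asyncio.create_task(waiter())
    await asyncio.sleep(0.01)
    # Cancelling the waiter cancels the future it is waiting on,
    # which is the background task.
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass
    return background.cancelled()


was_cancelled = asyncio.run(demo())
```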
> Here's the actual use case [...]
Interesting! So do you use aiostream operators between redis and the hot stream, between the hot stream and the websocket, or both?
> So do you use aiostream operators between redis and the hot stream, between the hot stream and the websocket, or both?
Currently it's looking to be `redis (1) -> hotstream (1) -> operators (m) -> WebSocket (m)`. So, a single stream subscribed to updates published on Redis, and then many operators filtering and broadcasting back to connected WebSocket clients.
I did need to make the queues in the `HotStream` class asyncio friendly, as I was encountering `self.queue.popleft()` raising an `IndexError` if it beat the broadcaster to the punch.
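The race is easy to reproduce: a plain `collections.deque` raises immediately when the consumer wins, whereas `asyncio.Queue.get()` simply waits for the producer. A minimal stdlib illustration (not the gist code):

```python
import asyncio
from collections import deque

# A plain deque raises immediately when the consumer gets there first.
d = deque()
try:
    d.popleft()
    raced = False
except IndexError:
    raced = True


async def main():
    # asyncio.Queue.get() waits until the producer puts an item.
    q = asyncio.Queue()
    getter = asyncio.create_task(q.get())
    await asyncio.sleep(0)  # let the getter start and block
    q.put_nowait(42)
    return await getter


value = asyncio.run(main())
```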
Here's the altered source if it passes a sanity check:
```python
import asyncio

from aiostream import streamcontext
from aiostream.aiter_utils import anext
from aiostream.core import Stream, Streamer


async def cancel_and_join(task):
    task.cancel()
    try:
        await task
    except asyncio.CancelledError:
        pass


class HotAsyncIterator:

    def __init__(self, queue):
        self.queue = queue

    def __aiter__(self):
        return self

    async def __anext__(self):
        next_ = await self.queue.get()
        value = await asyncio.shield(next_)
        return value


class HotStreamer(Streamer):

    def __init__(self, source, maxlen=1):
        self.source = source
        self.maxlen = maxlen
        self.queues = []
        self.task = None
        self.future = None
        self.started = asyncio.Event()

    async def __aenter__(self):
        self.task = asyncio.create_task(self._target())
        await self.started.wait()
        return self

    async def __aexit__(self, *args):
        await self.aclose()

    async def _target(self):
        async with streamcontext(self.source) as streamer:
            while True:
                try:
                    coro = anext(streamer)
                    self.future = asyncio.create_task(coro)
                    for queue in self.queues:
                        if queue.full():
                            _ = queue.get_nowait()
                        queue.put_nowait(self.future)
                    self.started.set()
                    await self.future
                except Exception:
                    break
                finally:
                    await cancel_and_join(self.future)

    def __aiter__(self):
        queue = asyncio.Queue(maxsize=self.maxlen)
        queue.put_nowait(self.future)
        self.queues.append(queue)
        return HotAsyncIterator(queue)

    async def aclose(self):
        await cancel_and_join(self.task)


class HotStream(Stream):

    def __init__(self, source, maxlen=1):
        self.source = source
        self.maxlen = maxlen

    def __aiter__(self):
        return HotStreamer(self.source, self.maxlen)


def hotstream(*args, **kwargs):
    return HotStream(*args, **kwargs)
```
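For anyone wanting to see the fan-out idea without installing `aiostream`, here is a stdlib-only sketch of the same pattern (names like `broadcast` are mine, not from the gist): one background task pulls from the cold source and pushes each item into every consumer queue:

```python
import asyncio


async def broadcast(source, queues):
    # One producer task pulls from the cold source and fans each
    # item out to every consumer queue.
    async for item in source:
        for q in queues:
            q.put_nowait(item)
    for q in queues:
        q.put_nowait(None)  # sentinel: the source is exhausted


async def consume(queue, out):
    while (item := await queue.get()) is not None:
        out.append(item)


async def main():
    async def source():
        for i in range(3):
            yield i

    q1, q2 = asyncio.Queue(), asyncio.Queue()
    seen1, seen2 = [], []
    await asyncio.gather(
        broadcast(source(), [q1, q2]),
        consume(q1, seen1),
        consume(q2, seen2),
    )
    return seen1, seen2


seen1, seen2 = asyncio.run(main())
```

The source is iterated exactly once, yet both consumers observe every item, which is the essential property the `HotStreamer` above provides on top of the aiostream machinery.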
> Currently it's looking to be `redis (1) -> hotstream (1) -> operators (m) -> WebSocket (m)`
Great, thanks for the info!
> I did need to make the queues in the HotStream class asyncio friendly as I was encountering `self.queue.popleft()` raising an `IndexError` if it beat the broadcaster to the punch.
Right, I updated the gist with your changes. I'm still wondering whether the `hotstream` idea is mature enough to go into the lib or not :thinking:
> I'm still wondering whether the hotstream idea is mature enough to go into the lib or not 🤔
Yeah, can't really say. It's certainly something that I've found useful for these server-broadcast publish/subscribe type cases. It's worked out pretty well and I'd probably use it in future projects, so having it in `aiostream` vs me copy-pasting it around would be welcomed 😄.
The hotstream thing is a concept in rx as well - if memory serves. So it should play nicely with the rest of the operators and stuff `aiostream` has. So I think the idea of it is pretty safely within the general scope of the library.
> The hotstream thing is a concept in rx as well - if memory serves. So should play nice with the rest of the operators and stuff aiostream has. So think the idea of it is pretty safely within the general scope.
Alright, I'm convinced. I don't have the time to integrate this feature at the moment, so I'll leave this issue open as a reminder. Then we'll have to:
- [ ] settle on a naming and interface
- [ ] polish the implementation
- [ ] write some tests
- [ ] write a bit of documentation
In any case, thanks @jamesstidard for the feedback, it's much appreciated :smiley: