aiostream

Idea: Broadcasting to multiple consumers simultaneously

andersea opened this issue on Dec 22 '19 · 5 comments

I have a few instances where I need to send the same items to multiple consumers. I have a working implementation for trio.

import trio

def broadcast(aiter):
    send_channels = []
    lock = trio.StrictFIFOLock()

    async def listen():
        # Each listener gets its own single-slot channel that buffers
        # one value broadcast by another listener.
        send_channel, receive_channel = trio.open_memory_channel(1)
        send_channels.append(send_channel)
        try:
            while True:
                async with lock:
                    try:
                        # Another listener already broadcast a value to us.
                        yield receive_channel.receive_nowait()
                    except trio.WouldBlock:
                        # Our buffer is empty: pull the next value from the
                        # source and broadcast it to all the other listeners.
                        value = await aiter.__anext__()
                        for s in send_channels:
                            if s != send_channel:
                                s.send_nowait(value)
                        yield value
        finally:
            send_channels.remove(send_channel)

    return listen

This relies on the lock being fair. Otherwise, if one listener somehow got ahead of the pack, s.send_nowait could try to send to a full channel, which would cause the listen function to raise a trio.WouldBlock error.
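
To make that failure mode concrete, here is a minimal standalone sketch (mine, not part of the implementation above): send_nowait on a bounded trio memory channel raises trio.WouldBlock as soon as the buffer is full.

import trio

async def main():
    # Buffer of size 1, the same size each listener uses above.
    send_channel, receive_channel = trio.open_memory_channel(1)
    send_channel.send_nowait('a')      # fills the only buffer slot
    try:
        send_channel.send_nowait('b')  # buffer is full, raises immediately
    except trio.WouldBlock:
        print('send_nowait raised trio.WouldBlock')

trio.run(main)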

What do you think? Is this of interest for aiostream?

andersea · Dec 22 '19 14:12

Hi @andersea !

I have a few instances where I need to send the same items to multiple consumers.

There's a related issue about this use case: #35

It's not implemented yet but I think it makes sense. It's somewhat related to the use case you described previously, i.e. dispatching to several streams. I think it could be interesting to use a similar syntax for both cases:

# Cold stream - dispatching
producer_stream = stream.iterate(produce())
async with producer_stream.stream() as producer:
    [...]

# Hot stream - broadcasting
producer_stream = stream.iterate(produce())
async with producer_stream.hotstream() as hot_producer:
    [...]

What do you think?

vxgmichel · Dec 22 '19 15:12

Ok. I have read through #35

It does look like the same basic idea, but it behaves a bit differently from what I need. If I understand it correctly, listeners that lag behind can lose messages in that implementation, since the hotstream replaces futures when the queue is full. That seems undesirable to me. I would prefer to apply backpressure in this case.
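
To make the distinction concrete, here is a minimal sketch (plain trio, not aiostream) of the behaviour I'm after: with a bounded channel and a blocking send, the producer waits for a slow consumer instead of dropping or replacing items.

import trio

async def producer(send_channel):
    async with send_channel:
        for i in range(5):
            # With a bounded buffer this awaits until the consumer has made
            # room, applying backpressure instead of dropping items.
            await send_channel.send(i)
            print(f'sent {i}')

async def consumer(receive_channel):
    async with receive_channel:
        async for item in receive_channel:
            await trio.sleep(1)  # deliberately slow consumer
            print(f'received {item}')

async def main():
    send_channel, receive_channel = trio.open_memory_channel(1)
    async with trio.open_nursery() as nursery:
        nursery.start_soon(producer, send_channel)
        nursery.start_soon(consumer, receive_channel)

trio.run(main)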

The implementation wouldn't be directly trio compatible, as far as I can tell, since trio doesn't let you spawn free-standing tasks the way asyncio does; tasks have to be started inside a nursery.
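
For reference, this is roughly what I mean (a sketch of the two task models, not code from either implementation):

import asyncio
import trio

async def asyncio_style():
    # asyncio: a free-standing task, detached from the calling scope.
    task = asyncio.ensure_future(asyncio.sleep(1))
    await task

async def trio_style():
    # trio: tasks can only be started inside a nursery, which scopes
    # their lifetime and waits for them before exiting.
    async with trio.open_nursery() as nursery:
        nursery.start_soon(trio.sleep, 1)

asyncio.run(asyncio_style())
trio.run(trio_style())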

andersea · Dec 22 '19 21:12

My broadcast implementation is not safe, I have found.

Here is a simple example that causes a deadlock:

import string

import trio
from aiostream import stream

from broadcast import broadcast

async def producer():
    # Yields (0, 'a'), (1, 'b'), (0, 'c'), ... forever,
    # alternating the tag between 0 and 1.
    i = 0
    lowercase_letters = string.ascii_lowercase
    while True:
        for letter in lowercase_letters:
            value = (i % 2, letter)
            print(f'Producer yielding {value}')
            yield value
            i += 1
            await trio.sleep(.5)

async def main():
    # Two listeners on the same broadcast, each filtering on one tag,
    # then zipped back together.
    listen = broadcast(producer())
    zipped = stream.zip(
        stream.filter(listen(), lambda i: i[0] == 0),
        stream.filter(listen(), lambda i: i[0] == 1)
    )
    async with zipped.stream() as s:
        async for item in s:
            print(item)

trio.run(main)

The deadlock happens after one item has been received: the zip function gets stuck waiting for the second item forever. I am not entirely sure why yet, but at least after a couple of days of debugging I have a simple example that reproduces it.

andersea · Dec 29 '19 20:12

I managed to get it working.

Although I was not able to pin down exactly how the deadlocks occurred, I can gather this much:

First off, there is lock contention. The two listeners compete for the lock, so one cannot read because another coroutine is trying to write, or vice versa.

Secondly, the channel buffer maxes out. This happens because the producer can generate items that a given listener is not interested in, and these items pile up in that listener's channel buffer. The only solutions I see are to either prefilter the items before they are allowed into the channel buffer (sketched further below), or to let the channel buffer grow without bound. In my use case the consumers should always be able to catch up to the producer eventually, so although this isn't what I would ideally like, it is not likely to cause any further problems.

So it seems backpressure can be bad after all. :)
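
For the record, here is a rough sketch of the prefiltering alternative (hypothetical, not what I ended up using): each listener registers a predicate alongside its send channel, so items it isn't interested in never enter its buffer in the first place. Error handling is omitted for brevity.

import math
import trio

def broadcast_filtered(aiter):
    # (send_channel, predicate) pairs, one per listener.
    subscribers = []
    lock = trio.Lock()

    async def listen(predicate=lambda item: True):
        send_channel, receive_channel = trio.open_memory_channel(math.inf)
        subscribers.append((send_channel, predicate))
        try:
            while True:
                try:
                    yield receive_channel.receive_nowait()
                except trio.WouldBlock:
                    if lock.locked():
                        # Someone else is broadcasting; wait for a matching item.
                        yield await receive_channel.receive()
                    else:
                        lock.acquire_nowait()
                        value = await aiter.__anext__()
                        for s, pred in subscribers:
                            # Only buffer items this listener actually wants,
                            # so unwanted items cannot pile up.
                            if pred(value):
                                s.send_nowait(value)
                        lock.release()
        finally:
            subscribers.remove((send_channel, predicate))

    return listen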

andersea · Dec 29 '19 23:12

Here is my current implementation.

import logging
import math
from itertools import count

import trio

log = logging.getLogger(__name__)

def broadcast(aiter):
    send_channels = []
    lock = trio.Lock()
    id = count()

    async def listen():
        # Unbounded buffer, so a slow listener can never block the broadcaster.
        send_channel, receive_channel = trio.open_memory_channel(math.inf)
        send_channels.append(send_channel)
        my_id = next(id)
        try:
            log.debug('Listener %s starting.', my_id)
            while True:
                try:
                    log.debug('Listener %s attempting to get next item.', my_id)
                    yield receive_channel.receive_nowait()
                except trio.WouldBlock:
                    # Receive channel is empty, try broadcasting the next value.
                    if lock.locked():
                        # Another listener is currently broadcasting.
                        # Wait for the next value to arrive.
                        log.debug('Listener %s would have blocked acquiring the lock. Wait for next value instead.', my_id)
                        yield await receive_channel.receive()
                    else:
                        log.debug('Listener %s queue empty. Acquiring lock.', my_id)
                        lock.acquire_nowait()
                        # We got the lock. Get next value and broadcast it.
                        log.debug('Listener %s acquired the lock. Getting next item.', my_id)
                        value = await aiter.__anext__()
                        log.debug('Listener %s broadcasting.', my_id)
                        for i, s in enumerate(send_channels):
                            # With an unbounded buffer this never blocks;
                            # items simply queue up until each listener catches up.
                            log.debug('Listener %s sending to listener %s.', my_id, i + 1)
                            s.send_nowait(value)
                        log.debug('Listener %s completed broadcast. Releasing lock.', my_id)
                        lock.release()
        except trio.EndOfChannel:
            log.debug('Listener %s channel closed.', my_id)
        except StopAsyncIteration:
            log.debug('Listener %s exhausted the generator. Closing send channels.', my_id)
            for s in send_channels:
                await s.aclose()
        finally:
            log.debug('Listener %s quit. Removing own send channel.', my_id)
            send_channels.remove(send_channel)

    return listen
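
To round it off, here is a minimal way to exercise it (just a sketch, reusing the broadcast.py module name from the example above; the numbers() producer is a stand-in):

import logging

import trio
from aiostream import stream

from broadcast import broadcast

logging.basicConfig(level=logging.DEBUG)

async def numbers():
    for i in range(10):
        yield i
        await trio.sleep(0.1)

async def main():
    listen = broadcast(numbers())
    # Two independent listeners see every item the producer yields.
    evens = stream.filter(listen(), lambda i: i % 2 == 0)
    odds = stream.filter(listen(), lambda i: i % 2 == 1)
    async with stream.zip(evens, odds).stream() as s:
        async for pair in s:
            print(pair)

trio.run(main)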

andersea · Dec 30 '19 17:12