Kafka backend can't unsubscribe from individual channel

Open amacfie opened this issue 2 years ago • 3 comments

Currently, unsubscribing from one Kafka channel unsubscribes from all channels. Based on the aiokafka docs I'm guessing we want to do

    self._consumer_channels.remove(channel)
    self._consumer.subscribe(topics=self._consumer_channels)

amacfie avatar May 13 '22 13:05 amacfie

Also, when we call AIOKafkaConsumer we might want to add auto_offset_reset="latest" based on https://aiokafka.readthedocs.io/en/stable/consumer.html#controlling-the-consumer-s-position. Even then, when we change the topics we're subscribed to it's not obvious to me that we won't miss events or process events multiple times.
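
For illustration, here is a rough sketch of passing that option when the consumer is created. The helper names `consumer_options` and `make_consumer` are made up for this example and are not part of broadcaster. As far as I can tell from the aiokafka docs, "latest" is already the default, so this mostly makes the intent explicit.

```python
from typing import Any, Dict


def consumer_options(bootstrap_servers: str) -> Dict[str, Any]:
    """Keyword arguments for AIOKafkaConsumer (hypothetical helper)."""
    return {
        "bootstrap_servers": bootstrap_servers,
        # Start from the newest message when no committed offset exists.
        "auto_offset_reset": "latest",
    }


async def make_consumer(bootstrap_servers: str):
    """Build (but do not start) a consumer; needs aiokafka installed."""
    # Imported lazily so the options helper can be used without aiokafka.
    from aiokafka import AIOKafkaConsumer

    # Created inside a coroutine so that an event loop is already running.
    return AIOKafkaConsumer(**consumer_options(bootstrap_servers))
```

This only controls where consumption starts when there is no valid committed offset; it does not by itself answer the question of missed or duplicated events when the topic list changes.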

amacfie avatar May 18 '22 20:05 amacfie

Why wasn't this PR merged?

tsotnesharvadze avatar Oct 23 '23 06:10 tsotnesharvadze

Currently, unsubscribing from one Kafka channel unsubscribes from all channels. Based on the aiokafka docs I'm guessing we want to do

    self._consumer_channels.remove(channel)
    self._consumer.subscribe(topics=self._consumer_channels)

When _consumer_channels becomes empty, this raises an error, so:

    async def unsubscribe(self, channel: str) -> None:
        self._consumer_channels.remove(channel)
        if self._consumer_channels:
            self._consumer.subscribe(topics=list(self._consumer_channels))
        else:
            self._consumer.unsubscribe()
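
To check the logic without a running broker, the same idea can be exercised against a stub. This is only a sketch: `FakeConsumer` and `KafkaBackendSketch` are hypothetical names, the stub just imitates the behaviour described above (replace the subscription on `subscribe`, reject an empty topic list), and it assumes `_consumer_channels` is a set.

```python
import asyncio
from typing import List, Set


class FakeConsumer:
    """Hypothetical stand-in for AIOKafkaConsumer; records the subscription."""

    def __init__(self) -> None:
        self.topics: Set[str] = set()

    def subscribe(self, topics: List[str]) -> None:
        # Imitates the error reported above for an empty topic list.
        if not topics:
            raise ValueError("topics must not be empty")
        # A new call replaces the previous subscription entirely.
        self.topics = set(topics)

    def unsubscribe(self) -> None:
        self.topics = set()


class KafkaBackendSketch:
    """Only the subscribe/unsubscribe part of a backend, for illustration."""

    def __init__(self, consumer: FakeConsumer) -> None:
        self._consumer = consumer
        self._consumer_channels: Set[str] = set()

    async def subscribe(self, channel: str) -> None:
        self._consumer_channels.add(channel)
        self._consumer.subscribe(topics=list(self._consumer_channels))

    async def unsubscribe(self, channel: str) -> None:
        # discard() instead of remove(): unknown channels are ignored.
        self._consumer_channels.discard(channel)
        if self._consumer_channels:
            self._consumer.subscribe(topics=list(self._consumer_channels))
        else:
            # Nothing left, so drop the subscription instead of passing [].
            self._consumer.unsubscribe()


async def demo() -> List[Set[str]]:
    backend = KafkaBackendSketch(FakeConsumer())
    await backend.subscribe("a")
    await backend.subscribe("b")
    await backend.unsubscribe("a")
    after_first = set(backend._consumer.topics)
    await backend.unsubscribe("b")
    after_second = set(backend._consumer.topics)
    return [after_first, after_second]
```

Running `asyncio.run(demo())` shows that unsubscribing from "a" leaves "b" subscribed, and that removing the last channel clears the subscription without raising. Using `discard` rather than `remove` is a small deviation from the snippet above, so that unsubscribing from a channel that was never subscribed does not raise `KeyError`.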

tsotnesharvadze avatar Oct 23 '23 12:10 tsotnesharvadze