Refactor `Subscriber`
The main goal is to reduce the overhead we add when we have a lot of subscriptions.
- Avoid locking for `subscribe` and `unsubscribe`. We should be able to subscribe to or unsubscribe from channels/patterns independently of concurrent subscriptions to other channels/patterns, without being blocked. In this PR we represent the subscription lifecycle as multiple states. We can move between those states without locking, but still make operations on the same channel/pattern wait for state changes by using a `Deferred` as a signal. We don't go back to a single `Ref` with a `Map`. Instead we switch to a `MapRef` backed by a `ConcurrentHashMap`, which reduces contention when there are a lot of concurrent state changes.
- Avoid using multiple `RedisPubSubListener`s. The listeners are only called one by one. Since we have a map with all subscriptions, we can look up the subscription and publish to its topic in one listener.
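The lock-free lifecycle from the first point can be illustrated without cats-effect. This is a minimal, dependency-free sketch, not the PR's code: it assumes `ConcurrentHashMap.compute` as a stand-in for updating a `MapRef` entry and a `CompletableFuture` as a stand-in for `Deferred`. The names `LockFreeSubscriptions`, `Subscribing`, `Active`, `Unsubscribing`, `Step`, and the `redisSubscribe`/`redisUnsubscribe` callbacks are hypothetical. Only the first subscriber and the last unsubscriber talk to Redis. A caller that finds an in-progress transition for the same key waits on its signal and then retries, and the Redis call itself never happens while a map entry is being updated.

```scala
import java.util.concurrent.{CompletableFuture, ConcurrentHashMap}

// Hypothetical per-key lifecycle states (not the PR's actual SubscriptionState).
sealed trait State
final case class Subscribing(done: CompletableFuture[Unit]) extends State
final case class Active(subscribers: Int) extends State
final case class Unsubscribing(done: CompletableFuture[Unit]) extends State

// What the caller must do after the atomic per-key update.
sealed trait Step
case object Run extends Step                                      // call Redis
case object Done extends Step                                     // nothing else to do
final case class Wait(done: CompletableFuture[Unit]) extends Step // wait, then retry

final class LockFreeSubscriptions[K](
    redisSubscribe: K => Unit,  // stand-in for the real SUBSCRIBE call
    redisUnsubscribe: K => Unit // stand-in for the real UNSUBSCRIBE call
) {
  // Stand-in for a MapRef backed by a ConcurrentHashMap.
  private val states = new ConcurrentHashMap[K, State]()

  def state(key: K): Option[State] = Option(states.get(key))

  def subscribe(key: K): Unit = {
    val signal = new CompletableFuture[Unit]() // stand-in for Deferred
    var step: Step = Done
    // compute atomically updates only this key's entry; the Redis call below
    // runs outside of it, so no lock is held while we talk to Redis.
    states.compute(key, (_: K, current: State) => current match {
      case null             => step = Run; Subscribing(signal)
      case Active(n)        => step = Done; Active(n + 1)
      case Subscribing(d)   => step = Wait(d); current
      case Unsubscribing(d) => step = Wait(d); current
    })
    step match {
      case Run =>
        try { redisSubscribe(key); states.put(key, Active(1)) }
        catch { case e: Throwable => states.remove(key); throw e }
        finally signal.complete(()) // wake up callers waiting on this key
      case Wait(d) => d.join(); subscribe(key) // retry against the new state
      case Done    => ()
    }
  }

  def unsubscribe(key: K): Unit = {
    val signal = new CompletableFuture[Unit]()
    var step: Step = Done
    states.compute(key, (_: K, current: State) => current match {
      case null               => step = Done; null // not subscribed
      case Active(n) if n > 1 => step = Done; Active(n - 1)
      case Active(_)          => step = Run; Unsubscribing(signal)
      case Subscribing(d)     => step = Wait(d); current
      case Unsubscribing(d)   => step = Wait(d); current
    })
    step match {
      case Run =>
        try { redisUnsubscribe(key); states.remove(key) }
        catch {
          // Put the last subscriber back so that unsubscribe can be retried.
          case e: Throwable => states.put(key, Active(1)); throw e
        } finally signal.complete(())
      case Wait(d) => d.join(); unsubscribe(key)
      case Done    => ()
    }
  }
}
```

Restoring `Active(1)` after a failed Redis unsubscribe is one possible way to keep a retry possible. The sketch blocks a thread in `join()`, whereas `Deferred#get` would only block semantically.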
This is a more ambitious version of what I started in https://github.com/profunktor/redis4cats/pull/972. Unfortunately this made the diff a lot larger than I wanted it to be.
- While avoiding the lock clearly makes things more complicated, it also forces us to think more about the different states and state changes. For example, a failing `unsubscribe` currently leaves us in a state where we can't retry, since we keep a `Redis4CatsSubscription` with a single subscription but without actual subscribers to the topic.
- I moved everything that is only used by the `Subscriber` implementation into the `Subscriber` companion object, which I think makes things easier (but that is probably subjective). The `Redis4CatsSubscription` class is now the `Active` part of `SubscriptionState`.
I tried to keep the behavior as close to the existing implementation as I could. There are a few places that could use some additional work, but I didn't want to make any additional changes to the implementation.
- I think we could unsubscribe in `unsubscribe` instead of waiting for the finalizer of the last subscription stream (since no new messages will be processed anyway).
- There are some potential improvements in how we handle publishing messages. We should document that a single subscriber not keeping up with its channel will block not only all subscriptions for the same channel/pattern but all channels/patterns, since we can't publish to the topic. We could potentially publish to multiple channels/patterns in parallel (but we should benchmark first).
Sorry for delaying the request for so long. Was busy, then got sick.
I hope to review this this week.
> The main goal is to reduce the overhead we add when we have a lot of subscriptions. (1) Avoid locking for subscribe and unsubscribe... (2) Avoid using multiple RedisPubSubListeners...
I agree with point (2), and I would expect better performance with a single listener acting as a single dispatcher. However, we need to be careful that subscribers of one topic don't block subscribers of other topics.
Regarding (1), yeah, it's significantly more complicated, which makes it difficult to reason about correctness. I'm curious how bad the blocking would be with `AtomicCell`, and whether there is a cats-effect read-write lock we could use. I like the use of `MapRef`, but how come we need the `Subscribing` and `Unsubscribing` states? Lastly, I presume `onMessage` will get called more often than `subscribe` and `unsubscribe`, so we want `onMessage` to be more efficient, but we need to access the `Map` of topics on each call. Does that sound correct?
> but how come we [need] the `Subscribing` and `Unsubscribing` states
If we don't lock while subscribing and unsubscribing, we need to track that we are subscribing or unsubscribing, so that concurrent calls to `subscribe`/`unsubscribe` for the same key can wait until the in-progress subscribe/unsubscribe finishes (to use the fresh subscription, retry, ...).
This does make it more complicated indeed; that is unfortunately the consequence of removing the single shared lock for all subscriptions. We could wait until https://github.com/typelevel/cats-effect/pull/4424 lands to make a `SubscriptionMap` implementation that is more like the existing implementation but with a lock per key.
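For comparison, the lock-per-key alternative could look roughly like the following. It is a hypothetical sketch using `java.util.concurrent.locks.ReentrantLock`, not the API added by the linked cats-effect PR. `KeyedLock`, `PerKeyLockedSubscriptions`, and the Redis callbacks are made-up names. Each key keeps a plain subscriber count, so the logic stays as simple as a single-lock version while callers working on different keys no longer contend.

```scala
import java.util.concurrent.ConcurrentHashMap
import java.util.concurrent.locks.ReentrantLock
import scala.collection.concurrent.TrieMap

// Hypothetical helper: one lock per key instead of one lock for all keys.
// Locks are never evicted here; a real implementation would have to handle that.
final class KeyedLock[K] {
  private val locks = new ConcurrentHashMap[K, ReentrantLock]()

  def withLock[A](key: K)(body: => A): A = {
    val lock = locks.computeIfAbsent(key, (_: K) => new ReentrantLock())
    lock.lock()
    try body
    finally lock.unlock()
  }
}

// Bookkeeping in the style of a single-lock implementation (a plain subscriber
// count per key), except that only callers using the same key contend.
final class PerKeyLockedSubscriptions[K](
    redisSubscribe: K => Unit,  // stand-in for the real SUBSCRIBE call
    redisUnsubscribe: K => Unit // stand-in for the real UNSUBSCRIBE call
) {
  private val keyed  = new KeyedLock[K]
  private val counts = TrieMap.empty[K, Int]

  def count(key: K): Int = counts.getOrElse(key, 0)

  def subscribe(key: K): Unit = keyed.withLock(key) {
    val n = count(key)
    if (n == 0) redisSubscribe(key) // only the first subscriber talks to Redis
    counts.update(key, n + 1)
  }

  def unsubscribe(key: K): Unit = keyed.withLock(key) {
    count(key) match {
      case 0 => ()
      case 1 =>
        redisUnsubscribe(key) // if this throws, the count stays at 1 and a retry is possible
        counts.remove(key)
        ()
      case n => counts.update(key, n - 1)
    }
  }
}
```

The trade-off is that a slow Redis call for one key still blocks every other caller on that same key, which is the behavior the `Subscribing`/`Unsubscribing` states make explicit instead.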
> Lastly, I presume `onMessage` will get called more often than `subscribe` and `unsubscribe`, so we want onMessage to be more efficient, but we need to access the Map of Topics on each method. Does that sound correct?
Yes, that is correct, but this should be faster than how we currently call all listeners one by one for every message.
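The single-listener dispatch amounts to one map lookup per incoming message. This is an illustrative sketch with hypothetical names (`SingleListener`, `register`, `unregister`, `onMessage`) and a `LinkedBlockingQueue` standing in for a channel's topic; it does not implement lettuce's `RedisPubSubListener` interface.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}

// Hypothetical single pub/sub listener that dispatches by channel.
// A LinkedBlockingQueue stands in for each channel's topic.
final class SingleListener {
  private val topics = new ConcurrentHashMap[String, LinkedBlockingQueue[String]]()

  def register(channel: String): LinkedBlockingQueue[String] =
    topics.computeIfAbsent(channel, (_: String) => new LinkedBlockingQueue[String]())

  def unregister(channel: String): Unit = { topics.remove(channel); () }

  // Called once per incoming message: a single hash lookup, instead of calling
  // every registered listener and letting each one check the channel.
  def onMessage(channel: String, message: String): Unit = {
    val topic = topics.get(channel)
    // This queue is unbounded; with a bounded queue, put would block here and
    // stall delivery for every channel until the slow consumer catches up.
    if (topic != null) topic.put(message)
  }
}
```

Messages for channels without a registered topic are simply dropped. Because every channel goes through this one `onMessage`, a blocking publish to one topic delays all others, which is the head-of-line blocking concern raised in the review.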
Sorry for the very long delay, I didn't find time to come back to this.
I pushed the changes I mentioned to handle the subscription lifecycle more explicitly (using `Resource`) (https://github.com/profunktor/redis4cats/pull/984#discussion_r2022824517). That way we no longer need the fiber await sleeps in `SubscriberSuite`, because inside the `Resource` we have an active subscription.
It would be nice to expose a `subscribeAwait(key: K): Resource[F, Stream[F, V]]` alongside `subscribe` in `SubscribeCommands` to give clients insight into when the subscription is available if they want it, but that is unfortunately a breaking change (and I missed 2.x).
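Why a resource-scoped subscription removes the need for sleeps in tests can be shown without cats-effect or fs2. The proposed signature returns `Resource[F, Stream[F, V]]`; this hypothetical sketch substitutes an `AutoCloseable` handle and `scala.util.Using`. `Broker`, `Subscription`, and `publish` are illustrative names, and `subscribeAwait` here is only a stand-in for the proposed method, not a redis4cats API.

```scala
import java.util.concurrent.{ConcurrentHashMap, LinkedBlockingQueue}
import scala.util.Using

// Hypothetical broker with at most one subscriber per channel (to keep it short).
final class Broker {
  private val channels = new ConcurrentHashMap[String, LinkedBlockingQueue[String]]()

  // Handle whose release unsubscribes, mirroring the release step of a Resource.
  final class Subscription(val channel: String, val messages: LinkedBlockingQueue[String])
      extends AutoCloseable {
    def close(): Unit = { channels.remove(channel, messages); () }
  }

  // Stand-in for the proposed subscribeAwait: it only returns once the
  // subscription is registered, so later publishes are guaranteed to be seen.
  def subscribeAwait(channel: String): Subscription = {
    val queue = new LinkedBlockingQueue[String]()
    channels.put(channel, queue)
    new Subscription(channel, queue)
  }

  // Returns false when nobody is subscribed to the channel.
  def publish(channel: String, message: String): Boolean = {
    val queue = channels.get(channel)
    queue != null && queue.offer(message)
  }
}
```

Inside `Using.resource(broker.subscribeAwait("news")) { sub => ... }` a message published to `"news"` is already delivered to `sub.messages`, and after the block the subscription is gone, much like the acquire/release pair of a `Resource`.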