redis4cats

Refactor `Subscriber`

Open peterneyens opened this issue 9 months ago • 4 comments

The main goal is to reduce the overhead we add when we have a lot of subscriptions.

  • Avoid locking for subscribe and unsubscribe. We should be able to subscribe to or unsubscribe from channels/patterns independently of concurrent subscriptions to other channels/patterns, without being blocked. In this PR we represent the subscription lifecycle as multiple states. We can move between those states without locking, while operations on the same channel/pattern still wait on state changes by using a Deferred as a signal. We don't go back to a single Ref with a Map, but switch to a MapRef backed by a ConcurrentHashMap, which reduces contention when there are a lot of concurrent state changes.
  • Avoid using multiple RedisPubSubListeners. The listeners are only called one by one. Since we have a map with all subscriptions, we can look up the subscription and publish to its topic in one listener.
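
To make the state-based approach in the first bullet concrete, here is a minimal sketch, assuming cats-effect 3.4+ (for `MapRef`). The `SubscriptionState` cases, the `Subscriptions` class and the `redisSubscribe` parameter are hypothetical names chosen for illustration and are not the PR's actual code. Only the subscribe side is shown, and cancellation handling is left out for brevity.

```scala
import cats.effect.{Deferred, IO}
import cats.effect.std.MapRef

// Hypothetical lifecycle states; the names follow the discussion, not the PR.
sealed trait SubscriptionState
object SubscriptionState {
  // A subscribe call is in flight; callers for the same key wait on `done`.
  final case class Subscribing(done: Deferred[IO, Unit]) extends SubscriptionState
  // The subscription is usable; `subscribers` counts the streams sharing it.
  final case class Active(subscribers: Int) extends SubscriptionState
}
import SubscriptionState._

final class Subscriptions(
    states: MapRef[IO, String, Option[SubscriptionState]],
    redisSubscribe: String => IO[Unit] // stand-in for the real Redis call
) {
  // Each key has its own Ref, so callers for different keys never contend.
  def subscribe(channel: String): IO[Unit] =
    Deferred[IO, Unit].flatMap { signal =>
      states(channel).modify[IO[Unit]] {
        // First caller: record that we are subscribing, then do the work.
        case None => (Some(Subscribing(signal)), start(channel, signal))
        // Someone else is subscribing: wait for the signal, then re-check.
        case s @ Some(Subscribing(done)) => (s, done.get >> subscribe(channel))
        // Already active: just register one more subscriber.
        case Some(Active(n)) => (Some(Active(n + 1)), IO.unit)
      }.flatten
    }

  private def start(channel: String, signal: Deferred[IO, Unit]): IO[Unit] =
    redisSubscribe(channel).guaranteeCase { outcome =>
      // On failure, reset to None so that waiting callers can retry.
      val next: Option[SubscriptionState] =
        if (outcome.isSuccess) Some(Active(1)) else None
      states(channel).set(next) >> signal.complete(()).void
    }
}
```

With ten concurrent `subscribe("news")` calls, this sketch issues a single `redisSubscribe` and ends in `Active(10)`. An `Unsubscribing` state would follow the same pattern in the other direction.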

This is a more ambitious version of what I started in https://github.com/profunktor/redis4cats/pull/972. Unfortunately this made the diff a lot larger than I wanted it to be.

  • While avoiding the lock clearly makes things more complicated, it also forces us to think more about the different states and state changes (for example unsubscribe failing currently leaves us in a state where we can't retry, since we keep a Redis4CatsSubscription with a single subscription, without actual subscribers to the topic).
  • I moved everything that is only used by the Subscriber implementation into the Subscriber companion object, which I think makes things easier (but that is probably subjective). The Redis4CatsSubscription class is now the Active part of SubscriptionState.

I tried to keep the behavior as close to the existing implementation as I could. There are a few places that could use some additional work, but I didn't want to make any additional changes to the implementation.

  • I think we could unsubscribe in unsubscribe instead of waiting on the last subscription stream finalizer (since no new messages will be processed anyway).
  • There are some potential improvements in how we handle publishing messages. We should document that a single subscriber not keeping up with its channel will block not only all subscriptions for the same channel/pattern but for all channels/patterns, since we can't publish to the topic. We could potentially publish to multiple channels/patterns in parallel (but we should benchmark first).
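
The blocking concern in the last bullet can be illustrated with a small sketch. It assumes cats-effect 3 and uses a bounded `Queue` per channel as a stand-in for the real topic; `deliverAll` is a hypothetical helper, not part of redis4cats.

```scala
import cats.effect.IO
import cats.effect.std.Queue

// Messages are delivered strictly one after another, as a single listener would.
// `offer` on a full bounded queue waits, so one slow channel stalls the rest.
def deliverAll(
    incoming: List[(String, String)],
    queues: Map[String, Queue[IO, String]]
): IO[Unit] =
  incoming.foldLeft(IO.unit) { case (acc, (channel, message)) =>
    acc >> queues.get(channel).fold(IO.unit)(_.offer(message))
  }
```

If the queue for one channel is already full, the message queued behind it for a different channel is never delivered until the slow consumer catches up. Publishing per channel in parallel would remove that coupling, at a cost that would need benchmarking.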

peterneyens avatar Mar 06 '25 11:03 peterneyens

Sorry for delaying the request for so long. Was busy, then got sick.

I hope to review this this week.

arturaz avatar Mar 27 '25 08:03 arturaz

The main goal is to reduce the overhead we add when we have a lot of subscriptions. (1) Avoid locking for subscribe and unsubscribe....(2) Avoid using multiple RedisPubSubListeners....

I agree with point (2) and I would expect better performance with a single listener that dispatches to a single dispatcher. However, we need to be careful about subscribers of one topic blocking subscribers of other topics.

Regarding (1), yeah, it's significantly more complicated, which makes it difficult to reason about correctness. I'm curious how bad the blocking could be with AtomicCell, and whether there is a cats read-write lock we could use. I like the use of MapRef, but how come we need the Subscribing and Unsubscribing states? Lastly, I presume onMessage will get called more often than subscribe and unsubscribe, so we want onMessage to be more efficient, but we need to access the Map of Topics on each method. Does that sound correct?

mmienko avatar Apr 17 '25 15:04 mmienko

but how come we need the Subscribing and Unsubscribing states?

If we don't lock while subscribing and unsubscribing, we need to track that we are subscribing or unsubscribing, so that concurrent calls to subscribe/unsubscribe to the same key can wait until the in-progress subscribe/unsubscribe finishes (to use the fresh subscription, retry, ...).

This does indeed make it more complicated; that is unfortunately the consequence of removing the single shared lock for all subscriptions. We could wait until https://github.com/typelevel/cats-effect/pull/4424 lands to make a SubscriptionMap implementation that is more like the existing implementation but with a lock per key.

Lastly, I presume onMessage will get called more often than subscribe and unsubscribe, so we want onMessage to be more efficient, but we need to access the Map of Topics on each method. Does that sound correct?

Yes, that is correct, but this should be faster than how we currently call all listeners one by one for all messages.
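
As a rough illustration of the single-listener dispatch, here is a sketch assuming cats-effect 3.4+. `SingleListener` is a hypothetical name, and a `Queue` stands in for the per-channel topic. The cost per message is one map lookup, independent of the number of subscribed channels.

```scala
import cats.effect.IO
import cats.effect.std.{MapRef, Queue}

// Hypothetical single listener: look up the channel, publish to its queue.
final class SingleListener(
    topics: MapRef[IO, String, Option[Queue[IO, String]]]
) {
  def onMessage(channel: String, message: String): IO[Unit] =
    topics(channel).get.flatMap {
      case Some(queue) => queue.offer(message)
      case None        => IO.unit // no active subscription: drop the message
    }
}
```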

peterneyens avatar Sep 01 '25 12:09 peterneyens

Sorry for the very long delay, I didn't find time to come back to this.

I pushed the changes I mentioned to handle a subscription lifecycle more explicitly (using Resource) (https://github.com/profunktor/redis4cats/pull/984#discussion_r2022824517). That way we no longer need the fiber await sleeps in SubscriberSuite, because inside the Resource we will have an active subscription.

It would be nice to expose a subscribeAwait(key: K): Resource[F, Stream[F, V]] alongside subscribe in SubscribeCommands, to give clients insight into when the subscription is available if they want it, but that is unfortunately a breaking change (and I missed 2.x).
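
A minimal sketch of what such a Resource-based variant could look like, assuming cats-effect 3 and fs2 3. The `subscribe`, `unsubscribe` and `messages` parameters are hypothetical stand-ins for the real Redis calls and topic stream, and `IO` and `String` replace the generic `F`, `K` and `V` for brevity.

```scala
import cats.effect.{IO, Resource}
import fs2.Stream

// Acquiring the Resource completes only once the subscription is active;
// releasing it unsubscribes.
def subscribeAwait(
    channel: String,
    subscribe: String => IO[Unit],   // stand-in: returns once subscribed
    unsubscribe: String => IO[Unit], // stand-in: removes the subscription
    messages: Stream[IO, String]     // stand-in: messages for this channel
): Resource[IO, Stream[IO, String]] =
  Resource.make(subscribe(channel))(_ => unsubscribe(channel)).map(_ => messages)

// The stream-only `subscribe` can then be derived from it.
def subscribeStream(
    channel: String,
    subscribe: String => IO[Unit],
    unsubscribe: String => IO[Unit],
    messages: Stream[IO, String]
): Stream[IO, String] =
  Stream.resource(subscribeAwait(channel, subscribe, unsubscribe, messages)).flatten
```

Inside `use`, a caller knows the subscription is already active, which is what removes the need for sleeps in tests.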

peterneyens avatar Sep 01 '25 12:09 peterneyens