Make `PubSubState` a trait
An attempt to make it easier to change the `PubSubState` implementation or add multiple implementations. This is somewhat of a follow-up to https://github.com/profunktor/redis4cats/pull/966.
For some context: I am exploring using keyspace notifications and that led me to look at the pubsub code. My use case would have a lot of short-lived subscriptions, which is probably not the most common pubsub usage. Without any real tests or digging deeper into the lettuce code, I was somewhat worried about contention on a single AtomicCell holding all subscriptions (but it could be completely unfounded).
I hope that the change in this PR makes it easier to make the PubSubState more configurable. I added a sharded PubSubState implementation that is currently unused and only shows how we could support different implementations (but it seems a useful implementation to have).
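As a rough illustration of the sharding idea (not the implementation in this PR): each channel is hashed to one of several independent cells, so operations on channels in different shards never compete for the same cell. `ShardedCounts` and its methods are hypothetical names, and `java.util.concurrent.atomic.AtomicReference` is only a standard-library stand-in for a per-shard `AtomicCell`.

```scala
import java.util.concurrent.atomic.AtomicReference

// Hypothetical sketch: `ShardedCounts` is not a redis4cats type.
// Each shard owns its own subscriber-count map; AtomicReference stands in
// for the per-shard AtomicCell used by the real code.
final class ShardedCounts(shards: Int) {
  require(shards > 0, "need at least one shard")

  private val cells: Vector[AtomicReference[Map[String, Long]]] =
    Vector.fill(shards)(new AtomicReference[Map[String, Long]](Map.empty))

  // floorMod keeps the index non-negative even for negative hash codes.
  def shardIndex(channel: String): Int =
    Math.floorMod(channel.hashCode, shards)

  // Increments the count for `channel`, touching only that channel's shard,
  // and returns the new count.
  def subscribe(channel: String): Long = {
    val updated = cells(shardIndex(channel))
      .updateAndGet(m => m.updated(channel, m.getOrElse(channel, 0L) + 1L))
    updated(channel)
  }

  // Sum of subscriber counts across all shards.
  def total: Long = cells.map(_.get.values.sum).sum
}
```

The same channel always maps to the same shard, so per-channel ordering is preserved while unrelated channels can proceed independently.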
- This PR does move a lot of the `Subscriber` code into `PubSubState`. The first one now mostly contains how to create a subscription, while the second one now maintains (and hides) the subscription lifecycle.
- I did leave out some of the logs added in #966, I can bring them back if desired, but I wanted to get some feedback first.
- The unsubscribe and subscription cleanup effects aren't guarded by the mutex anymore. It didn't look necessary, but we should probably make sure this is OK.
@arturaz Since this mostly refactors the code that you recently wrote, I think it is only fair to ask whether you think this change is an improvement. And if you think it is an improvement in general, do you have any feedback?
> I was somewhat worried about contention on a single AtomicCell holding all subscriptions (but it could be completely unfounded).
Yeah, this needs to be benchmarked. We could be complicating the code with no real benefit.
> The unsubscribe and subscription cleanup effects aren't guarded by the mutex anymore. It didn't look necessary, but we should probably make sure this is OK.

What about this scenario? The last subscriber is unsubscribing. It is removed from the AtomicCell and then proceeds to run cleanup. However, the Cats Effect scheduler decides to yield between the modify and the cleanup to another fiber, which looks into the AtomicCell, takes the lock, sees that there are no subscribers, and runs the subscription against the Redis client. Then the previous fiber resumes and performs the cleanup, nuking that subscription?
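To make that interleaving concrete, here is a tiny pure model of it. This is not redis4cats code: `World`, `local` (standing in for the AtomicCell contents), and `remote` (standing in for the channels Redis is actually subscribed to) are made-up names, and the two fibers are simulated by simply choosing the order of the steps.

```scala
// Hypothetical model of the scenario above, not redis4cats code.
// `local`  ~ subscriber counts held in the AtomicCell
// `remote` ~ channels the Redis connection is currently subscribed to
final case class World(local: Map[String, Int], remote: Set[String])

object RaceSketch {
  // Unsubscribe, step 1: drop the last subscriber from local state.
  def removeLocal(w: World, ch: String): World =
    w.copy(local = w.local - ch)

  // Unsubscribe, step 2: the cleanup that sends UNSUBSCRIBE to Redis.
  def cleanupRemote(w: World, ch: String): World =
    w.copy(remote = w.remote - ch)

  // New subscriber: SUBSCRIBE on Redis only when no local entry exists.
  def subscribe(w: World, ch: String): World =
    w.local.get(ch) match {
      case Some(n) => w.copy(local = w.local.updated(ch, n + 1))
      case None    => World(w.local.updated(ch, 1), w.remote + ch)
    }

  val start: World = World(Map("news" -> 1), Set("news"))

  // Unguarded: fiber B's subscribe runs between fiber A's two steps.
  val interleaved: World =
    cleanupRemote(subscribe(removeLocal(start, "news"), "news"), "news")

  // Guarded: both of fiber A's steps finish before fiber B gets in.
  val guarded: World =
    subscribe(cleanupRemote(removeLocal(start, "news"), "news"), "news")
}
```

In the unguarded ordering the model ends with a local subscriber for `news` but no Redis subscription behind it, which is the "nuked" subscription described above; in the guarded ordering both sides agree.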
I also think the debug logs should be left there, they are useful when, well, debugging :)
Thanks for taking a look, @arturaz!
> Yeah, this needs to be benchmarked. We could be complicating the code with no real benefit.
I agree I should have done some testing before.
A simple test that opens and closes some subscriptions in parallel already shows the contention. I'll see if I can spend some more time creating some better benchmarks.

```scala
test("subscribe and unsubscribe") {
  import cats.syntax.all._
  withRedisPubSub { pubSub =>
    val channels = List.range(1, 100).map(_ % 25).map(n => RedisChannel(n.toString))
    val expectedSubscriptions = channels.length.toLong
    val checkSubscriptionCount: IO[Long] = pubSub.internalChannelSubscriptions.map(_.values.sum)
    val wait = checkSubscriptionCount
      .delayBy(50.millis)
      .iterateUntil(_ == expectedSubscriptions)
    val subscribe = channels.parTraverse { channel =>
      pubSub.subscribe(channel).compile.drain
    }
    val unsubscribe = channels.parTraverse(pubSub.unsubscribe)
    IO.both(subscribe, wait >> unsubscribe).timed.flatMap { case (d, _) => IO.println(d) }
  }
}
```
Switching to `PubSubState.make[F, K, V](shards = Some(2))` in `mkPubSubConnection` seems to be more than 30% faster.
It would be interesting to compare that against 1.7.2 (even though that may also show the bugs you fixed).
> I also think the debug logs should be left there, they are useful when, well, debugging :)

I can bring those back. I may try to see if we can add them back while keeping them outside of `evalUpdate` and `evalModify`.
Is there any downside in only having the sharded implementation?
> Is there any downside in only having the sharded implementation?
Probably not.
Thinking about this more, I think we can still make a `Ref`-based implementation work by representing the subscription startup and cleanup as states in our map. We could then make operations for the same channel/pattern wait until the subscription is ready or cleaned up. I pushed a draft version of such an implementation, but it needs some more work on the effectful state changes from `Starting` to `Active` and from `ShuttingDown` to removal (to handle failure, invalid states in the second atomic state change, ...). This implementation seems to be around three times as fast as the single AtomicCell implementation for the simple test case I shared above.
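A minimal sketch of what such a state map could look like, written as pure transition functions of the shape `Ref.modify` expects (`A => (A, B)`). Only the state names `Starting`, `Active`, and `ShuttingDown` come from the description above; the subscriber count on `Active`, the `Action` type, and the function names are assumptions for illustration, and the draft may well differ.

```scala
// Per-channel lifecycle states. The subscriber count on Active is an assumption.
sealed trait SubState
case object Starting extends SubState                      // SUBSCRIBE in flight
final case class Active(subscribers: Int) extends SubState // subscription ready
case object ShuttingDown extends SubState                  // UNSUBSCRIBE in flight

// Hypothetical instructions returned to the caller, to be run outside the Ref.
sealed trait Action
case object RunSubscribe extends Action   // caller sends SUBSCRIBE, then marks Active
case object RunUnsubscribe extends Action // caller sends UNSUBSCRIBE, then removes the entry
case object NoEffect extends Action       // only local bookkeeping changed
case object WaitAndRetry extends Action   // a transition is in flight; retry once it settles

object StateSketch {
  type Subs = Map[String, SubState]

  // Pure transition for a new subscriber on `ch`.
  def onSubscribe(subs: Subs, ch: String): (Subs, Action) =
    subs.get(ch) match {
      case None                                => (subs.updated(ch, Starting), RunSubscribe)
      case Some(Active(n))                     => (subs.updated(ch, Active(n + 1)), NoEffect)
      case Some(Starting) | Some(ShuttingDown) => (subs, WaitAndRetry)
    }

  // Pure transition for a subscriber leaving `ch`.
  def onUnsubscribe(subs: Subs, ch: String): (Subs, Action) =
    subs.get(ch) match {
      case Some(Active(1))                     => (subs.updated(ch, ShuttingDown), RunUnsubscribe)
      case Some(Active(n))                     => (subs.updated(ch, Active(n - 1)), NoEffect)
      case Some(Starting) | Some(ShuttingDown) => (subs, WaitAndRetry)
      case None                                => (subs, NoEffect)
    }
}
```

Because a concurrent subscriber that sees `ShuttingDown` is told to wait rather than to subscribe, the cleanup of the previous subscription cannot overlap with the start of a new one. The part this sketch leaves out is exactly the open work mentioned above: the second atomic step (`Starting` to `Active`, `ShuttingDown` to removal) and what to do when the effect in between fails.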
> I pushed a draft version of such an implementation, but it needs some more work on the effectful state changes from `Starting` to `Active` and from `ShuttingDown` to removal (to handle failure, invalid states in the second atomic state change, ...).

So I should delay my review until you finish, right?
I opened https://github.com/profunktor/redis4cats/pull/984 to replace this PR (but I'll leave this one open for now).