redis4cats icon indicating copy to clipboard operation
redis4cats copied to clipboard

Make `PubSubState` a trait

Open peterneyens opened this issue 10 months ago • 8 comments

An attempt to make it easier to change the PubSubState implementation or add multiple implementations. This is somewhat of a follow up of https://github.com/profunktor/redis4cats/pull/966.

For some context: I am exploring using keyspace notifications and that led me to look at the pubsub code. My usecase would have a lot of short lived subscriptions, which is probably not the most common pubsub usage. Without any real tests or digging deeper into the lettuce code, I was somewhat worried about contention on a single AtomicCell holding all subscriptions (but it could be completely unfounded).

I hope that the change in this PR makes it easier to make the PubSubState more configurable. I added a sharded PubSubState implementation that is currently unused and only shows how we could support different implementations (but it seems a useful implementation to have).

  • This PR does move a lot of the Subscriber code into PubSubState. The first one now mostly contains how to create a subscription, while the second one now maintains (and hides) the subscription lifecycle.
  • I did leave out some of the logs added in #966, I can bring them back if desired, but I wanted to get some feedback first.
  • The unsubscribe and subscription cleanup effects aren't guarded by the mutex anymore. It didn't look necessary, but we should probably make sure this is OK.

peterneyens avatar Feb 26 '25 10:02 peterneyens

@arturaz Since this mostly refactors the code that you recently wrote, I think it is only fair to ask you if you think if this change is an improvement or not? And if you think it is an improvement in general, if you have any feedback?

peterneyens avatar Feb 26 '25 11:02 peterneyens

I was somewhat worried about contention on a single AtomicCell holding all subscriptions (but it could be completely unfounded).

Yeah, this needs to be benchmarked. We could be complicating the code with no real benefit.

The unsubscribe and subscription cleanup effects aren't guarded by the mutex anymore. It didn't look necessary, but we should probably make sure this is OK.

What about this scenario? Last subscriber is unsubscribing. It is removed from AtomicCell and then proceeds to run cleanup. However, Cats Effect scheduler decides to yield between modify and cleanup to other fiber, which looks into AtomicCell, takes the lock, sees that there's no subscribers, runs subscription against redis client. Then previous fiber resumes, performing the cleanup, nuking that subscription?

arturaz avatar Feb 26 '25 18:02 arturaz

I also think the debug logs should be left there, they are useful when, well, debugging :)

arturaz avatar Feb 26 '25 21:02 arturaz

Thanks for taking a look, @arturaz!

Yeah, this needs to be benchmarked. We could be complicating the code with no real benefit.

I agree I should have done some testing before.

A simple test opening and closing some subscriptions in parallel, does already show the contention. I'll see if I can spend some more time creating some better benchmarks.

test("subscribe and unsubscribe ") {
  import cats.syntax.all._
  withRedisPubSub { pubSub =>
    val channels              = List.range(1, 100).map(_ % 25).map(n => RedisChannel(n.toString))
    val expectedSubscriptions = channels.length.toLong

    val checkSubscriptionCount: IO[Long] = pubSub.internalChannelSubscriptions.map(_.values.sum)
    val wait = checkSubscriptionCount
      .delayBy(50.millis)
      .iterateUntil(_ == expectedSubscriptions)

    val subscribe = channels.parTraverse { channel =>
      pubSub.subscribe(channel).compile.drain
    }
    val unsubscribe = channels.parTraverse(pubSub.unsubscribe)

    IO.both(subscribe, wait >> unsubscribe).timed.flatMap { case (d, _) => IO.println(d) }
  }
} 

Switching to use PubSubState.make[F, K, V](shards = Some(2)) in mkPubSubConnection, seems to be more than 30% faster. It would be interesting to compare that against 1.7.2 (eventhough that may also show the bugs you fixed).

I also think the debug logs should be left there, they are useful when, well, debugging :)

I can bring those back. I may try to see if we can add them back while keeping them outside of evalUpdate and evalModify.

peterneyens avatar Feb 26 '25 21:02 peterneyens

Is there any downside in only having the sharded implementation?

arturaz avatar Feb 27 '25 07:02 arturaz

Is there any downside in only having the sharded implementation?

Probably not.

Thinking about this more, I think we can still make a Ref based implementation work by representing the subscription start up and clean up as states in our map. We could then make operations for the same channel/pattern wait until the subscription is ready or cleaned up. I pushed a draft version of such an implementation, but i needs some more work on the effectful state changes from Starting to Active and from ShuttingDown to removal (to handle failure, invalid states in the second atomic state change, ...). This implementation seems to be around three times faster as the single AtomicCell implementation for the simple test case I shared above.

peterneyens avatar Feb 27 '25 10:02 peterneyens

I pushed a draft version of such an implementation, but i needs some more work on the effectful state changes from Starting to Active and from ShuttingDown to removal (to handle failure, invalid states in the second atomic state change, ...).

So I should delay my review until you finish, right?

arturaz avatar Feb 27 '25 16:02 arturaz

I opened https://github.com/profunktor/redis4cats/pull/984 to replace this PR (but I'll leave this one open for now).

peterneyens avatar Mar 10 '25 12:03 peterneyens