Topic#subscribe on closed topics has surprising (non-terminating) semantics
using fs2 3.0.6 and 3.1.0. Here's a scatsie. All code pasted in this issue is also in the scatsie.
We have as an example of subscriber to a topic subscribing after the topic gets closed:
def topicSubscribeClosedTopicNonTerminating(topic: Topic[IO, Int]): IO[Unit] = {
val subscriber1: Stream[IO, Unit] =
topic
.subscribe(maxQueued = 1)
.evalMap(i => Console[IO].println(s"subscriber 1 received: $i"))
for {
fib1 <- subscriber1.compile.drain.start
_ <- Console[IO].println(s"closing topic")
_ <- topic.close
_ <- fib1.join
} yield ()
}
This consistently ends up non-terminating, and it's a bit surprising given that other methods seem to signal operating on closed topics better. I can think of 3 possibilities here:
- write in the scaladoc of
subscribea warning about this behaviour - have the method return a
Stream.empty - have the method return a
Stream.raiseError(...)
Additionally, you'll see in the scatsie the methods topicAwaitInStream*, which could be included as an example in the scaladoc on how to avoid surprising semantics.
I'll happily tackle this myself if you can decide which fix you want for this.
Sorry, this completely flew under my radar, so I'm just looking at it