fs2 icon indicating copy to clipboard operation
fs2 copied to clipboard

Subscribing to a Topic concurrently with closing the Topic leads to a non-terminating subscription stream

Open TomasMikula opened this issue 1 month ago • 0 comments

When materialization of a Stream returned from Topic#subscribe or subscribeAwait is done concurrently with closing the Topic, the stream returned from the subscription is (sometimes) not terminated, which is a resource leak.

Reproduction

fs2 version: 3.12.2, cats-effect version: 3.6.3, Scala version: 2.13.18

import cats.effect.{ExitCode, IO, IOApp}
import fs2.Stream
import fs2.concurrent.Topic

import scala.concurrent.duration.DurationInt

/** Reproduction harness: races the materialization of a Topic subscription
  * against closing that same Topic, and waits for the subscriber to finish.
  */
object TopicTest extends IOApp {

  /** Picks the subscription stream to exercise.
    *
    * @param topic the topic to subscribe to
    * @param await when true, subscribes via `subscribeAwait` (lifted into a
    *              stream and flattened); otherwise via plain `subscribe`
    */
  private def subscription(topic: Topic[IO, Int], await: Boolean): Stream[IO, Int] =
    if (await) Stream.resource(topic.subscribeAwait(maxQueued = 1)).flatten
    else topic.subscribe(maxQueued = 1)

  /** Runs a single round of the race for the chosen subscription flavour.
    *
    * @param await selects `subscribeAwait` (true) or `subscribe` (false)
    */
  def test(await: Boolean): IO[Unit] =
    Topic[IO, Int].flatMap { topic =>
      // The subscriber is started on its own fiber so that it races with the close below.
      subscription(topic, await).compile.toList.start.flatMap { subscriber =>
        // If the join does not complete within 10 seconds the timeout fires,
        // which is how a subscription stream that never terminates shows up.
        topic.close *> subscriber.join.timeout(10.seconds).void
      }
    }

  /** Repeats both flavours back to back 10000 times, since the race only
    * manifests on some runs.
    */
  override def run(args: List[String]): IO[ExitCode] = {
    val oneRound: IO[Unit] = test(await = false) *> test(await = true)
    oneRound.replicateA_(10000).as(ExitCode.Success)
  }

}

TomasMikula avatar Dec 08 '25 20:12 TomasMikula