fs2 icon indicating copy to clipboard operation
fs2 copied to clipboard

parEvalMap messes up unmasking of cancellation

Open Jasper-M opened this issue 2 years ago • 0 comments

Reproduction (scastie):

import cats.effect.std.Queue
import cats.effect.IO
import cats.effect.testkit.TestControl
import cats.effect.unsafe.implicits.global
import fs2.Stream
import cats.syntax.all._
import scala.concurrent.duration._

var parallelism = 1

val queue = Queue.unbounded[IO, Int].unsafeRunSync()

val drainQueue = queue.tryTakeN(None).flatMap(_.traverse_(i => IO.println(s"cleaning up leftover $i")))

val run = IO.uncancelable( poll =>
  Stream.repeatEval(poll(queue.take))
    .parEvalMap(parallelism){ i =>
      poll(IO.println(s"processing: $i"))
        .onCancel(IO.println(s"cleaning up: $i"))
    }
    .compile
    .drain
    .guarantee(drainQueue)
)

val program = TestControl.executeEmbed(run.background.surround(List(1,2,3,4,5).traverse_(i => queue.offer(i).delayBy(500.millis))))


program.unsafeRunSync()
println()

parallelism = 2
program.unsafeRunSync()

Output:

processing: 1
processing: 2
processing: 3
processing: 4
cleaning up leftover 5

processing: 1
processing: 2
processing: 3
processing: 4
processing: 5
cats.effect.testkit.TestControl$NonTerminationException: Program under test failed produce a result ...

Not sure if it's related to other parEvalMap issues such as #3076. It's probably a more general IO <-> Stream interop thing.

Jasper-M avatar Sep 25 '23 11:09 Jasper-M