fs2
parEvalMap messes up unmasking of cancellation
Reproduction (scastie):
import cats.effect.std.Queue
import cats.effect.IO
import cats.effect.testkit.TestControl
import cats.effect.unsafe.implicits.global
import fs2.Stream
import cats.syntax.all._
import scala.concurrent.duration._
var parallelism = 1
val queue = Queue.unbounded[IO, Int].unsafeRunSync()
val drainQueue = queue.tryTakeN(None).flatMap(_.traverse_(i => IO.println(s"cleaning up leftover $i")))
val run = IO.uncancelable( poll =>
  Stream.repeatEval(poll(queue.take))
    .parEvalMap(parallelism){ i =>
      poll(IO.println(s"processing: $i"))
        .onCancel(IO.println(s"cleaning up: $i"))
    }
    .compile
    .drain
    .guarantee(drainQueue)
)
val program = TestControl.executeEmbed(
  run.background.surround(
    List(1,2,3,4,5).traverse_(i => queue.offer(i).delayBy(500.millis))
  )
)
program.unsafeRunSync()
println()
parallelism = 2
program.unsafeRunSync()
Output:
processing: 1
processing: 2
processing: 3
processing: 4
cleaning up leftover 5

processing: 1
processing: 2
processing: 3
processing: 4
processing: 5
cats.effect.testkit.TestControl$NonTerminationException: Program under test failed produce a result ...
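With parallelism = 1 the cancellation goes through and the leftover item is cleaned up. With parallelism = 2 all five items are processed, the cancellation never completes, and TestControl reports non-termination.
I'm not sure whether this is related to other parEvalMap issues such as #3076.
It may be a more general IO <-> Stream interop problem.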
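One possible explanation, offered as a hypothesis rather than a confirmed diagnosis: `poll` only unmasks cancellation on the fiber that entered `uncancelable`, and with parallelism > 1 `parEvalMap` has to evaluate effects on other fibers. The sketch below uses plain cats-effect (no fs2) to compare the two situations. The names `PollAcrossFibers`, `sameFiber`, `otherFiber` and `cancelCompletes` are made up for illustration, and the 100 ms / 1 s delays are arbitrary.

```scala
import cats.effect.{IO, IOApp}
import scala.concurrent.duration._

object PollAcrossFibers extends IOApp.Simple {

  // `poll` is applied on the same fiber that entered `uncancelable`.
  val sameFiber: IO[Unit] =
    IO.uncancelable(poll => poll(IO.never[Unit]))

  // The polled effect runs on a child fiber, while the parent stays
  // masked and waits for the child (assumed to resemble what happens
  // once the stream evaluates effects concurrently).
  val otherFiber: IO[Unit] =
    IO.uncancelable(poll => poll(IO.never[Unit]).start.flatMap(_.joinWithNever))

  // Starts `io`, requests cancellation after a short delay, and reports
  // whether that cancellation finished within one second.
  def cancelCompletes(io: IO[Unit]): IO[Boolean] =
    for {
      target    <- io.start
      _         <- IO.sleep(100.millis)
      canceling <- target.cancel.start
      winner    <- IO.race(canceling.join, IO.sleep(1.second))
    } yield winner.isLeft

  val run: IO[Unit] =
    for {
      a <- cancelCompletes(sameFiber)
      b <- cancelCompletes(otherFiber)
      _ <- IO.println(s"same fiber canceled: $a, other fiber canceled: $b")
    } yield ()
}
```

If the two cases report different results, that would suggest the hang comes from where `poll` ends up being evaluated rather than from `parEvalMap` specifically. Whether this matches what fs2 does internally is an assumption and would need checking against the `parEvalMap` implementation.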