fs2
Incorrect termination of a cancelled stream
fs2 version: 2.4.6
I'm running N uncancelable tasks in parallel, and fs2 does not wait for them to complete after the stream is interrupted.
Since the tasks are uncancelable, I expect the following behavior:
- The interruption signal appears
- The stream stops peeking pending elements from the inner streams
- All elements that were peeked before the interruption signal appeared are processed to completion
- Extra results (those produced after the interruption signal) are discarded
In fact, the tasks still in flight after termination are not chained properly into the cancelation flow and are completed somewhere in the thread pool. As a result, acquired resources are released earlier than they should be.
Expected behavior: since the tasks are uncancelable, the cancelation needs to wait until the in-flight tasks are done.
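To make the expectation above concrete, here is a minimal sketch of the same principle outside fs2, assuming cats-effect 3: cancelling a fiber that is inside an uncancelable region should not return until that region has finished. The names `UncancelableDemo`, `done`, and `started` are hypothetical and exist only for this illustration; they are not part of the reproduction.

```scala
import cats.effect.{IO, Ref}
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

object UncancelableDemo {
  // Yields true if the uncancelable task had finished by the time `cancel` returned.
  val program: IO[Boolean] =
    for {
      done    <- Ref.of[IO, Boolean](false)
      started <- IO.deferred[Unit]
      // Hypothetical task: signal that it is in flight, do some work, record completion.
      task = (started.complete(()) *> IO.sleep(100.millis) *> done.set(true)).uncancelable
      fiber    <- task.start
      _        <- started.get  // make sure the task is already running
      _        <- fiber.cancel // waits for the fiber to terminate
      finished <- done.get
    } yield finished

  def main(args: Array[String]): Unit =
    println(program.unsafeRunSync())
}
```

Under cats-effect 3 semantics `program` is expected to yield `true`, because `cancel` backpressures until the uncancelable region has run to the end. The issue is that the stream in the reproduction does not give the same guarantee for its in-flight tasks.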
The code (Scastie):
import cats.effect.{ExitCase, ExitCode, Resource, IO, IOApp}
import cats.effect.concurrent.Ref
import cats.syntax.functor._
import cats.syntax.flatMap._
import fs2.Stream
import scala.concurrent.duration._

object Main extends IOApp {

  def run(args: List[String]): IO[ExitCode] = {
    def log(msg: String) =
      IO.delay(println(msg))

    def job(idx: Int, isOpenRef: Ref[IO, Boolean]): IO[Int] = {
      val io =
        for {
          isOpen <- isOpenRef.get
          _ <- log(s"Started $idx. $isOpen")
          _ <- IO.sleep(100.millis)
        } yield idx

      io
        .guaranteeCase {
          case ExitCase.Completed => isOpenRef.get.flatMap(isOpen => log(s"Completed $idx. $isOpen"))
          case ExitCase.Error(e) => isOpenRef.get.flatMap(isOpen => log(s"Error $idx $e. $isOpen"))
          case ExitCase.Canceled => isOpenRef.get.flatMap(isOpen => log(s"Cancelled $idx. $isOpen"))
        }
        .uncancelable
    }

    val isOpenResource = Resource.make(Ref.of[IO, Boolean](true))(bool => bool.set(false))

    val process = isOpenResource.use { isOpenRef =>
      Stream.range(1, 100).covary[IO]
        .parEvalMapUnordered(10)(idx => job(idx, isOpenRef))
        .takeWhile(result => result <= 50, takeFailure = true)
        .takeRight(1) // getting the first value (which finishes the stream)
        .compile
        .last
        .flatMap(lastOpt => isOpenRef.get.flatMap(isOpen => log(s"Last $lastOpt. $isOpen")).as(ExitCode.Success))
    }

    process.start >> IO.never
  }
}
The output:
Completed 60. true
Started 62. true
Last Some(51). true
Completed 61. false <- the resource was released, but there are still some in-flight tasks in progress
Completed 62. false
The expected output:
Completed 60. true
Started 62. true
Completed 61. true
Completed 62. true
Last Some(51). true
fs2 3.0.0-M6 works correctly: https://scastie.scala-lang.org/HQq6v4ESSnGWY4KI2W3VxA
Unfortunately, I hit the issue again on the 3.x series.
cats-effect version: 3.2.9
fs2 version: 3.2.1
Updated scastie: https://scastie.scala-lang.org/ilsZZTXWQpa07JbJrVCSaw
Updated example:
import cats.effect.{Resource, Ref, IO, IOApp, Outcome}
import fs2.Stream
import scala.concurrent.duration._

object Main extends IOApp.Simple {

  def run: IO[Unit] = {
    def job(idx: Int, isOpenRef: Ref[IO, Boolean]): IO[Int] = {
      val io =
        for {
          isOpen <- isOpenRef.get
          _ <- IO.println(s"Started $idx. $isOpen")
          _ <- IO.sleep(100.millis)
        } yield idx

      io
        .guaranteeCase {
          case Outcome.Succeeded(_) => isOpenRef.get.flatMap(isOpen => IO.println(s"Completed $idx. $isOpen"))
          case Outcome.Errored(e) => isOpenRef.get.flatMap(isOpen => IO.println(s"Error $idx $e. $isOpen"))
          case Outcome.Canceled() => isOpenRef.get.flatMap(isOpen => IO.println(s"Cancelled $idx. $isOpen"))
        }
        .uncancelable
    }

    val isOpenResource = Resource.make(Ref.of[IO, Boolean](true))(bool => bool.set(false))

    val process = isOpenResource.use { isOpenRef =>
      Stream.range(1, 100).covary[IO]
        .parEvalMapUnordered(10)(idx => job(idx, isOpenRef))
        .takeWhile(result => result <= 50, takeFailure = true)
        .takeRight(1) // getting the first value (which finishes the stream)
        .compile
        .last
        .flatMap(lastOpt => isOpenRef.get.flatMap(isOpen => IO.println(s"Last $lastOpt. $isOpen")))
    }

    process.start >> IO.never
  }
}
Output:
Last Some(51). true
Completed 66. false
Completed 67. false
Completed 68. false
Completed 69. false
Completed 70. false
@iRevive, has the issue been resolved in any version of fs2? In other words, was it fixed in one version and then broken in another?
@nikiforo works on 3.1.3, broken since 3.1.4
@iRevive Oooops, I might have broken it. I'll take a look at your example tomorrow. UPD: I think I got your point, looking into the issue.
@nikiforo awesome, thanks! Let me know if I can help with the investigation.
My current suspicion falls on https://github.com/typelevel/fs2/blob/v3.1.4/core/shared/src/main/scala/fs2/Stream.scala#L2118
IO.race launches both the job and stopReading.get, and waits for the FIRST of them to complete.
This construction is used to cancel all running computations when the stream gets cancelled. However, we should also wait until those computations reach a final outcome, whether cancelled, completed, or failed. In other words, instead of race we should use some method that waits until the job is completed.
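For readers unfamiliar with the race-against-a-stop-signal pattern mentioned here, a minimal sketch follows, assuming cats-effect 3. It is not the fs2 source: `RaceDemo`, `run`, `stopEarly`, and the local `stop` signal are hypothetical names for this illustration, with `stop` standing in for the `stopReading` signal named above. It only shows that `IO.race` yields the result of whichever side completes first.

```scala
import cats.effect.IO
import cats.effect.unsafe.implicits.global
import scala.concurrent.duration._

object RaceDemo {
  // Races a hypothetical stop signal against a job that takes 50 ms.
  // Left(()) means the stop signal won, Right(42) means the job won.
  def run(stopEarly: Boolean): IO[Either[Unit, Int]] =
    for {
      stop   <- IO.deferred[Unit]
      // Optionally complete the stop signal before the race begins.
      _      <- if (stopEarly) stop.complete(()).void else IO.unit
      result <- IO.race(stop.get, IO.sleep(50.millis).as(42))
    } yield result

  def main(args: Array[String]): Unit = {
    println(run(stopEarly = true).unsafeRunSync())
    println(run(stopEarly = false).unsafeRunSync())
  }
}
```

With the stop signal already completed, the left side is expected to win; otherwise the job runs to completion and the pending `stop.get` is cancelled as the loser.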
UPD: My explanation was wrong. The stream doesn't actually get cancelled; it succeeds. The problem happens because I didn't think thoroughly about the lazy nature of streams. I'll make a PR in a week.
@iRevive, can you confirm that the issue is resolved in 3.2.3?
@nikiforo I didn't observe it yet (and hopefully will not). Thank you for your time!