
Incorrect termination of a cancelled stream

Open iRevive opened this issue 4 years ago • 8 comments

fs2 version: 2.4.6

I'm running N tasks in parallel (each task is uncancelable), and fs2 does not wait for those tasks to complete after the stream is interrupted.

Since the task is uncancelable I'm expecting the following behavior:

  1. The interruption signal arrives
  2. Stop pulling pending elements from the inner streams
  3. Finish processing all elements that were pulled before the interruption signal arrived
  4. Discard any extra results produced after the interruption signal

In fact, the tasks that are still in flight after termination are not chained into the cancelation flow, so they complete somewhere in the thread pool on their own. As a result, acquired resources are released earlier than they should be.

Expected behavior: since the tasks are uncancelable, the cancelation needs to wait until the in-flight tasks are done.
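This expectation matches how cats-effect treats a single fiber: cancelling a fiber whose body is uncancelable does not return until that body has finished. A minimal sketch, assuming cats-effect 3.x (the object and value names are illustrative, not from the issue): it starts an uncancelable task, cancels the fiber once the task is running, and checks whether the task had completed by the time cancel returned.

```scala
import cats.effect.{Deferred, IO, IOApp, Ref}
import scala.concurrent.duration._

// Sketch assuming cats-effect 3.x; names are illustrative.
object UncancelableCancelDemo extends IOApp.Simple {

  // true if the uncancelable task had completed by the time fiber.cancel returned
  val cancelWaits: IO[Boolean] =
    for {
      started <- Deferred[IO, Unit]
      done    <- Ref.of[IO, Boolean](false)
      // The whole body is uncancelable: signal start, "work" for 100 ms, record completion.
      fiber   <- (started.complete(()) >> IO.sleep(100.millis) >> done.set(true)).uncancelable.start
      _       <- started.get  // ensure the task is already inside the uncancelable region
      _       <- fiber.cancel // semantically blocks until the fiber has terminated
      result  <- done.get
    } yield result

  def run: IO[Unit] =
    cancelWaits.flatMap(waited => IO.println(s"cancel waited for the task: $waited"))
}
```

Here cancel returns only after done has been set, so the program prints "cancel waited for the task: true". The issue asks for the same guarantee across all in-flight tasks of parEvalMapUnordered.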

The code (Scastie):

import cats.effect.{ExitCase, ExitCode, Resource, IO, IOApp}
import cats.effect.concurrent.Ref
import cats.syntax.functor._
import cats.syntax.flatMap._
import fs2.Stream

import scala.concurrent.duration._

object Main extends IOApp {

  def run(args: List[String]): IO[ExitCode] = {

    def log(msg: String) =
      IO.delay(println(msg))

    def job(idx: Int, isOpenRef: Ref[IO, Boolean]): IO[Int] = {
      val io = 
        for {
          isOpen <- isOpenRef.get
          _      <- log(s"Started $idx. $isOpen")
          _      <- IO.sleep(100.millis)
        } yield idx
  
      io
        .guaranteeCase {
          case ExitCase.Completed => isOpenRef.get.flatMap(isOpen => log(s"Completed $idx. $isOpen"))
          case ExitCase.Error(e)  => isOpenRef.get.flatMap(isOpen => log(s"Error $idx $e. $isOpen"))
          case ExitCase.Canceled  => isOpenRef.get.flatMap(isOpen => log(s"Cancelled $idx. $isOpen"))
        }
        .uncancelable
    }
    
    val isOpenResource = Resource.make(Ref.of[IO, Boolean](true))(bool => bool.set(false))

   
    val process = isOpenResource.use { isOpenRef =>
      Stream.range(1, 100).covary[IO]
      .parEvalMapUnordered(10)(idx => job(idx, isOpenRef))
      .takeWhile(result => result <= 50, takeFailure = true)
      .takeRight(1) // keep only the last element: the first result > 50, which ends takeWhile
      .compile
      .last
      .flatMap(lastOpt => isOpenRef.get.flatMap(isOpen => log(s"Last $lastOpt. $isOpen")).as(ExitCode.Success))
    }
    
    
    process.start >> IO.never
  }
  
}
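The last two operators of the reproducer can be checked in isolation on a pure stream. A small sketch, assuming fs2 3.x (the object name is illustrative): takeWhile with takeFailure = true emits every element that satisfies the predicate plus the first one that fails it, and takeRight(1) then keeps only that final element.

```scala
import fs2.Stream

// Sketch assuming fs2 3.x; same operator chain as the reproducer, without effects.
object LastElementDemo {
  // takeWhile(..., takeFailure = true) emits 1..50 and then 51 (the first failing element);
  // takeRight(1) keeps only the final element of that output.
  val last: List[Int] =
    Stream.range(1, 100)
      .takeWhile(_ <= 50, takeFailure = true)
      .takeRight(1)
      .toList // List(51)
}
```

In the effectful reproducer, parEvalMapUnordered emits results in completion order, so the element that ends takeWhile is whichever result greater than 50 arrives first.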

The output:

Completed 60. true
Started 62. true
Last Some(51). true
Completed 61. false <- the resource was released, but there are still some in-flight tasks in progress
Completed 62. false
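The last two lines show the general symptom of work that outlives Resource.use: the finalizer has already set the flag to false while tasks are still running. The hypothetical sketch below, assuming cats-effect 3.x, reproduces only that symptom and says nothing about fs2's internals: a task is started inside use but never joined, so use returns, the finalizer runs, and the task later observes the closed flag.

```scala
import cats.effect.{IO, Ref, Resource}
import scala.concurrent.duration._

// Hypothetical illustration assuming cats-effect 3.x; it mimics the symptom, not fs2's code.
object LeakedTaskDemo {

  // Same shape as isOpenResource in the reproducer: the finalizer flips the flag to false.
  val isOpenResource: Resource[IO, Ref[IO, Boolean]] =
    Resource.make(Ref.of[IO, Boolean](true))(ref => ref.set(false))

  // The flag value observed by a task that was started, but not joined, inside `use`.
  val flagSeenByTask: IO[Option[Boolean]] =
    for {
      seen  <- Ref.of[IO, Option[Boolean]](None)
      fiber <- isOpenResource.use { isOpenRef =>
                 // `start` returns at once, so `use` completes and the finalizer runs
                 // while the task is still sleeping.
                 (IO.sleep(100.millis) >> isOpenRef.get.flatMap(b => seen.set(Some(b)))).start
               }
      _     <- fiber.join // wait for the leaked task only after the resource is released
      value <- seen.get
    } yield value // Some(false)
}
```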

The expected output:

Completed 60. true
Started 62. true
Completed 61. true
Completed 62. true
Last Some(51). true

The same scenario works correctly on fs2 3.0.0-M6: https://scastie.scala-lang.org/HQq6v4ESSnGWY4KI2W3VxA.

iRevive, Dec 28 '20 15:12

Unfortunately, I hit the issue again on the 3.x series.

cats-effect version: 3.2.9, fs2 version: 3.2.1

Updated scastie: https://scastie.scala-lang.org/ilsZZTXWQpa07JbJrVCSaw

Updated example:

import cats.effect.{Resource, Ref, IO, IOApp, Outcome}
import fs2.Stream

import scala.concurrent.duration._

object Main extends IOApp.Simple {

  def run: IO[Unit] = {

    def job(idx: Int, isOpenRef: Ref[IO, Boolean]): IO[Int] = {
      val io =
        for {
          isOpen <- isOpenRef.get
          _      <- IO.println(s"Started $idx. $isOpen")
          _      <- IO.sleep(100.millis)
        } yield idx

      io
        .guaranteeCase {
          case Outcome.Succeeded(_) => isOpenRef.get.flatMap(isOpen => IO.println(s"Completed $idx. $isOpen"))
          case Outcome.Errored(e)   => isOpenRef.get.flatMap(isOpen => IO.println(s"Error $idx $e. $isOpen"))
          case Outcome.Canceled()   => isOpenRef.get.flatMap(isOpen => IO.println(s"Cancelled $idx. $isOpen"))
        }
        .uncancelable
    }

    val isOpenResource = Resource.make(Ref.of[IO, Boolean](true))(bool => bool.set(false))

    val process = isOpenResource.use { isOpenRef =>
      Stream.range(1, 100).covary[IO]
        .parEvalMapUnordered(10)(idx => job(idx, isOpenRef))
        .takeWhile(result => result <= 50, takeFailure = true)
        .takeRight(1) // keep only the last element: the first result > 50, which ends takeWhile
        .compile
        .last
        .flatMap(lastOpt => isOpenRef.get.flatMap(isOpen => IO.println(s"Last $lastOpt. $isOpen")))
    }

    process.start >> IO.never
  }

}

Output:

Last Some(51). true
Completed 66. false 
Completed 67. false
Completed 68. false
Completed 69. false
Completed 70. false

iRevive, Nov 08 '21 17:11

@iRevive, has the issue ever been resolved in any version of fs2? In other words, was it fixed in some version and then broken again in a later one?

nikiforo, Nov 08 '21 18:11

@nikiforo It works on 3.1.3 and has been broken since 3.1.4.

iRevive, Nov 08 '21 18:11

@iRevive Oops, I might have broken it. I'll take a look at your example tomorrow. UPD: I think I see your point; looking into the issue.

nikiforo, Nov 08 '21 19:11

@nikiforo Awesome, thanks! Let me know if I can help with the investigation.

iRevive, Nov 09 '21 07:11

My current suspicion falls on https://github.com/typelevel/fs2/blob/v3.1.4/core/shared/src/main/scala/fs2/Stream.scala#L2118. IO.race launches both the job and stopReading.get and waits for the FIRST action to complete. This construction is used to cancel all running computations when the stream gets cancelled. However, we should also wait until those computations finish with a Canceled or Errored outcome. In other words, instead of race, we should use a method that waits until the job has completed.

UPD: My explanation was wrong. The stream doesn't actually get cancelled; it succeeds. The problem happens because I didn't think thoroughly about the lazy nature of streams. I'll make a PR within a week.
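The race hypothesis can be checked against cats-effect's own semantics. In the sketch below (assuming cats-effect 3.x; only the name stopReading comes from the comment, everything else is a hypothetical stand-in for the fs2 internals), the stop signal wins the race while an uncancelable job is still running. IO.race cancels the loser before returning, and cancelling an uncancelable fiber waits for it to finish, so the job has completed by the time race returns.

```scala
import cats.effect.{Deferred, IO, Ref}
import scala.concurrent.duration._

// Sketch assuming cats-effect 3.x. `stopReading` mirrors the name in the comment;
// the rest is a hypothetical stand-in for the fs2 internals.
object RaceLoserDemo {

  // (signal side won the race, job had finished when race returned)
  val raceWaitsForLoser: IO[(Boolean, Boolean)] =
    for {
      stopReading <- Deferred[IO, Unit]
      started     <- Deferred[IO, Unit]
      done        <- Ref.of[IO, Boolean](false)
      job          = (started.complete(()) >> IO.sleep(100.millis) >> done.set(true)).uncancelable
      // Fire the stop signal as soon as the job is running, so the signal wins the race.
      _           <- (started.get >> stopReading.complete(())).start
      result      <- IO.race(job, stopReading.get) // Right(()) when the signal wins
      finished    <- done.get
    } yield (result.isRight, finished)
}
```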

nikiforo, Nov 09 '21 15:11

@iRevive, can you confirm that the issue is resolved in 3.2.3?

nikiforo, Dec 08 '21 09:12

@nikiforo I haven't observed it so far (and hopefully won't). Thank you for your time!

iRevive, Dec 08 '21 09:12