fs2
`Pull#lease`, `Pull.bracketCase` and `Pull.extendScopeTo` are not cancellation safe.
Starting with `Pull.extendScopeTo`, a safer variant could be:
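object Pull {
  ...
  // Leases the current scope and exposes the lease as a Resource,
  // so that cancelling the lease is tied to the Resource finalizer.
  def leaseScope[F[_]: MonadThrow]: Pull[F, Nothing, Resource[F, Unit]] =
    Pull.getScope[F].map(s => Resource.make(s.lease)(_.cancel.rethrow).map(_ => ()))
}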
The following program reproduces the problem:
import cats.effect.*
import cats.effect.unsafe.implicits.global
import fs2.*
import scala.concurrent.duration.*
import cats.implicits.*
import cats.*

object Main extends IOApp.Simple {
  def run = IO.deferred[Unit].flatMap { d =>
    IO.ref(0).flatMap { ref =>
      // ref counts open resources: +1 on acquire, -1 on release
      val s =
        Stream
          .resource(Resource.make(ref.update(_ + 1))(_ => ref.update(_ - 1)))
          .pull
          .uncons1
          .flatMap {
            case None => Pull.done
            case Some((hd, tl)) =>
              Pull.output1(hd) >>
                Pull.extendScopeTo(Stream.unit.covary[IO]).flatMap { killLease =>
                  // signal that the lease is held, then block until cancelled
                  Pull.eval(d.complete(()).void >> IO.never[Unit]) >>
                    Pull.eval(killLease.compile.drain) >>
                    tl.pull.echo
                }
          }
          .streamNoScope
      // cancel the stream once the lease is held, then print the open-resource count
      IO.race(s.compile.drain, d.get) >> ref.get.flatMap(IO.println(_))
    }
  }
}
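For comparison, here is a minimal baseline sketch of the same cancellation scenario without `Pull.extendScopeTo`, where the resource stays on the stream's own scope. The names `ScopedBaseline` and `openAfterCancel` are made up for illustration. Under the usual `Stream.resource` guarantees, the finalizer should run when the stream is cancelled, so the counter should return to 0.

```scala
import cats.effect.{IO, IOApp, Resource}
import fs2.Stream

object ScopedBaseline extends IOApp.Simple {
  // Number of resources still open after the stream has been cancelled.
  // (Hypothetical helper, not part of fs2.)
  val openAfterCancel: IO[Int] =
    IO.deferred[Unit].flatMap { d =>
      IO.ref(0).flatMap { ref =>
        val s =
          Stream
            .resource(Resource.make(ref.update(_ + 1))(_ => ref.update(_ - 1)))
            // signal that the resource is held, then block until cancelled
            .evalMap(_ => d.complete(()).void >> IO.never[Unit])
        // race cancels the stream once d is completed and waits for its finalizers
        IO.race(s.compile.drain, d.get) >> ref.get
      }
    }

  def run: IO[Unit] =
    openAfterCancel.flatMap(n => IO.println(s"open resources: $n"))
}
```

The only difference from the reproduction is that no lease is taken, which suggests the leak comes from the lease never being cancelled rather than from `Stream.resource` itself.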
`Pull.bracketCase` can be cancelled after the acquire step and during use.
I don't know what `Pull.bracketCase` is used for other than `Pull#lease`, but I think it should at least be documented that it shouldn't be used for resources, since release is not guaranteed.
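To make the expectation concrete, this is a small sketch of what the name `bracketCase` promises in cats-effect itself: `IO#bracketCase` runs the release step even when `use` is cancelled, and passes it the outcome. The object name `BracketCaseContrast` and the event strings are made up for illustration, and this only shows the cats-effect behaviour, not fs2's `Pull.bracketCase`.

```scala
import cats.effect.{IO, IOApp, Outcome}

object BracketCaseContrast extends IOApp.Simple {
  // Records which bracket steps ran when `use` is cancelled.
  val events: IO[List[String]] =
    IO.ref(List.empty[String]).flatMap { log =>
      IO.deferred[Unit].flatMap { started =>
        def record(e: String): IO[Unit] = log.update(_ :+ e)

        val guarded =
          record("acquire").bracketCase { _ =>
            // signal that `use` is running, then block until cancelled
            started.complete(()) >> IO.never[Unit]
          } {
            case (_, Outcome.Canceled()) => record("release: canceled")
            case (_, _)                  => record("release: other")
          }

        // cancel `guarded` once `use` has started, then read the log
        IO.race(guarded, started.get) >> log.get
      }
    }

  def run: IO[Unit] = events.flatMap(es => IO.println(es.mkString(", ")))
}
```

A reader who knows this behaviour would reasonably expect the same guarantee from `Pull.bracketCase`, which is why a documentation note seems worthwhile.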
`Pull#lease` inherits the issues of `bracketCase` and can cause resources on the scope to never be released.
I think it could be implemented safely using the `leaseScope` proposed above.
sealed abstract class Pull[...] {
  ...
  // binary incompatible with `lease`
  // (requires Throwable as the error type because leaseScope needs MonadThrow)
  def leaseSafe(implicit F: MonadCancel[F, Throwable]): Pull[...] =
    Pull.leaseScope.flatMap(r => Stream.resourceWeak(r).pull.echo >> this)
}
I will open a PR for this; I just wanted to have a discussion first.