cats-effect
Cancelables
Hi all,
Hotswap is a useful feature, but it's only one use-case of a set of patterns involving resource allocation and handling.
In particular, Hotswap is an implementation (one that may be too high-level) of RxJava's SerialDisposable or Monix's SerialCancelable. There are other patterns that may be useful too, and I can think of these:
SingleAssignCancelable, which is a cancelable reference that can only be set once. This is super useful when you want to refer to the cancel token of a resource that hasn't been initialized yet. With it, you can build, for example, a Resource builder that does not wait for the acquisition of the underlying resource. E.g., you can easily build something like:
F[(A, F[Unit])] => Resource[F, F[A]]
(and I'd also propose the inclusion of something like this in Cats-Effect)
CompositeCancelable, which allows for composing multiple resources that can be disposed of all at once. For example, I feel the need to get rid of Resource chains in my main initialization logic, with something similar to Scala's Using.Manager.
RefCountCancelable, which allows for sharing a resource, disposing of it only once all consumers are done with it.
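To make the RefCountCancelable idea concrete, here is a minimal sketch. It assumes plain IO and a Cats-Effect 3 Ref. The RefCounted name and its acquire/drop/lease methods are hypothetical and are not an existing Cats-Effect or Monix API. The creator holds the first reference, every consumer takes another one, and the release action runs only when the count drops to zero.

```scala
import cats.effect.{IO, Ref, Resource}

// Hypothetical helper (not a Cats-Effect API): one underlying value shared by
// several consumers; `release` runs only when the last reference is dropped.
final class RefCounted[A] private (value: A, release: IO[Unit], count: Ref[IO, Int]) {

  // Takes one more reference; fails if the value was already released
  def acquire: IO[A] =
    count.modify[IO[A]] { n =>
      if (n > 0) (n + 1, IO.pure(value))
      else (n, IO.raiseError(new IllegalStateException("already released")))
    }.flatten

  // Drops one reference; whoever drops the last one runs `release`
  def drop: IO[Unit] =
    count.modify[IO[Unit]] { n =>
      if (n == 1) (0, release)
      else if (n > 1) (n - 1, IO.unit)
      else (n, IO.unit) // already released: nothing to do
    }.flatten.uncancelable

  // A consumer's reference as a Resource, so it is dropped even on errors or cancelation
  def lease: Resource[IO, A] =
    Resource.make(acquire)(_ => drop)
}

object RefCounted {
  // The creator starts out holding the first reference
  def apply[A](value: A, release: IO[Unit]): IO[RefCounted[A]] =
    IO.ref(1).map(new RefCounted(value, release, _))
}
```

After the count reaches zero, later acquire calls fail rather than resurrecting a released resource. That is one design choice among several.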
I want to contribute them somewhere. Monix's execution/catnap subprojects have been a natural destination, but I no longer want to work on Cats-Effect utils that may get superseded by implementations in Cats-Effect proper. I'd like to contribute something like this to Cats-Effect. In particular, I really feel the need for SingleAssignCancelable.
Do you think these would be useful in std? I already have a working implementation of SingleAssignCancelable, which I use in our project at $work, to go alongside that Resource[F, F[A]] utility. Would you like to see it as a PoC PR?
At a glance:
- SingleAssignCancelable sounds like Resource#memoize, see also https://github.com/typelevel/cats-effect/issues/3513.
- CompositeCancelable sounds like Resource#product or *> or any other of the usual combinators, are those not suitable?
- RefCountCancelable sounds like something I discussed with @TimWSpence once, and we concluded it didn't make much sense. Discord discussion is here. However, it did give me the idea for https://github.com/typelevel/cats-effect/issues/3376.
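The CompositeCancelable bullet has the static case in mind. There, composing resources with the usual combinators already gives all-at-once disposal. The small sketch below shows two resources composed in a for-comprehension being released together, in reverse order of acquisition. The logged helper is hypothetical and exists only for illustration.

```scala
import cats.effect.{IO, Ref, Resource}

object StaticComposition {
  // Hypothetical helper: a resource that records its acquisition and release
  def logged(log: Ref[IO, List[String]], name: String): Resource[IO, String] =
    Resource.make(log.update(_ :+ s"acquire $name").as(name))(_ =>
      log.update(_ :+ s"release $name")
    )

  // The set of resources is fixed when the code is written; closing the
  // composed Resource disposes of both, in reverse acquisition order
  def program: IO[List[String]] =
    for {
      log <- IO.ref(List.empty[String])
      both = for {
        a <- logged(log, "a")
        b <- logged(log, "b")
      } yield (a, b)
      _ <- both.use(_ => IO.unit)
      entries <- log.get
    } yield entries
}
```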
SingleAssignCancelable isn't necessarily about Resource[F, F[A]], but indeed, it's the use-case I needed it for.
I hadn't noticed Resource#memoize for some reason 🙂, although I now wonder whether it has the right semantics, since initialization (being just a side effect) isn't the use-case I want it for. It probably works well, though. The primary use-case for me is to not block the main thread on the initialization of a resource (which may even contain retry logic in case of exceptions), which is why Resource[F, F[A]] is the far more natural signature.
Note that to make it work, the implementation first keeps a List[CancelToken] state, relying on Cats-Effect's IO.uncancelable semantics to (possibly) prevent race conditions on cancelation (release.get.flatMap(_.foldMapM(_(exit)))), although that looks risky nonetheless. And then IO#memoize itself has quite the internal state machine, which isn't something normal folks should have to manage:
```scala
final case class Unevaluated[F[_], E, A]() extends Memoize[F, E, A]
final case class Evaluating[F[_], E, A](
    fiber: Deferred[F, Fiber[F, E, A]],
    subscribers: Long
) extends Memoize[F, E, A]
final case class Finished[F[_], E, A](result: Either[E, F[A]]) extends Memoize[F, E, A]
```
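Seen from the outside, all of that machinery boils down to an effect that runs at most once, no matter how many consumers evaluate it concurrently. Below is a minimal illustration that uses GenConcurrent's memoize through Concurrent[IO]. The MemoizeDemo object is only a wrapper for the example.

```scala
import cats.effect.{Concurrent, IO}
import cats.syntax.all._

object MemoizeDemo {
  // Five concurrent consumers share a single run of the underlying effect
  def program: IO[(Int, List[String])] =
    for {
      runs <- IO.ref(0)
      expensive = runs.update(_ + 1).as("initialized")
      memo <- Concurrent[IO].memoize(expensive)
      results <- List.fill(5)(memo).parSequence
      n <- runs.get
    } yield (n, results)
}
```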
Providing a SingleAssignCancelable gives normal folks the tools to build such logic by themselves. My logic looks like this; note that I'm not using IO#memoize or state kept in a Ref, just plain old IO#start:
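```scala
import cats.effect.{IO, Resource}
import cats.effect.kernel.Poll
import cats.syntax.all._
import java.util.concurrent.CancellationException

def nonBlockingResource[A](builder: Poll[IO] => IO[(A, IO[Unit])]): Resource[IO, IO[A]] =
  Resource.apply {
    // If `awaitInitialization` would be `true`, then on `cancel` this
    // reference would wait for assignment to happen, but that would
    // not work well due to the race condition of the fiber being canceled.
    SingleAssignCancelable(awaitInitialization = false).flatMap { cancelableRef =>
      // Runs masked: only the parts that `builder` wraps in `poll` are cancelable,
      // so once `builder` returns, the token is always registered
      val acquisitionTask = IO.uncancelable { poll =>
        builder(poll).flatMap { case (a, cancelToken) =>
          cancelableRef.set(cancelToken).as(a)
        }
      }
      // Acquire on a separate fiber, so that the caller isn't blocked
      acquisitionTask.start.map { fiber =>
        // Release: first cancel the acquisition (which waits for the fiber
        // to terminate), then run the token, if one was set
        val resCancelDuo = List(
          fiber.cancel,
          cancelableRef.cancel,
        )
        // Consumers wait for the acquired value; if the fiber was canceled,
        // they get a CancellationException instead
        val join = fiber.joinWith(
          IO.raiseError(new CancellationException)
        )
        (join, resCancelDuo.sequence_)
      }
    }
  }
```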
Note that I may have made mistakes above, but that would only further highlight the need for lower-level, yet safer, tools for building logic like this.
> The primary use-case for me is to not block the main thread on the initialization of a resource (which may even contain retry logic in case of exceptions), which is why Resource[F, F[A]] is the far more natural signature.
You may also be interested in Resource#background then, which acquires a resource in the background. Unfortunately these methods have rather hairy type signatures due to their highly general definitions on GenConcurrent, which is precisely why I opened https://github.com/typelevel/cats-effect/issues/3513. That's something we can definitely fix 🙂
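For comparison, the non-blocking Resource[F, F[A]] shape can be sketched with existing primitives alone. A Deferred publishes the result, and IO#background acquires on a separate fiber, which is canceled (releasing the resource) when the outer scope closes. This is a hedged sketch that assumes Cats-Effect 3 with IO. The NonBlocking.nonBlocking name is hypothetical. It also takes a Resource[IO, A] rather than the F[(A, F[Unit])] builder from the first post; Resource.apply converts the latter into the former.

```scala
import cats.effect.{Deferred, IO, Resource}

object NonBlocking {
  // Hypothetical helper: acquires `res` on a background fiber and immediately
  // returns an effect that waits for the acquired value (or the acquisition error)
  def nonBlocking[A](res: Resource[IO, A]): Resource[IO, IO[A]] =
    Resource.eval(Deferred[IO, Either[Throwable, A]]).flatMap { result =>
      val holder: IO[Unit] =
        res
          // Publish the value, then keep the resource open until this fiber is canceled
          .use(a => result.complete(Right(a)) *> IO.never[Unit])
          // Acquisition failed: publish the error instead
          .handleErrorWith(e => result.complete(Left(e)).void)
      // Closing the outer scope cancels `holder`, which runs the finalizer of `res`
      holder.background.map(_ => result.get.flatMap(IO.fromEither(_)))
    }
}
```

One caveat of this design is that the returned IO should only be evaluated while the outer Resource is open. If the scope closes before acquisition finishes, the Deferred is never completed, and a late get would wait forever.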
> Providing a SingleAssignCancelable gives normal folks the tools to build such logic by themselves.
Sounds interesting, but I haven't fully grokked it yet 🤔
On SingleAssignCancelable, I have something like this in my code:
```scala
import cats.effect.IO
import cats.effect.kernel.Deferred
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

final class SingleAssignCancelable private (
    ref: AtomicReference[SingleAssignCancelable.State],
    awaitInitialization: Boolean,
) {
  import SingleAssignCancelable.State

  def set(token: IO[Unit]): IO[Unit] = {
    @tailrec
    def loop(deferred: Option[Deferred[IO, Unit]]): IO[Unit] =
      ref.get() match {
        case current @ State.Uninitialized =>
          if (ref.compareAndSet(current, State.Active(token)))
            IO.unit
          else
            loop(deferred)
        case State.AwaitCancellation(p) =>
          // `cancel` was already requested, so run the token right away
          triggerCancel(token, p)
        case State.Active(_) | State.Cancelled =>
          // NOTE: Seeing `Cancelled` means that this reference was already set;
          // this is a protocol violation in both cases.
          IO.raiseError(
            new IllegalStateException("SingleAssignCancelable is already set")
          )
      }
    IO.defer(loop(None))
  }

  def cancel: IO[Unit] = {
    @tailrec
    def loop(deferred: Option[Deferred[IO, Unit]]): IO[Unit] =
      ref.get() match {
        case current @ State.Uninitialized =>
          val p = deferred.getOrElse(Deferred.unsafe[IO, Unit])
          if (ref.compareAndSet(current, State.AwaitCancellation(p))) {
            if (awaitInitialization)
              p.get
            else
              IO.unit
          } else {
            loop(Some(p))
          }
        case current @ State.Active(cancel) =>
          val p = deferred.getOrElse(Deferred.unsafe[IO, Unit])
          if (ref.compareAndSet(current, State.AwaitCancellation(p)))
            triggerCancel(cancel, p)
          else
            loop(Some(p))
        case State.AwaitCancellation(p) =>
          if (awaitInitialization)
            p.get
          else
            IO.unit
        case State.Cancelled =>
          IO.unit
      }
    IO.defer(loop(None))
  }

  private def triggerCancel(cancel: IO[Unit], p: Deferred[IO, Unit]): IO[Unit] =
    // First starts the cancellation process, asynchronously,
    // then waits for it to complete indirectly, via the promise;
    // this ensures that the cancellation goes through,
    // while the client can still unsubscribe from the shared Deferred.
    cancel
      .flatMap(_ => p.complete(()).void)
      // Optimization, as there's no longer a reason to keep the Deferred around
      .flatMap(_ => IO(ref.set(State.Cancelled)))
      // We need atomicity for the above flatMaps, so mask them before forking
      // (masking only the `start` call would leave the forked fiber cancelable)
      .uncancelable
      // Start and forget
      .start
      // Wait on the promise (gives listeners a chance to unsubscribe)
      .flatMap(_ => p.get)
}

object SingleAssignCancelable {
  /**
   * @param awaitInitialization if `true`, then evaluating the `IO` returned by
   *   [[SingleAssignCancelable.cancel]] will also wait for the initialization
   *   of the resource to complete (i.e., for [[SingleAssignCancelable.set]]
   *   to be called). This is useful to ensure that any underlying resource
   *   gets cancelled before the evaluated `IO` returns; otherwise the
   *   resource might be leaked.
   */
  def apply(awaitInitialization: Boolean): IO[SingleAssignCancelable] =
    IO(unsafe(awaitInitialization))

  def unsafe(awaitInitialization: Boolean): SingleAssignCancelable =
    new SingleAssignCancelable(
      new AtomicReference(State.Uninitialized),
      awaitInitialization,
    )

  private sealed trait State extends Product with Serializable
  private object State {
    final case object Uninitialized extends State
    final case class Active(cancel: IO[Unit]) extends State
    final case class AwaitCancellation(p: Deferred[IO, Unit]) extends State
    final case object Cancelled extends State
  }
}
```
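For comparison, a similar state machine can be written on top of Ref.modify, which removes the manual compareAndSet loops and @tailrec. This is a simplified, hypothetical sketch; the RefSingleAssign name is made up. It covers only the awaitInitialization = false behavior. Unlike the Deferred-based version above, a second concurrent cancel does not wait for the first one's token to finish.

```scala
import cats.effect.{IO, Ref}

// Hypothetical, simplified variant (awaitInitialization = false semantics only)
final class RefSingleAssign private (state: Ref[IO, RefSingleAssign.State]) {
  import RefSingleAssign.State._

  // Stores the cancel token; if cancel was already requested, runs it right away
  def set(token: IO[Unit]): IO[Unit] =
    state.modify[IO[Unit]] {
      case Uninitialized   => (Active(token), IO.unit)
      case CancelRequested => (Cancelled, token)
      case s @ (Active(_) | Cancelled) =>
        (s, IO.raiseError(new IllegalStateException("already set")))
    }.flatten.uncancelable

  // Runs the stored token at most once; before `set`, only records the request
  def cancel: IO[Unit] =
    state.modify[IO[Unit]] {
      case Uninitialized => (CancelRequested, IO.unit)
      case Active(token) => (Cancelled, token)
      case s             => (s, IO.unit) // CancelRequested or Cancelled: no-op
    }.flatten.uncancelable
}

object RefSingleAssign {
  sealed trait State
  object State {
    case object Uninitialized extends State
    final case class Active(token: IO[Unit]) extends State
    case object CancelRequested extends State
    case object Cancelled extends State
  }

  def apply(): IO[RefSingleAssign] =
    IO.ref[State](State.Uninitialized).map(new RefSingleAssign(_))
}
```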
@armanbilge
> CompositeCancelable sounds like Resource#product or *> or any other of the usual combinators, are those not suitable?
A CompositeCancelable would be more dynamic.
As mentioned, I'm thinking of something like a Using.Manager. Does Resource have anything like it, by any chance? It would be really cool to clean up some of our initialization logic in main.
```scala
class ResourceManager[F[_]](...) {
  def use[A](res: Resource[F, A]): F[A]
}
```
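Here is a minimal sketch of what such a dynamic manager could look like, specialized to IO. The ResourceManager below is a hypothetical implementation of the outline above, not an existing Cats-Effect API. Resources can be registered at any point while the manager is open. All of them are released together, most recent first, when the manager's own Resource scope closes.

```scala
import cats.effect.{IO, Ref, Resource}

// Hypothetical Using.Manager-style helper (not a Cats-Effect API)
final class ResourceManager private (finalizers: Ref[IO, List[IO[Unit]]]) {
  // Acquires `res` now and registers its finalizer with the manager;
  // the registration step is masked, so a finalizer cannot get lost
  def use[A](res: Resource[IO, A]): IO[A] =
    IO.uncancelable { poll =>
      poll(res.allocated).flatMap { case (a, release) =>
        finalizers.update(release :: _).as(a)
      }
    }
}

object ResourceManager {
  def apply(): Resource[IO, ResourceManager] =
    Resource
      .make(IO.ref(List.empty[IO[Unit]])) { ref =>
        // Most recently registered finalizer runs first; in this sketch a
        // failing finalizer skips the remaining ones
        ref.getAndSet(Nil).flatMap(fs => fs.foldLeft(IO.unit)(_ *> _))
      }
      .map(new ResourceManager(_))
}
```

A fuller implementation would also have to decide how to aggregate errors from several failing finalizers.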
> A CompositeCancelable would be more dynamic.
Ah, thanks for that clarification! That looks like my proposal in https://github.com/typelevel/cats-effect/issues/3376, I think.
> You may also be interested in Resource#background then, which acquires a resource in the background. Unfortunately these methods have rather hairy type signatures
The signature of Resource#background is so hairy that my barber ran away screaming.
Still sorting through some of this, but there's a bit of prior thinking on this which dates all the way back to pre-CE3 days. #1345