cats-effect icon indicating copy to clipboard operation
cats-effect copied to clipboard

Cancelables

Open alexandru opened this issue 2 years ago • 8 comments

Hi all,

Hotswap is a useful feature, but it's only one use-case of a set of patterns involving resource allocation and handling.

In particular, Hotswap is an implementation (that may be too high-level) of a SerialDisposable from RxJava, or SerialCancelable from Monix. And there are other patterns that may be useful, and I can think of these …

SingleAssignCancelable, which is a cancelable reference that can only be set once. This is super useful in case you want to refer to a cancelable token of a resource that wasn't initialized yet. With it, you can build, for example, a Resource builder that does not wait for the acquisition of the underlying resource. E.g., you can easily build something like:

F[(A, F[Unit])] => Resource[F, F[A]]

(and I'd also propose the inclusion of something like this in Cats-Effect)

CompositeCancelable, which allows for composing multiple resources that can be disposed all at once. For example, I feel the need of getting rid of Resource chains from my main initialization logic, something similar to Scala's Using.Manager.

RefCountCancelable which allows for the sharing of a resource, only disposing it once all consumers are done with it.

I want to contribute them somewhere. Monix's execution/catnap subprojects have been a natural destination, but I no longer want to work on Cats-Effect utils that may get superseded by implementations in Cats-Effect proper. I'd like to contribute something like this to Cats-Effect. In particular, I really feel the need for SingleAssignCancelable.

Do you think these would be useful in std? I already have a working implementation, which I use in our project at $work, for SingleAssignCancelable, to go alongside that Resource[F, F[A]] utility. Would you like to see it on a PoC PR?

alexandru avatar Apr 11 '23 05:04 alexandru

At a glance:

  • SingleAssignCancelable sounds like Resource#memoize, see also https://github.com/typelevel/cats-effect/issues/3513.

  • CompositeCancelable sounds like Resource#product or *> or any other of the usual combinators, are those not suitable?

  • RefCountCancelable sounds like something I discussed with @TimWSpence once and we concluded it didn't make much sense. Discord discussion is here. However it did give me the idea for https://github.com/typelevel/cats-effect/issues/3376.

armanbilge avatar Apr 11 '23 05:04 armanbilge

SingleAssignCancelable isn't necessarily about Resource[F, F[A]], but indeed, it's the use-case I needed it for.

I haven't noticed Resource#memoize for some reason 🙂 although now I wonder if it has the correct semantics, since initialization isn't the use-case I want it for, being just a side effect. It probably works well, though. The primary use-case for me is to not block the main thread on the initialization of a resource (which may even contain retry logic in case of exceptions), which is why Resource[F, F[A]] is the far more natural signature.

Note that to make it work, you're first keeping a List[CancelToken] state, with Cats-Effect's semantics on IO.uncancelable possibly preventing race conditions on cancelation (release.get.flatMap(_.foldMapM(_(exit)))), although that looks risky nonetheless. And then IO#memoize itself has quite the internal state machine, and this isn't for normal folks to manage:

    final case class Unevaluated[F[_], E, A]() extends Memoize[F, E, A]
    final case class Evaluating[F[_], E, A](
        fiber: Deferred[F, Fiber[F, E, A]],
        subscribers: Long
    ) extends Memoize[F, E, A]
    final case class Finished[F[_], E, A](result: Either[E, F[A]]) extends Memoize[F, E, A]

Providing a SingleAssignCancelable gives normal folks the tools to build such logic by themselves. My logic looks like this, and note how I'm not using IO#memoize, or my state kept in a Ref, just plain old IO#start:

def nonBlockingResource[A](builder: Poll[IO] => IO[(A, IO[Unit])]): Resource[IO, IO[A]] =
  Resource.apply {
    // If `awaitInitialization` would be `true`, then on `cancel` this 
    // reference would wait for assignment to happen, but that would
    // not work well due to the race condition of the fiber being canceled. 
    SingleAssignCancelable(awaitInitialization = false).flatMap { cancelableRef =>
      val acquisitionTask = IO.uncancelable { poll =>
        builder(poll).flatMap {
          case (a, cancelToken) =>
            cancelableRef.set(cancelToken).as(a)
        }
      }
      acquisitionTask.start.map { fiber =>
        val resCancelDuo = List(
          fiber.cancel,
          cancelableRef.cancel,
        )
        val join = fiber.joinWith(
          IO.raiseError(new CancellationException)
        )
        (join, resCancelDuo.sequence_)
      }
    }
  }

Note, I may have made mistakes above, but that would only highlight even more the necessity to provide lower-level, but safer tools for building logic like this.

alexandru avatar Apr 11 '23 06:04 alexandru

The primary use-case for me is to not block the main thread on the initialization of a resource (which may even contain retry logic in case of exceptions), which is why Resource[F, F[A]] is the far more natural signature.

You may also be interested in Resource#background then, which acquires a resource in the background. Unfortunately these methods have rather hairy type signatures due to their highly general definitions on GenConcurrent, which is precisely why I opened https://github.com/typelevel/cats-effect/issues/3513. That's something we can definitely fix 🙂


Providing a SingleAssignCancelable gives normal folks the tools to build such logic by themselves.

Sounds interesting, but I haven't fully grokked it yet exactly 🤔

armanbilge avatar Apr 11 '23 06:04 armanbilge

On SingleAssignCancelable, I have something like this in my code:

import cats.effect.IO
import cats.effect.kernel.Deferred
import java.util.concurrent.atomic.AtomicReference
import scala.annotation.tailrec

final class SingleAssignCancelable private (
  ref: AtomicReference[SingleAssignCancelable.State],
  awaitInitialization: Boolean,
) {
  import SingleAssignCancelable.State
  
  def set(token: IO[Unit]): IO[Unit] = {
    @tailrec
    def loop(deferred: Option[Deferred[IO, Unit]]): IO[Unit] =
      ref.get() match {
        case current @ State.Uninitialized =>
          if (ref.compareAndSet(current, State.Active(token)))
            IO.unit
          else
            loop(deferred)

        case State.AwaitCancellation(p) =>
          triggerCancel(token, p)

        case State.Active(_) | State.Cancelled =>
          // NOTE: Seeing `Cancelled` means that this reference was already set;
          // This is a protocol violation in both cases.
          IO.raiseError(
            new IllegalStateException("SingleAssignCancelable is already set")
          )
      }
    IO.defer(loop(None))
  }

  def cancel: IO[Unit] = {
    @tailrec
    def loop(deferred: Option[Deferred[IO, Unit]]): IO[Unit] =
      ref.get() match {
        case current @ State.Uninitialized =>
          val p = deferred.getOrElse(Deferred.unsafe[IO, Unit])
          if (ref.compareAndSet(current, State.AwaitCancellation(p))) {
            if (awaitInitialization)
              p.get
            else
              IO.unit
          } else {
            loop(Some(p))
          }
        case current @ State.Active(cancel) =>
          val p = deferred.getOrElse(Deferred.unsafe[IO, Unit])
          if (ref.compareAndSet(current, State.AwaitCancellation(p)))
            triggerCancel(cancel, p)
          else
            loop(Some(p))

        case State.AwaitCancellation(p) =>
          if (awaitInitialization)
            p.get
          else
            IO.unit

        case State.Cancelled =>
          IO.unit
      }
    IO.defer(loop(None))
  }

  private def triggerCancel(cancel: IO[Unit], p: Deferred[IO, Unit]): IO[Unit] = {
    // First starts the cancellation process, asynchronously,
    // then waits for it to complete indirectly, via the promise;
    // This is to ensure that the cancellation goes through,
    // but the client can still unsubscribe from the shared deferred;
    cancel
      .flatMap(_ => p.complete(()).void)
      // Optimization, as there's no longer a reason to keep the Deferred around
      .flatMap(_ => IO(ref.set(State.Cancelled)))
      // Start and forget
      .start
      // We need atomicity for the above flatMaps
      .uncancelable
      // Wait on the promise (gives a chance to listeners to unsubscribe)
      .flatMap(_ => p.get)
  }
}

object SingleAssignCancelable {
  /**
    * @param awaitInitialization if `true`, then evaluating the returned `IO`
    *        will also wait for the initialization of the resource to complete
    *        (i.e., for [[SingleAssignCancelable.set]] to be called). This is
    *        useful to ensure that any underlying resource gets cancelled
    *        before the evaluated `IO` returns; otherwise the resource might
    *        be leaked.
    */
  def apply(awaitInitialization: Boolean): IO[SingleAssignCancelable] =
    IO(unsafe(awaitInitialization))

  def unsafe(awaitInitialization: Boolean): SingleAssignCancelable =
    new SingleAssignCancelable(
      new AtomicReference(State.Uninitialized),
      awaitInitialization,
    )

  private sealed trait State extends Product with Serializable
  private object State {
    final case object Uninitialized extends State
    final case class Active(cancel: IO[Unit]) extends State
    final case class AwaitCancellation(p: Deferred[IO, Unit]) extends State
    final case object Cancelled extends State
  }
}

alexandru avatar Apr 11 '23 06:04 alexandru

@armanbilge

CompositeCancelable sounds like Resource#product or *> or any other of the usual combinators, are those not suitable?

A CompositeCancelable would be more dynamic.

As mentioned, I'm thinking of something like a Using.Manager. Does Resource have anything like it, by any chance? It would be really cool to clean up some of our initialization logic in main.

class ResourceManager[F[_]](...) {

  def use[A](res: Resource[F, A]): F[A]
}

alexandru avatar Apr 11 '23 08:04 alexandru

A CompositeCancelable would be more dynamic.

Ah, thanks for that clarification! That looks like my proposal in https://github.com/typelevel/cats-effect/issues/3376 I think.

armanbilge avatar Apr 11 '23 08:04 armanbilge

You may also be interested in Resource#background then, which acquires a resource in the background. Unfortunately these methods have rather hairy type signatures

The one of Resource#background is so hairy, my barber ran away screaming.

Jasper-M avatar Apr 14 '23 10:04 Jasper-M

Still sorting through some of this, but there's a bit of prior thinking on this which dates all the way back to pre-CE3 days. #1345

djspiewak avatar Apr 15 '23 00:04 djspiewak