
Task monad based on `Materializer`

Open • jypma opened this issue 9 months ago • 12 comments

In a Pekko Java project I'm working on, I've been creating a Task class whose abstraction looks something like:

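import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;
import org.apache.pekko.stream.KillSwitch;
import org.apache.pekko.stream.Materializer;
import org.apache.pekko.stream.javadsl.Sink;
import org.apache.pekko.stream.javadsl.Source;

// A description of a computation; nothing runs until run(...) is called.
public abstract class Task<T> {
    public static <T> Task<T> of(Supplier<T> fn) { /* ... */ }
    // The source's KillSwitch lets the running graph be cancelled.
    public static <A, T> Task<T> connect(Source<A, ? extends KillSwitch> source, Sink<A, CompletionStage<T>> sink) { /* ... */ }
    /* ... */
    public abstract TaskControl<T> run(Materializer mat);
}
public class TaskControl<T> {
    public void cancel() { /* ... */ }
    public CompletableFuture<T> future() { /* ... */ }
}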

A Task is a description of something to run, but it also offers a unified cancellation API (across futures and connected RunnableGraphs). It is heavily inspired by ZIO's API. Many combinators have already been written.
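A minimal, JDK-only sketch of the idea described above: a task is just a lazy description, and running it yields a handle with a result future and a `cancel()` method. This is not the code from the eventual PR. `MiniTask` and `MiniControl` are hypothetical names, `run` takes a plain `java.util.concurrent.Executor` instead of a Materializer so the sketch has no Pekko dependency, and cancellation is assumed to be cooperative (a flag checked before each step).

```java
import java.util.concurrent.CancellationException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Function;
import java.util.function.Supplier;

// Hypothetical handle to a running MiniTask: a result future plus a cancel() hook.
final class MiniControl<T> {
    private final CompletableFuture<T> future;
    private final AtomicBoolean cancelled;

    MiniControl(CompletableFuture<T> future, AtomicBoolean cancelled) {
        this.future = future;
        this.cancelled = cancelled;
    }

    // Marks the run as cancelled (remaining steps are skipped) and fails the result.
    void cancel() {
        cancelled.set(true);
        future.completeExceptionally(new CancellationException("task cancelled"));
    }

    CompletableFuture<T> future() { return future; }
}

// Hypothetical lazy description of a computation: nothing happens until run() is called.
abstract class MiniTask<T> {
    // Starts the computation; every step checks `cancelled` before doing work.
    abstract CompletableFuture<T> start(Executor ex, AtomicBoolean cancelled);

    static <T> MiniTask<T> of(Supplier<T> fn) {
        return new MiniTask<T>() {
            @Override
            CompletableFuture<T> start(Executor ex, AtomicBoolean cancelled) {
                return CompletableFuture.supplyAsync(() -> {
                    if (cancelled.get()) throw new CancellationException("task cancelled");
                    return fn.get();
                }, ex);
            }
        };
    }

    // Sequences another task after this one, sharing the same cancellation flag.
    <U> MiniTask<U> flatMap(Function<T, MiniTask<U>> f) {
        MiniTask<T> self = this;
        return new MiniTask<U>() {
            @Override
            CompletableFuture<U> start(Executor ex, AtomicBoolean cancelled) {
                return self.start(ex, cancelled).thenCompose(t -> f.apply(t).start(ex, cancelled));
            }
        };
    }

    <U> MiniTask<U> map(Function<T, U> f) {
        return flatMap(t -> MiniTask.of(() -> f.apply(t)));
    }

    // Only here does any work get scheduled.
    MiniControl<T> run(Executor ex) {
        AtomicBoolean cancelled = new AtomicBoolean(false);
        CompletableFuture<T> result = new CompletableFuture<>();
        start(ex, cancelled).whenComplete((value, error) -> {
            if (error != null) result.completeExceptionally(error);
            else result.complete(value);
        });
        return new MiniControl<>(result, cancelled);
    }
}
```

Composing with `map`/`flatMap` only builds a bigger description; the supplier does not execute until `run(...)` is called, which is what lets a single cancellation handle cover the whole chain.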

Would this be an interesting abstraction to have in Pekko itself? The fact that streams need to be materialized brings them close to a Task monad already, and in my experience having a unified API between running streams and futures is very valuable.

We'd of course add a nice Scala API as well (which should be trivial).

jypma, Mar 16 '25 13:03

If this is a standalone set of code that users can plug into their own code, as opposed to something Pekko would need to wire into lots of places, then it is probably quite valuable.

We might be able to get this into Pekko 1.2.0, which should hopefully be out in a few months.

pjfanning, Mar 16 '25 13:03

Yup, it basically builds on top of Pekko. Once it's in, we might add direct support in, e.g., pekko-http for creating an entity by marshalling a Task, but I think even that can start out external.

My current implementation is built on top of VAVR, but that dependency can be dropped (the implementation abstracts over the future concept anyway). I'll see if I can create a PR.

jypma, Mar 16 '25 14:03

Work is ongoing. While dropping VAVR's Future, I noticed that Scala's Future isn't cancellable, and the effectiveness of cancellation on CompletableFuture is... disputed :) I'm trying out a few alternatives, please stay tuned.
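The CompletableFuture caveat can be reproduced with the JDK alone. The hypothetical `CancelDemo` below cancels a future while its supplier is still running. According to the `CompletableFuture.cancel` Javadoc, `mayInterruptIfRunning` has no effect in that implementation, so the future reports itself as cancelled while the worker thread carries on and finishes its work anyway.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.atomic.AtomicBoolean;

final class CancelDemo {
    // Blocks on a latch, converting the checked InterruptedException.
    private static void await(CountDownLatch latch) {
        try {
            latch.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
    }

    // Returns true if the supplier ran to completion even though its
    // CompletableFuture was cancelled (with mayInterruptIfRunning=true) mid-run.
    static boolean workSurvivesCancel() {
        ExecutorService pool = Executors.newSingleThreadExecutor();
        CountDownLatch started = new CountDownLatch(1);
        CountDownLatch release = new CountDownLatch(1);
        CountDownLatch exited = new CountDownLatch(1);
        AtomicBoolean finished = new AtomicBoolean(false);

        CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> {
            try {
                started.countDown();
                await(release);      // stands in for long-running work
                finished.set(true);  // reached only if the worker was not interrupted
                return "done";
            } finally {
                exited.countDown();
            }
        }, pool);

        await(started);
        // Only the future's state changes; the worker thread is not interrupted.
        boolean cancelled = future.cancel(true);
        release.countDown();         // let the "work" continue
        await(exited);
        pool.shutdown();
        return cancelled && future.isCancelled() && finished.get();
    }
}
```

`workSurvivesCancel()` is expected to return `true`: cancelling a CompletableFuture changes the state seen by its dependents, but it does not stop the computation feeding it.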

jypma, Mar 17 '25 15:03

The Pekko scheduler returns a Cancellable instance when you schedule a task, and you can schedule for immediate execution.

https://github.com/apache/pekko/blob/9b1f82319d830d1ecbc96bd7e8fdf3dda198681b/actor/src/main/scala/org/apache/pekko/actor/Scheduler.scala#L525

Maybe this is something that can be reused here?
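The suggested pattern (schedule with zero delay, keep the returned handle, cancel through it) can be illustrated without Pekko. The sketch below deliberately swaps in the JDK's `ScheduledExecutorService` as a stand-in for Pekko's scheduler: its `schedule(...)` returns a cancellable `ScheduledFuture`, playing the role of Pekko's `Cancellable`. It is an analogy, not Pekko's API, and `ScheduleDemo` is a hypothetical name.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

final class ScheduleDemo {
    // Schedules a task for "immediate" execution, cancels it through the returned
    // handle before it gets a thread, and reports whether it was kept from running.
    static boolean cancelledBeforeRunning() {
        ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
        CountDownLatch release = new CountDownLatch(1);
        AtomicBoolean ran = new AtomicBoolean(false);

        // Occupy the only thread so the next task stays queued.
        scheduler.execute(() -> {
            try {
                release.await();
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        });

        // Zero delay = "immediate"; the returned handle is the cancellation API.
        ScheduledFuture<?> handle = scheduler.schedule(() -> ran.set(true), 0, TimeUnit.MILLISECONDS);
        boolean cancelled = handle.cancel(false); // still pending, so this succeeds

        release.countDown();
        scheduler.shutdown();
        try {
            scheduler.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return cancelled && !ran.get();
    }
}
```

As with any such handle, cancellation only helps while the task is still pending; once it has started running, `cancel(false)` cannot stop it.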

pjfanning, Mar 17 '25 16:03

A first shot at a direction is in https://github.com/apache/pekko/pull/1802. Scheduler cancellation might not be that relevant (yet): the callbacks for Task should really run directly on an Executor, for speed and latency reasons.

But it looks like, for a first attempt, we can do without cancelling actual Executor tasks. Let's see :)
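Running Task callbacks directly on an Executor, without cancelling the Executor's own tasks, amounts to cooperative cancellation. A minimal sketch, assuming one shared flag per running task: every callback is submitted unchanged, and cancellation just turns the not-yet-run callbacks into no-ops. `CancelScope` and `ManualExecutor` are hypothetical names; the latter only exists to make the ordering deterministic.

```java
import java.util.ArrayDeque;
import java.util.Queue;
import java.util.concurrent.Executor;
import java.util.concurrent.atomic.AtomicBoolean;

// Hypothetical cancellation scope whose callbacks run directly on an Executor.
final class CancelScope {
    private final AtomicBoolean cancelled = new AtomicBoolean(false);

    void cancel() { cancelled.set(true); }

    // The executor task is never removed or interrupted; it simply
    // skips the callback if the scope was cancelled in the meantime.
    void submit(Executor executor, Runnable callback) {
        executor.execute(() -> {
            if (!cancelled.get()) callback.run();
        });
    }
}

// Deterministic executor for demonstration: queues tasks until drained by hand.
final class ManualExecutor implements Executor {
    private final Queue<Runnable> queue = new ArrayDeque<>();
    int executed = 0;

    @Override
    public void execute(Runnable task) { queue.add(task); }

    boolean runNext() {
        Runnable task = queue.poll();
        if (task == null) return false;
        executed++;
        task.run();
        return true;
    }
}
```

The trade-off is that a cancelled task still occupies one (empty) executor slot per pending callback, in exchange for never having to track or remove individual Executor tasks.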

jypma, Mar 18 '25 10:03

I have been thinking: maybe it makes sense to release this as part of Pekko 2.x? It might take a while to finish, and if breaking changes are needed to make certain things work (such as resource cancellation), they could be done in Pekko 2.x.

mdedetrich, May 05 '25 14:05

I'm OK with whatever timeline you think fits the project best. On the topic of cancellation (or, more generally, resource safety), we don't necessarily need to wait for streams to fully support it before releasing tasks that define the concept.

We can release tasks with the limited KillSwitch-based stream cancellation for now, while we work out how (and whether) to add a full completion callback to streams.

jypma, May 07 '25 10:05

We are already using the main branch for 1.2 development and have released 1.2.0-M1, so this could easily be part of the final 1.2.0 release.

I'm not sure if we really need to delay this to a 2.0 release.

pjfanning, May 07 '25 11:05

Moving Task into its own task module has hit a bit of a snag. The intent is that users don't provide a Materializer when creating a Task from a RunnableGraph (the Runtime provides it once the task is actually executed). However, Materializer (obviously...) lives in the stream module, so there's a dependency problem. We can use ActorSystem instead of Materializer, which gives us a few options:

  • Plan A: Keep Task in the stream module after all, since the two are fairly intertwined.
  • Plan B: Put Task in the task module, have Runtime provide an ActorSystem, and call SystemMaterializer.apply every time a stream is materialized (in effect a ConcurrentHashMap lookup in a typically very small map).
  • Plan C: Have Runtime maintain its own map of "extensions", this time using a simple non-concurrent reference to an immutable map (a race that creates multiple materializers at startup is acceptable).
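Plan C can be sketched as a copy-on-write cache: a lookup is a single volatile read of an immutable map, and an insert publishes a new map. This is an illustration under assumptions, not the PR's code: `ExtensionCache` is a hypothetical name, extensions are keyed by class, and the unsynchronized write deliberately accepts that racing callers may each create an instance at startup, as Plan C describes. Plan B would instead go through `SystemMaterializer.apply`, i.e. a ConcurrentHashMap lookup, on every materialization.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.Objects;
import java.util.function.Supplier;

// Hypothetical per-runtime cache for "extensions" such as a Materializer (Plan C).
final class ExtensionCache {
    // Immutable snapshot, replaced wholesale on insert (copy-on-write).
    private volatile Map<Class<?>, Object> extensions = Map.of();

    <T> T get(Class<T> key, Supplier<T> create) {
        Object existing = extensions.get(key); // plain read of an immutable map, no locking
        if (existing != null) return key.cast(existing);

        T created = Objects.requireNonNull(create.get(), "extension factory returned null");
        Map<Class<?>, Object> next = new HashMap<>(extensions);
        next.put(key, created);
        // No CAS: racing writers may overwrite each other's snapshot, which only
        // means an extension can be created more than once during startup.
        extensions = Map.copyOf(next);
        return created;
    }
}
```

Once the map is populated, every lookup is lock-free and allocation-free, which is the property Plan C is after; the cost is possible duplicate creation during a startup race.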

Would Plan B have a measurable performance impact? I suppose if it turns out it does, we can slip in Plan C after the fact.

What do others think here?

jypma, May 22 '25 06:05

I'm happy to treat this as an API that is documented as experimental and allow for later changes.

In my mind, it might be better not to add this to Pekko core immediately, but to have it as a 3rd-party lib. 3rd-party libs can be released on a whim and can iterate quickly. This repo has loads of jars and a stable code base. There is a reluctance to modify any APIs unless the changes are deferred to as-yet-unplanned major releases, and changes can get delayed by other incomplete work in progress. We could add any plumbing that this code needs to Pekko core, and when this code becomes more stable, we could then move it into Pekko core.

pjfanning, May 26 '25 20:05

@pjfanning Thanks for the clarification. However, I don't fully understand the concrete way forward you're suggesting. Should I close the PR and keep this only as a separate library in a separate repository? I don't expect I can give that the needed visibility on my own.

It would also be a lot less comfortable to use, since Flow, Source and friends would lack first-class Task operators. This is a particular challenge for Java code bases, which are precisely what I envisioned for this change: there really isn't a "nice" Java-friendly effect system out there. (Scala is less of an issue, since implicits can glue anything together.)

For this to be nice to use, we need to be able to (in Java) write things like

Source.from(someStuff)
   .mapTask(elem -> constructTask(elem))
   .mapTask(res -> Task.connect(someOtherSource(res), someOtherSink));

which is impossible with the current Pekko API (since, besides the type conversions, a Materializer is needed to run the nested graphs).

I'd prefer to keep the work open as an actual fork/PR of the main repo, with frequent rebases. We can certainly delay an actual release (or even plan of that) until a later time.

jypma, May 27 '25 11:05

#1802 includes no changes to the existing Pekko Flow or Source APIs and could easily be delivered as a 3rd party standalone lib.

pjfanning, May 27 '25 12:05