
Introduce Task monad

Open • jypma opened this issue 1 year ago • 4 comments

Relates to #1801

This commit introduces Task, a data structure that represents a recipe, or program, for producing a value of type T (or failing with an exception). Its semantics are similar to RunnableGraph[T], but it is intended as a first-class building block.

It has the following properties:

  • A task can have resources associated with it, which are guaranteed to be released if the task fails or is cancelled
  • Tasks can be forked, so that several of them can run concurrently
  • Such forked tasks can be cancelled
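To make the "recipe" idea and the resource guarantee concrete, here is a minimal, synchronous sketch in plain Java. `SketchTask`, `of`, `map`, `flatMap`, `bracket` and `run` are hypothetical names chosen for illustration, not the API introduced in this PR. Cancellation is only approximated: an interruption that surfaces as an exception triggers the same `finally` release as any other failure.

```java
import java.util.concurrent.Callable;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical sketch, not the PR's Task API.
// A SketchTask<T> is only a recipe: nothing happens until run() is called.
final class SketchTask<T> {
    private final Callable<T> recipe;

    private SketchTask(Callable<T> recipe) { this.recipe = recipe; }

    // Wrap a lambda; it is not executed here.
    static <A> SketchTask<A> of(Callable<A> recipe) { return new SketchTask<A>(recipe); }

    // Transform the result once the recipe has produced it.
    <U> SketchTask<U> map(Function<? super T, ? extends U> f) {
        return new SketchTask<U>(() -> f.apply(recipe.call()));
    }

    // Sequence a second task that depends on the first task's result.
    <U> SketchTask<U> flatMap(Function<? super T, SketchTask<U>> f) {
        return new SketchTask<U>(() -> f.apply(recipe.call()).run());
    }

    // Acquire a resource, use it, and always release it once acquired,
    // whether use succeeds or throws (including an InterruptedException).
    // If acquire itself fails, there is nothing to release.
    static <R, A> SketchTask<A> bracket(SketchTask<R> acquire,
                                        Function<? super R, SketchTask<A>> use,
                                        Consumer<? super R> release) {
        return new SketchTask<A>(() -> {
            R resource = acquire.run();
            try {
                return use.apply(resource).run();
            } finally {
                release.accept(resource);
            }
        });
    }

    // Execute the recipe; every call runs it again from scratch.
    T run() throws Exception { return recipe.call(); }
}
```

Because `run()` re-executes the recipe on every call, the same task value can be run, retried or forked repeatedly. That laziness is what distinguishes it from a `CompletableFuture`, which represents a computation that has already started.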

A Task can be created from a RunnableGraph that has a KillSwitch, by connecting a Source and a Sink through a KillSwitch, or directly from lambda functions.

Open discussion points and TODOs (in order of highest architectural impact):

  • [X] Cancelling a graph through a KillSwitch currently gives no signal when the graph has actually finished cancelling. However, since we also have the Future[T] that the graph's sink materializes to, that is enough to synchronize on.
  • [X] Resource safety, which can then guarantee cleanup even under cancellation signals.
  • [X] Background processing with cancellation (fibers, forkDaemon, forkResource). No child fibers (yet).
  • [X] Add many more combinators (plain zip, andThen, before, raceAll)
  • [x] Add more combinators (asResource, ...)
  • [x] Add .delay() and general scheduling by hooking in the scheduling features of Materializer
  • [x] Sink.forEachTask and Flow.mapTask
  • [x] Resource.ofAutoCloseable
  • [x] Task.never()
  • [ ] Task.runAsMain (or perhaps Application base classes / traits), which runs a task as main(), capturing Ctrl+C to interrupt the task (and, hence, clean up resources).
  • [ ] Move to a separate task module that doesn't depend on stream. Streams should depend on task instead, so that tasks can be used directly in stream combinators.
  • [ ] Scala DSL
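The forking and cancellation items above (fibers, forkDaemon) can be illustrated with a small, hypothetical `SketchFiber` handle built on `java.util.concurrent`. It swaps in an `ExecutorService` and `Future.cancel(true)` (thread interruption) for whatever mechanism the PR actually uses, so treat it as a sketch of the semantics only: a forked task runs in the background, can be joined, and can be cancelled, with its `finally` blocks still running on cancellation.

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;

// Hypothetical handle to a task running in the background (a "fiber").
final class SketchFiber<T> {
    private final Future<T> running;

    private SketchFiber(Future<T> running) { this.running = running; }

    // Start the recipe on the executor and return immediately.
    static <T> SketchFiber<T> fork(ExecutorService executor, Callable<T> recipe) {
        return new SketchFiber<T>(executor.submit(recipe));
    }

    // Block until the background task produces its result (or fails).
    T join() throws InterruptedException, ExecutionException {
        return running.get();
    }

    // Request cancellation by interrupting the worker thread.
    // Returns false if the task had already completed.
    boolean cancel() { return running.cancel(true); }

    boolean isCancelled() { return running.isCancelled(); }
}
```

Interruption is cooperative here: a task blocked in a non-interruptible call keeps running until that call returns, which is one reason a dedicated Task runtime may prefer its own cancellation signal.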

Needing an eye from reviewers:

  • [ ] More test cases to exercise typical edge cases and concurrency race conditions

To be handled in later pull requests:

  • TestClock (implementation of Clock for unit tests, where tests can advance the clock, so time-driven code can run deterministically and faster than realtime)
  • Fun Task-friendly concurrency primitives like a queue, mutex, and a mutable ref with stream subscription support.
  • (Potentially) another monad, Effect[E,A], which introduces modelled errors in addition to exceptions. This could easily be built on top of a TaskDef[Either[E,A]].
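The Effect[E,A] idea from the last bullet can be sketched in Java as a task that produces an `Either`. Java has no built-in `Either`, so a minimal one is defined here; both `Either` and `Effect` are hypothetical illustrations, and a plain `Callable` stands in for `TaskDef`. A `Left` carries a modelled error and short-circuits `flatMap`, while thrown exceptions keep propagating as before.

```java
import java.util.concurrent.Callable;
import java.util.function.Function;

// Hypothetical minimal Either: holds a Left (modelled error) or a Right (value).
final class Either<E, A> {
    private final E left;
    private final A right;
    private final boolean isRight;

    private Either(E left, A right, boolean isRight) {
        this.left = left;
        this.right = right;
        this.isRight = isRight;
    }

    static <E, A> Either<E, A> left(E error) { return new Either<E, A>(error, null, false); }
    static <E, A> Either<E, A> right(A value) { return new Either<E, A>(null, value, true); }

    boolean isRight() { return isRight; }
    E getLeft() { return left; }
    A get() { return right; }
}

// Hypothetical Effect<E, A>: a task that produces Either<E, A> when run.
final class Effect<E, A> {
    private final Callable<Either<E, A>> task;

    private Effect(Callable<Either<E, A>> task) { this.task = task; }

    static <E, A> Effect<E, A> succeed(A value) { return new Effect<E, A>(() -> Either.right(value)); }
    static <E, A> Effect<E, A> fail(E error) { return new Effect<E, A>(() -> Either.left(error)); }

    // Continue only on Right; a Left skips f and is passed through unchanged.
    <B> Effect<E, B> flatMap(Function<? super A, Effect<E, B>> f) {
        return new Effect<E, B>(() -> {
            Either<E, A> result = task.call();
            if (result.isRight()) {
                return f.apply(result.get()).task.call();
            }
            return Either.left(result.getLeft());
        });
    }

    // Exceptions thrown by the underlying task propagate out of run().
    Either<E, A> run() throws Exception { return task.call(); }
}
```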

Fixes #1801.

jypma, Mar 18 '25

I'm getting closer to a first well-connected base, with resource safety, interruptibility and time handling reasonably in place. I think we should discuss what to do about the interruptibility of graphs, though:

  • Pekko's KillSwitch only has a void method to stop a graph. You can't tell when the graph has actually stopped unless you also have a Sink whose future completes once the graph has fully terminated.
  • Probably as a consequence of this, pekko-connectors-kafka has its own DrainingControl concept, which does return a future that completes when the graph has shut down (even cleanly, after draining).

Is this something to discuss and look at now? I suppose it could be done later, by putting interruptibility into a type class when creating a GraphDef, but it would be nicer to stick to first-class concepts (like KillSwitch).

jypma, Apr 01 '25

@jypma if you think copying some of the pekko-connectors-kafka code into your task package helps, go ahead. Let's keep any additions specific to the task package for now; we can later discuss whether it is worth extracting some of the reusable bits and making them available as public APIs or for use by other Pekko classes.

pjfanning, Apr 01 '25

I've decided to just introduce a RunningGraph interface to represent a graph that will eventually complete (with a Task) and that can be interrupted (with another Task). That covers both KillSwitch (where interruption is immediate, but must be followed by awaiting the sink's completion stage) and DrainingControl (where interruption has a clear "done" signal).
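A minimal sketch of that idea, with `CompletionStage` standing in for Task: `RunningGraphSketch`, `fromKillSwitch` and `fromDrainingStop` are hypothetical names, not the interface in this PR. The KillSwitch adapter calls a void stop operation and then treats the sink's completion stage as the "done" signal; the DrainingControl-style adapter uses the stage returned by the stop operation itself.

```java
import java.util.concurrent.CompletionStage;
import java.util.function.Supplier;

// Hypothetical shape of a running graph, using CompletionStage instead of Task.
interface RunningGraphSketch<T> {
    // Completes when the graph finishes, with the sink's materialized result.
    CompletionStage<T> completion();

    // Requests interruption; the returned stage completes once the graph is actually done.
    CompletionStage<Void> interrupt();

    // KillSwitch style: the stop operation is void and returns immediately,
    // so "done" is derived by waiting on the sink's completion stage.
    static <T> RunningGraphSketch<T> fromKillSwitch(Runnable shutdown, CompletionStage<T> sinkDone) {
        return new RunningGraphSketch<T>() {
            public CompletionStage<T> completion() { return sinkDone; }

            public CompletionStage<Void> interrupt() {
                shutdown.run();
                // A failed sink also means the graph has stopped, so map both outcomes to "done".
                return sinkDone.handle((result, error) -> (Void) null);
            }
        };
    }

    // DrainingControl style: the stop operation itself reports when the graph is done.
    static <T> RunningGraphSketch<T> fromDrainingStop(Supplier<CompletionStage<T>> drainAndShutdown,
                                                      CompletionStage<T> sinkDone) {
        return new RunningGraphSketch<T>() {
            public CompletionStage<T> completion() { return sinkDone; }

            public CompletionStage<Void> interrupt() {
                return drainAndShutdown.get().thenAccept(result -> { });
            }
        };
    }
}
```

With Pekko streams, the KillSwitch adapter could be fed `killSwitch::shutdown` and the sink's materialized `CompletionStage`; for pekko-connectors-kafka, something like `() -> control.drainAndShutdown(executor)`, assuming the javadsl `DrainingControl` signature.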

jypma, Apr 06 '25

@jypma It seems there are some conflicts. I would like to learn more once this is ready.

He-Pin, Sep 22 '25