akka-effect
Cats-Effect & Akka interop
Akka-Effect
This project aims to build a bridge between Akka and purely functional code based on cats-effect.
Covered:
Building blocks
akka-effect-actor module
Tell.scala
Represents ActorRef.tell
trait Tell[F[_], -A] {
def apply(a: A, sender: Option[ActorRef] = None): F[Unit]
}
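Because Tell is contravariant in A, a Tell[F, Any] can be passed wherever a Tell[F, String] is expected. The following is a minimal sketch of a recording test double, not the library's implementation. It assumes a simplified local copy of the trait without the sender parameter and uses scala.util.Try as a stand-in for a cats-effect F[_]; RecordingTell and greet are hypothetical names.

```scala
import scala.collection.mutable.ListBuffer
import scala.util.Try

object TellSketch {
  // Simplified local copy: the trait shown above also takes `sender: Option[ActorRef]`.
  trait Tell[F[_], -A] {
    def apply(a: A): F[Unit]
  }

  // Hypothetical test double: records every message instead of sending it to an actor.
  final class RecordingTell extends Tell[Try, Any] {
    val received: ListBuffer[Any] = ListBuffer.empty
    def apply(a: Any): Try[Unit] = Try { received += a; () }
  }

  // Only needs to send strings, so it asks for the narrower Tell[Try, String].
  def greet(tell: Tell[Try, String]): Try[Unit] = tell("hello")
}
```

Passing a RecordingTell to greet compiles only because of the -A variance annotation; with an invariant A the call would be rejected.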
Ask.scala
Represents the ActorRef.ask pattern
trait Ask[F[_], -A, B] {
def apply(msg: A, timeout: FiniteDuration, sender: Option[ActorRef]): F[B]
}
Reply.scala
Represents the reply pattern: sender() ! reply
trait Reply[F[_], -A] {
def apply(msg: A): F[Unit]
}
Receive.scala
This is what you need to implement instead of the familiar new Actor { ... }
trait Receive[F[_], -A, B] {
def apply(msg: A): F[B]
def timeout: F[B]
}
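As an illustration, a counter that would traditionally live inside an Actor's receive block might look like the sketch below. It is written under stated assumptions: the trait is copied locally so the snippet compiles without akka-effect, scala.util.Try stands in for a cats-effect F[_], and B is taken to be a Boolean meaning "stop the actor". Cmd, Inc, Stop and counter are hypothetical names.

```scala
import scala.util.{Success, Try}

object ReceiveSketch {
  // Local copy of the trait shown above.
  trait Receive[F[_], -A, B] {
    def apply(msg: A): F[B]
    def timeout: F[B]
  }

  sealed trait Cmd
  case object Inc extends Cmd
  case object Stop extends Cmd

  // Assumption: the Boolean result means "should the actor stop?".
  def counter(onChange: Int => Unit): Receive[Try, Cmd, Boolean] =
    new Receive[Try, Cmd, Boolean] {
      // Mutable state is confined to this instance, as it would be in an actor.
      private var count = 0

      def apply(msg: Cmd): Try[Boolean] = Try {
        msg match {
          case Inc =>
            count += 1
            onChange(count)
            false // keep running
          case Stop =>
            true // request stop
        }
      }

      // Called when the receive timeout fires; here we simply ask to stop.
      def timeout: Try[Boolean] = Success(true)
    }
}
```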
ActorOf.scala
Constructs Actor.scala out of receive: ActorCtx[F] => Resource[F, Receive[F, Any]]
ActorCtx.scala
Wraps ActorContext
trait ActorCtx[F[_]] {
def self: ActorRef
def parent: ActorRef
def executor: ExecutionContextExecutor
def setReceiveTimeout(timeout: Duration): F[Unit]
def child(name: String): F[Option[ActorRef]]
def children: F[List[ActorRef]]
def actorRefFactory: ActorRefFactory
def watch[A](actorRef: ActorRef, msg: A): F[Unit]
def unwatch(actorRef: ActorRef): F[Unit]
def stop: F[Unit]
}
akka-effect-persistence module
PersistentActorOf.scala
Constructs PersistentActor.scala out of eventSourcedOf: ActorCtx[F] => F[EventSourced[F, S, E, C]]
EventSourced.scala
Describes the lifecycle of an entity with regard to event sourcing. The phases are: Started, Recovering, Receiving and Termination
trait EventSourced[F[_], S, E, C] {
def eventSourcedId: EventSourcedId
def recovery: Recovery
def pluginIds: PluginIds
def start: Resource[F, RecoveryStarted[F, S, E, C]]
}
RecoveryStarted.scala
Describes the start of the recovery phase
trait RecoveryStarted[F[_], S, E, C] {
def apply(
seqNr: SeqNr,
snapshotOffer: Option[SnapshotOffer[S]]
): Resource[F, Recovering[F, S, E, C]]
}
Recovering.scala
Describes the recovery phase
trait Recovering[F[_], S, E, C] {
def replay: Resource[F, Replay[F, E]]
def completed(
seqNr: SeqNr,
journaller: Journaller[F, E],
snapshotter: Snapshotter[F, S]
): Resource[F, Receive[F, C]]
}
Replay.scala
Used during recovery to replay events
trait Replay[F[_], A] {
def apply(seqNr: SeqNr, event: A): F[Unit]
}
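A Replay typically folds each recovered event into in-memory state. The sketch below shows one possible shape for that, assuming SeqNr is an alias for Long and using scala.util.Try in place of a cats-effect F[_]. The trait is copied locally so the snippet is self-contained; Event, Deposited, Withdrawn and BalanceReplay are hypothetical names.

```scala
import scala.util.Try

object ReplaySketch {
  // Assumption: sequence numbers are plain Longs.
  type SeqNr = Long

  // Local copy of the trait shown above.
  trait Replay[F[_], A] {
    def apply(seqNr: SeqNr, event: A): F[Unit]
  }

  sealed trait Event
  final case class Deposited(amount: Long) extends Event
  final case class Withdrawn(amount: Long) extends Event

  // Rebuilds an account balance by applying events in sequence-number order.
  final class BalanceReplay extends Replay[Try, Event] {
    private var lastSeqNr: SeqNr = 0L
    private var balance: Long = 0L

    def apply(seqNr: SeqNr, event: Event): Try[Unit] = Try {
      // Reject duplicates and out-of-order events before touching the state.
      require(seqNr > lastSeqNr, s"out-of-order event: $seqNr after $lastSeqNr")
      balance = event match {
        case Deposited(amount) => balance + amount
        case Withdrawn(amount) => balance - amount
      }
      lastSeqNr = seqNr
    }

    def current: Long = balance
    def lastApplied: SeqNr = lastSeqNr
  }
}
```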
Journaller.scala
Describes communication with underlying journal
trait Journaller[F[_], -A] {
def append: Append[F, A]
def deleteTo: DeleteEventsTo[F]
}
Snapshotter.scala
Describes communication with underlying snapshot storage
/**
* Describes communication with underlying snapshot storage
*
* @tparam A - snapshot
*/
trait Snapshotter[F[_], -A] {
def save(seqNr: SeqNr, snapshot: A): F[F[Instant]]
def delete(seqNr: SeqNr): F[F[Unit]]
def delete(criteria: SnapshotSelectionCriteria): F[F[Unit]]
}
akka-effect-eventsourced module
Engine.scala
This is the main runtime/queue where all actions against your state are processed in the desired event-sourcing sequence:
- validate and finalize events
- append events to journal
- publish changed state
- execute side effects
It is optimised for maximum throughput, so steps of different actions may be executed in parallel and events may be stored in batches
trait Engine[F[_], S, E] {
def state: F[State[S]]
/**
* @return Outer F[_] is about `load` being enqueued, this immediately provides order guarantees
* Inner F[_] is about `load` being completed
*/
def apply[A](load: F[Validate[F, S, E, A]]): F[F[A]]
}
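The nested F[F[A]] return type separates "accepted into the queue" from "finished". The sketch below illustrates that idea only; it is not how Engine is implemented. It uses scala.concurrent.Future in place of F[_] and a hypothetical SerialQueue that runs tasks strictly one after another, without the validation, batching or parallelism described above.

```scala
import scala.concurrent.{ExecutionContext, Future}

object EnqueueSketch {
  // Hypothetical queue: each task starts only after the previous one has finished.
  final class SerialQueue(implicit ec: ExecutionContext) {
    // Completion of the most recently enqueued task.
    private var tail: Future[Unit] = Future.unit

    // Outer Future: the task has a fixed position in the queue.
    // Inner Future: the task has actually run and produced its result.
    def apply[A](task: () => A): Future[Future[A]] = synchronized {
      val result: Future[A] = tail.map(_ => task())
      // A failed task must not block the tasks queued behind it.
      tail = result.map(_ => ()).recover { case _ => () }
      Future.successful(result)
    }
  }
}
```

A caller that only needs ordering can wait for the outer Future and move on; a caller that needs the result waits for the inner one as well.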
Setup
in project/plugins.sbt
addSbtPlugin("com.evolution" % "sbt-artifactory-plugin" % "0.0.2")
in build.sbt
libraryDependencies += "com.evolutiongaming" %% "akka-effect-actor" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-actor-tests" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-persistence" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-eventsourcing" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-cluster" % "0.2.1"
libraryDependencies += "com.evolutiongaming" %% "akka-effect-cluster-sharding" % "0.2.1"