akka-stream-eventsourcing
Low-level snapshot API
- Stream stage that uses event handler to create and emit state snapshots and metadata at custom intervals
- Initialize EventSourcing stage with state snapshot
- Replay events from user-defined position (taken from snapshot metadata)
In case you find it useful, here is the gist of the API I'm using to control snapshot creation:
/** Controls how a snapshot is stored relative to existing snapshots:
  *  - Epoch: the snapshot is deemed to be the earliest snapshot;
  *    any existing snapshots with a smaller sequence number are discarded.
  *  - Cache: persist the snapshot, keeping all other existing snapshots.
  */
sealed trait SnapshotStoreMode
object SnapshotStoreMode {
  case object Epoch extends SnapshotStoreMode
  case object Cache extends SnapshotStoreMode
}
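To make the two modes concrete, here is a minimal in-memory sketch of what storing a snapshot under each mode could look like. `Stored` and `store` are hypothetical names introduced only for illustration, and replacing an existing snapshot that has the same sequence number is an assumption the description above does not cover.

```scala
sealed trait SnapshotStoreMode
object SnapshotStoreMode {
  case object Epoch extends SnapshotStoreMode
  case object Cache extends SnapshotStoreMode
}

object SnapshotStoreSketch {
  // Hypothetical stored snapshot: a state plus the sequence number it covers.
  final case class Stored[S](sequenceNr: Long, state: S)

  // Returns the snapshots that remain after storing `incoming` under `mode`,
  // ordered by sequence number.
  def store[S](
      existing: Vector[Stored[S]],
      incoming: Stored[S],
      mode: SnapshotStoreMode
  ): Vector[Stored[S]] = {
    val kept = mode match {
      // Epoch: the incoming snapshot becomes the earliest one, so older ones go.
      case SnapshotStoreMode.Epoch =>
        existing.filter(_.sequenceNr > incoming.sequenceNr)
      // Cache: keep everything else (assumption: same sequence number is replaced).
      case SnapshotStoreMode.Cache =>
        existing.filterNot(_.sequenceNr == incoming.sequenceNr)
    }
    (kept :+ incoming).sortBy(_.sequenceNr)
  }
}
```

With snapshots at sequence numbers 10 and 20 already stored, caching one at 30 keeps all three, while storing one at 15 as an epoch drops the snapshot at 10.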
Then the EventSource graph stage would be parametrized with a SnapshotStrategy:
/** Controls when and how snapshots are persisted. I.e., given:
  *  - the number of events stored since the last snapshot,
  *  - the duration since the last snapshot,
  *  - the event being persisted, up to which the snapshot would be created,
  * decide whether a snapshot should be created and how (Some[SnapshotStoreMode]) or not (None).
  *
  * Implementation details: when multiple events are persisted
  * after a single request, this strategy is applied to each event, but if multiple
  * [SnapshotStoreMode.Epoch] are yielded, only the last one is actually acted on.
  * Also, any [SnapshotStoreMode.Cache] directives appearing before a
  * [SnapshotStoreMode.Epoch] are ignored.
  */
type SnapshotStrategy[-E] = (Long, Duration, E) => Option[SnapshotStoreMode]
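As an illustration of how such a strategy might be written and how the batching rule from the comment could be applied, here is a small sketch. It assumes `Duration` is `scala.concurrent.duration.Duration` (the snippet above does not say which one is meant); `everyNEventsOr` and `effective` are hypothetical helper names, not part of any library.

```scala
import scala.concurrent.duration._

sealed trait SnapshotStoreMode
object SnapshotStoreMode {
  case object Epoch extends SnapshotStoreMode
  case object Cache extends SnapshotStoreMode
}

object SnapshotStrategySketch {
  // Assumption: Duration is scala.concurrent.duration.Duration.
  type SnapshotStrategy[-E] = (Long, Duration, E) => Option[SnapshotStoreMode]

  // Hypothetical strategy: cache a snapshot once at least `n` events or
  // `maxAge` have accumulated since the last snapshot; the event is ignored.
  def everyNEventsOr[E](n: Long, maxAge: Duration): SnapshotStrategy[E] =
    (eventsSinceSnapshot, sinceSnapshot, _) =>
      if (eventsSinceSnapshot >= n || sinceSnapshot >= maxAge)
        Some(SnapshotStoreMode.Cache)
      else None

  // Applies the batching rule to the directives yielded for one request,
  // given in event order as (sequenceNr, mode): only the last Epoch is kept,
  // and every directive before it is dropped.
  def effective(
      directives: Seq[(Long, SnapshotStoreMode)]
  ): Seq[(Long, SnapshotStoreMode)] = {
    val lastEpoch = directives.lastIndexWhere(_._2 == SnapshotStoreMode.Epoch)
    if (lastEpoch < 0) directives else directives.drop(lastEpoch)
  }
}
```

For a batch yielding Cache, Epoch, Cache, Epoch, Cache at sequence numbers 1 to 5, only the Epoch at 4 and the Cache at 5 remain to be acted on.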
Thanks for sharing your ideas. The main purpose of the low-level snapshot API is to provide a foundation for implementing what you proposed (among other possible solutions). I'd like to avoid parameterizing the EventSourcing stage with snapshotting strategies/logic/abstractions directly; it should only be parameterized with initial state. Higher-level snapshotting logic should be implemented in a layer above.
The ability to explicitly create (tagged) snapshots would enable support for versioned state.
@2beaucoup the low level API will provide sources that associate (i.e. tag) emitted snapshots with offsets (= version numbers). Do you see any issues?
Nope. :) Are these snapshots created just at static intervals or will it be possible to trigger them by a prop on e.g. Emitted?
Here's the idea. Given a Snapshot type, an eventSource starting at fromSequenceNr and an eventHandler:
import akka.stream.scaladsl.Source
import com.github.krasserm.ases.Durable
import com.github.krasserm.ases.EventSourcing.EventHandler

trait Snapshot[S] {
  def state: S
  def sequenceNr: Long // state "version"
}

def eventSource[E](fromSequenceNr: Long): Source[Durable[E], _]
def eventHandler[E, S]: EventHandler[E, S]
a snapshot source can be created with:
def snapshotSource[E, S](snapshot: Snapshot[S]): Source[Snapshot[S], _] =
  eventSource[E](snapshot.sequenceNr + 1L)
    .scan(snapshot)((s, d) => Snapshot(eventHandler(s.state, d.event), d.sequenceNr))
It emits a new snapshot with every new event from eventSource. You can then apply any of the FlowOps methods to implement time or sequence number based emission intervals. The high-level snapshot API (coming as a separate ticket) will then provide Sinks for writing these snapshots, for example, to Akka Persistence compliant snapshot stores or somewhere else. All this happens on the query side.
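The scan-then-filter idea can be tried without Akka on the classpath. The sketch below uses a plain Scala `Iterator` as a stand-in for the event `Source`, so it only mirrors the shape of the pipeline; `Durable` and `Snapshot` here are simplified local stand-ins, and `snapshots` and `everyNth` are hypothetical names.

```scala
object SnapshotScanSketch {
  // Simplified local stand-ins for the types used in the thread.
  final case class Durable[E](event: E, sequenceNr: Long)
  final case class Snapshot[S](state: S, sequenceNr: Long)

  // One snapshot per event, folding each event into the previous state.
  // Like scan on a stream, scanLeft emits the initial snapshot first.
  def snapshots[E, S](initial: Snapshot[S], events: Iterator[Durable[E]])(
      handler: (S, E) => S
  ): Iterator[Snapshot[S]] =
    events.scanLeft(initial)((s, d) =>
      Snapshot(handler(s.state, d.event), d.sequenceNr))

  // Sequence-number-based emission interval: keep every n-th snapshot.
  def everyNth[S](n: Long)(in: Iterator[Snapshot[S]]): Iterator[Snapshot[S]] =
    in.filter(_.sequenceNr % n == 0)
}
```

For example, summing integer events 1 to 6 from an initial `Snapshot(0, 0L)` and keeping every third snapshot leaves the snapshots at sequence numbers 0, 3 and 6. On an actual Akka `Source` the same interval would presumably be a `filter` on the scanned stream, and a time-based interval could combine `conflate` with `throttle`.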
On the command side, an EventSourcing stage will be initialized with the state of a given snapshot and joined with an event log that emits events starting from the snapshot's sequenceNr + 1L.
Looks pretty flexible. Thanks for the heads-up @krasserm!
@krasserm why not make Snapshot just a case class?
@t3hnar it will be. The above only explains the concept i.e. no need to cover implementation details.