EventStore.Akka.Persistence
Provide a sample application
I'm trying to integrate this project into my project but I'm a little stuck...
Is there a simple way to have my (existing) events (case classes) serialized?
I added the SprayJsonSerializer to my project and added the required configuration files. Now I'm stuck on how to change my existing events so that they get handled correctly by this plugin.
My approach was to have my events simply extend eventstore.EventData, but that does not work, as it's forbidden to extend case classes:
package com.dominikdorn.akkapersistence.messages

import java.util.UUID
import eventstore.{ContentType, Event, Content, EventData}

case class Cmd(msg: String)

case class Evt(_type: String, msg: String) extends EventData (
  eventType = _type,
  UUID.randomUUID(),
  Content(akka.util.ByteString.fromString(msg), ContentType.Json),
  metadata = Content()
)
Is there another way? I don't want to implement a SprayJsonSerializer for every event type I have in my system. Maybe by providing an implicit MyEvent -> JsValue converter?
A sample which illustrates how to really use this akka persistence plugin would greatly help.
Thanks, Dominik
Hi @domdorn, there is an example. Basically, you need to provide a mapping for each event.
@t3hnar as noted in the first post, I've seen the sample. However, it's unreasonable to copy this whole code for every event type received or stored by an akka-persistence actor. There needs to be an easier way than creating dozens of classes with basically identical contents.
Is there a real sample application available somewhere? Or is this repository/project more of a proof of concept?
This repository is not a proof of concept; it is a fully functional plugin.
Those examples are provided to show how to store events as JSON in EventStore rather than in the binary format that Akka uses by default. This project is not intended to reinvent the serialization/deserialization approaches used by developers.
Regarding your code snippet, I don't understand why you extend EventData (there is nothing similar in the examples), and I also don't understand why you need to copy code for each event. All you need is something that can do Json => T and T => Json, pass the Json to EventData, and read it back from there.
I'd recommend starting by looking at https://github.com/EventStore/EventStore.Akka.Persistence/blob/master/src/test/scala/akka/persistence/eventstore/SprayJsonSerializer.scala#L108
In the example I hardcoded the payload as a String; this is where your T => Json should go.
For reads: https://github.com/EventStore/EventStore.Akka.Persistence/blob/master/src/test/scala/akka/persistence/eventstore/SprayJsonSerializer.scala#L103
Also, remember that Akka wraps each of your events in a PersistentRepr, which is then converted to JSON and passed to EventData.
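The Json => T and T => Json idea above can be sketched so that nothing is copied per event: one small type class instance per event type, plus a single generic function that builds the record. This is only a sketch under stated assumptions. `JsonCodec`, `Record`, `Codecs`, `toRecord`, and `fromRecord` are hypothetical names, `Record` merely mimics the shape of `EventData`, and the hand-written JSON stands in for whatever JSON library (spray-json, json4s, ...) is actually used.

```scala
// Hypothetical stand-in for eventstore.EventData: just an event type and a JSON body.
final case class Record(eventType: String, json: String)

// One instance per event type replaces one serializer class per event type.
trait JsonCodec[T] {
  def eventType: String
  def encode(value: T): String // T => Json
  def decode(json: String): T  // Json => T
}

final case class Evt(msg: String)

object Evt {
  // Naive hand-rolled JSON (no escaping); a real project would delegate to its JSON library.
  private val Pattern = """\{"msg":"(.*)"\}""".r

  implicit val codec: JsonCodec[Evt] = new JsonCodec[Evt] {
    val eventType = "Evt"
    def encode(value: Evt): String = s"""{"msg":"${value.msg}"}"""
    def decode(json: String): Evt = json match {
      case Pattern(msg) => Evt(msg)
      case other        => sys.error(s"Cannot decode Evt from $other")
    }
  }
}

object Codecs {
  // Generic write path: works for any T that has a JsonCodec in implicit scope.
  def toRecord[T](value: T)(implicit codec: JsonCodec[T]): Record =
    Record(codec.eventType, codec.encode(value))

  // Generic read path: the caller states which T is expected.
  def fromRecord[T](record: Record)(implicit codec: JsonCodec[T]): T =
    codec.decode(record.json)
}
```

With this shape, adding a new event type means adding one implicit codec next to the case class, for example `Codecs.toRecord(Evt("hello"))`, while the code that talks to the store stays unchanged.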
I have followed the guidance to persist the events as json and have a few questions:
My event is a VehicleInitialized event. I see the PersistentRepr json representation in the eventstore browser when I click on the json link for this particular event. It looks like this:
Data
{
  "payload": "{\"regNumber\":\"123\",\"color\":\"red\"}",
  "sequenceNr": 1,
  "persistenceId": "vehicle-f06c85e9-e5da-443a-ba84-cf1f2a6c19ee",
  "deleted": false,
  "sender": "akka://seed-actor-system/user/seed-service/$i#-1797565315"
}
and the type of the event is akka.persistence.PersistentRepr.
I was hoping for the event type to be namespace.VehicleInitialized and the json to be:
{
  "regNumber": "123",
  "color": "red"
}
Using akka-persistence without this plugin does list my different event types as the event type and not PersistentRepr, but naturally, stored in binary.
Does akka-persistence require me to persist PersistentRepr? Surely having just my event data persisted will make querying and projections a lot easier?
@petervdm
Does akka-persistence require me to persist PersistentRepr?
Yes it does.
Surely having just my event data persisted will make querying and projections a lot easier?
You can achieve this by writing the PersistentRepr part to EventData.metadata and your own payload to EventData.data.
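A minimal sketch of that split, assuming stand-in types instead of the real ones: `Envelope` is a hypothetical substitute for akka's `PersistentRepr` (only two of its bookkeeping fields), `StoredEvent` is a hypothetical substitute for the `eventType`/`data`/`metadata` parts of `EventData`, and the JSON is written by hand where a real serializer would call its JSON library.

```scala
// Hypothetical stand-ins, not the real akka / EventStore client types.
final case class Envelope(payload: AnyRef, sequenceNr: Long, persistenceId: String)
final case class StoredEvent(eventType: String, data: String, metadata: String)

final case class VehicleInitialized(regNumber: String, color: String)

object SplitEnvelope {
  // Assumed to come from the JSON library in use; hand-written here for one event type.
  def payloadJson(payload: AnyRef): String = payload match {
    case VehicleInitialized(reg, color) => s"""{"regNumber":"$reg","color":"$color"}"""
    case other                          => sys.error(s"No JSON mapping for $other")
  }

  def toStoredEvent(env: Envelope): StoredEvent = StoredEvent(
    // The event type is taken from the payload class, not from the envelope class.
    eventType = env.payload.getClass.getName,
    // Only the domain event goes into data ...
    data = payloadJson(env.payload),
    // ... while the akka-persistence bookkeeping goes into metadata.
    metadata = s"""{"sequenceNr":${env.sequenceNr},"persistenceId":"${env.persistenceId}"}"""
  )
}
```

On the read side the two parts would have to be combined again, since akka-persistence still expects a full PersistentRepr back from the journal.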
Ok, that's a great idea. That is probably the best place for that akka-persistence information, or any other producer information for that matter. Keep the EventData.data clean and describe it in metadata.
Hi, I've recently started looking into using this project with my application. However, I'm unable to locate any example solutions. All the above-mentioned links result in a 404. Is this repo no longer being looked after? Thanks in advance.
@NayabSiddiqui check out Akka-DDD.
It contains eventstore-akka-persistence module: https://github.com/pawelkaczor/akka-ddd/tree/master/eventstore-akka-persistence/src/main/scala/pl/newicom/eventstore
@NayabSiddiqui thanks for noticing, I've fixed these links
Thanks @pawelkaczor, @t3hnar. I had a look at the examples and wrote my custom serializer as below:
class EventStoreJsonSerializer(val system: ExtendedActorSystem) extends EventStoreSerializer {
  import EventStoreJsonSerializer._

  implicit val formats = DefaultFormats + SnapshotSerializer + new PersistentReprSerializer(system) + ActorRefSerializer ++ JodaTimeSerializers.all

  def identifier = Identifier

  def includeManifest = true

  def fromBinary(bytes: Array[Byte], manifestOpt: Option[Class[_]]) = {
    implicit val manifest = manifestOpt match {
      case Some(x) => Manifest.classType(x)
      case None    => Manifest.AnyRef
    }
    read(new String(bytes, UTF8))
  }

  def toBinary(o: AnyRef) = write(o).getBytes(UTF8)

  def toEvent(x: AnyRef) = x match {
    case x: PersistentRepr => EventData(
      eventType = classFor(x).getName,
      data = Content(ByteString(toBinary(x)), ContentType.Json)
    )
    case x: SnapshotEvent => EventData(
      eventType = classFor(x).getName,
      data = Content(ByteString(toBinary(x)), ContentType.Json)
    )
    case _ => sys.error(s"Cannot serialize $x, SnapshotEvent expected")
  }

  def fromEvent(event: Event, manifest: Class[_]) = {
    val clazz = Class.forName(event.data.eventType)
    val result = fromBinary(event.data.data.value.toArray, clazz)
    if (manifest.isInstance(result)) result
    else sys.error(s"Cannot deserialize event as $manifest, event: $event")
  }

  def classFor(x: AnyRef) = x match {
    case x: PersistentRepr => classOf[PersistentRepr]
    case _                 => x.getClass
  }

  object ActorRefSerializer extends Serializer[ActorRef] {
    val Clazz = classOf[ActorRef]

    def deserialize(implicit format: Formats) = {
      case (TypeInfo(Clazz, _), JString(x)) => system.provider.resolveActorRef(x)
    }

    def serialize(implicit format: Formats) = {
      case x: ActorRef => JString(x.path.toSerializationFormat)
    }
  }
}
object EventStoreJsonSerializer {
  val UTF8: Charset = Charset.forName("UTF-8")
  val Identifier: Int = ByteBuffer.wrap("json4s".getBytes(UTF8)).getInt

  object SnapshotSerializer extends Serializer[Snapshot] {
    val Clazz = classOf[Snapshot]

    def deserialize(implicit format: Formats) = {
      case (TypeInfo(Clazz, _), JObject(List(
        JField("data", JString(x)),
        JField("metadata", metadata)))) => Snapshot(x, metadata.extract[SnapshotMetadata])
    }

    def serialize(implicit format: Formats) = {
      case Snapshot(data, metadata) => JObject("data" -> JString(data.toString), "metadata" -> decompose(metadata))
    }
  }

  class PersistentReprSerializer(system: ExtendedActorSystem) extends Serializer[PersistentRepr] {
    val Clazz = classOf[PersistentRepr]

    def deserialize(implicit format: Formats) = {
      case (TypeInfo(Clazz, _), json) =>
        val x = json.extract[Mapping]
        val payload = x.manifest match {
          case "event.EmployeeRegistered"   => read[EmployeeRegistered](x.payload)
          case "event.LeavesCredited"       => read[LeavesCredited](x.payload)
          case "event.FullDayLeavesApplied" => read[FullDayLeavesApplied](x.payload)
          case "event.HalfDayLeavesApplied" => read[HalfDayLeavesApplied](x.payload)
        }
        PersistentRepr(
          payload = payload,
          sequenceNr = x.sequenceNr,
          persistenceId = x.persistenceId,
          manifest = x.manifest,
          writerUuid = x.writerUuid
        )
    }

    def serialize(implicit format: Formats) = {
      case x: PersistentRepr =>
        val payload = x.payload match {
          case e: EmployeeEvent => write(e)
        }
        val mapping = Mapping(
          payload = payload,
          sequenceNr = x.sequenceNr,
          persistenceId = x.persistenceId,
          manifest = x.payload.getClass.getName,
          writerUuid = x.writerUuid
        )
        decompose(mapping)
    }
  }

  case class Mapping(
    payload: String,
    sequenceNr: Long,
    persistenceId: String,
    manifest: String,
    writerUuid: String
  )
}
The events I use are as follows:
sealed trait EmployeeEvent extends Event

case class EmployeeRegistered(firstName: String, lastName: String) extends EmployeeEvent
case class LeavesCredited(creditedLeaves: Float) extends EmployeeEvent
case class FullDayLeavesApplied(from: DateTime, to: DateTime) extends EmployeeEvent
case class HalfDayLeavesApplied(from: DateTime, to: DateTime) extends EmployeeEvent

object EmployeeRegistered {
  implicit val employeeRegisteredFormat = Json.format[EmployeeRegistered]
}

object LeavesCredited {
  implicit val leavesCreditedFormat = Json.format[LeavesCredited]
}

object FullDayLeavesApplied {
  implicit val fullDayLeavesApplied = Json.format[FullDayLeavesApplied]
}

object HalfDayLeavesApplied {
  implicit val halfDayLeavesAppliedFormat = Json.format[HalfDayLeavesApplied]
}
When I run my tests, I see that for some streams the event type in EventStore is namespace.EventName and sometimes it is akka.persistence.PersistentRepr. I'm unable to get my head around this. I want to store it as namespace.EventName only.
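One detail visible in the serializer above that may be related: `toEvent` takes `eventType` from `classFor(x)`, and `classFor` returns `classOf[PersistentRepr]` for every `PersistentRepr`. The sketch below contrasts that behaviour with a variant that looks at the payload instead. It is only an illustration: `Repr` is a hypothetical stand-in for akka's `PersistentRepr`, and `EventTypes`, `classForAsPosted`, and `classForPayload` are made-up names.

```scala
// Hypothetical stand-in for akka.persistence.PersistentRepr (payload plus bookkeeping).
final case class Repr(payload: AnyRef, sequenceNr: Long)

final case class EmployeeRegistered(firstName: String, lastName: String)

object EventTypes {
  // Mirrors classFor in the serializer above: every wrapper maps to the wrapper class.
  def classForAsPosted(x: AnyRef): Class[_] = x match {
    case _: Repr => classOf[Repr]
    case other   => other.getClass
  }

  // Variant: unwrap first, so the name comes from the domain event inside the wrapper.
  def classForPayload(x: AnyRef): Class[_] = x match {
    case Repr(payload, _) => payload.getClass
    case other            => other.getClass
  }
}
```

If the event type is derived this way, `fromEvent` could no longer pass `Class.forName(event.data.eventType)` to `fromBinary`, because the stored type would name the payload class while the stored JSON is still the whole PersistentRepr mapping.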