akka-persistence-jdbc
akka-persistence-jdbc and JSON serialisation
Hey guys!
I was wondering how I should properly use or extend akka-persistence-jdbc so that my events are stored in PostgreSQL as JSON.
- I've checked akka-persistence-jdbc-play and its JsonJournalSerializer.scala example, which extends akka.persistence.jdbc.serialization.journal.JsonJournalConverter. To my understanding this class is not part of akka-persistence-jdbc, meaning that it can't be used?
- I've also checked akka-persistence-jdbc-serialization-json and its JsonJournalConverter.scala, which akka-persistence-jdbc-play depends on.
Is there a trait / class / interface that should be extended/used - or anything like that that would be a good starting point? I'm using json4s as my JSON de/serializer of choice.
Please help me and I'll return the favor by updating the README.md or something.
Thank you!
After some exploring, I came up with the following solution. Please do tell me if it's the wrong one. ;)
Inside my Akka configuration (application.conf) I've added a new serializer for akka.persistence.PersistentRepr like so:
akka {
  actor {
    serializers {
      jsonSerializer = "com.foo.serialization.JSONSerializer"
    }
    serialization-bindings {
      "akka.persistence.PersistentRepr" = jsonSerializer
      "com.foo.domain.Actor$SomeEvent" = jsonSerializer
      "java.io.Serializable" = none
    }
  }
}
My JSONSerializer looks like this:
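package com.foo.serialization

import java.nio.charset.StandardCharsets

import akka.serialization.Serializer
import com.foo.util.JsonSupport
import org.json4s._
import org.json4s.jackson.Serialization

class JSONSerializer extends Serializer with JsonSupport {
  override implicit val formats: Formats = Serialization.formats(NoTypeHints)

  // Must be unique among all serializers registered in the actor system.
  override def identifier: Int = 9001009

  // Ask Akka to pass the target class to fromBinary.
  override def includeManifest: Boolean = true

  override def fromBinary(bytes: Array[Byte], manifest: Option[Class[_]]): AnyRef = {
    val clazz = manifest.getOrElse(
      throw new IllegalArgumentException("JSONSerializer needs a manifest to deserialize"))
    implicit val mf: Manifest[AnyRef] = Manifest.classType(clazz)
    // Decode with the same charset that toBinary uses.
    Serialization.read[AnyRef](new String(bytes, StandardCharsets.UTF_8))
  }

  override def toBinary(o: AnyRef): Array[Byte] =
    Serialization.write(o).getBytes(StandardCharsets.UTF_8)
}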
Note that this does NOT mean the events will be inserted into the database as text / JSON; they are still stored as bytea. To see the events as JSON text, you can do this in PostgreSQL:
SELECT convert_from(message, 'UTF-8')::json FROM journal;
-- Or, to be a bit more exact (unwrap from PersistentRepr)
SELECT convert_from(message, 'UTF-8')::json -> 'payload' FROM journal;
You are taking good steps, but I think you probably do not want to store your events as byte arrays at all. Out of the box we have no support for this, but there are some types you can extend.
What you could do is create your own implementation of JournalDao. If you also use akka-persistence-query, you need to implement ReadJournalDao too.
In addition, if you want to store your snapshots as JSON as well, you need to implement your own version of SnapshotDao.
The README (see the front page of this GitHub repo) describes how to configure your custom DAO implementations in the config.
Note: if you end up with implementations of these JSON-based DAOs, feel free to open a PR to include them in the project. I would be happy to help you set up a configuration so that some of our existing test cases cover the JSON-based DAO implementations.
@WellingR / @otobrglez It would be great to have a JSON DAO. Even better if it is not bound to any specific JSON library (json4s, play-json, circe), but configurable by passing an encoder/decoder.
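To make the encoder/decoder idea concrete, here is a minimal sketch in plain Scala. Everything in it is hypothetical and not part of akka-persistence-jdbc: the `JsonCodec` type class, the `Renamed` event, and the `InMemoryJsonJournal` class, which only stands in for a real DAO so that the example runs without a database. The hand-written codec is a placeholder for whatever json4s, play-json or circe would provide.

```scala
// Hypothetical type class: the only thing a JSON-aware DAO would need from a JSON library.
trait JsonCodec[A] {
  def encode(value: A): String
  def decode(json: String): A
}

// Example event with a toy codec (no string escaping), standing in for a real JSON library.
final case class Renamed(name: String)

object Renamed {
  private val Pattern = """\{"name":"([^"\\]*)"\}""".r

  implicit val codec: JsonCodec[Renamed] = new JsonCodec[Renamed] {
    def encode(e: Renamed): String = s"""{"name":"${e.name}"}"""
    def decode(json: String): Renamed = json match {
      case Pattern(name) => Renamed(name)
      case other         => throw new IllegalArgumentException(s"Not a Renamed event: $other")
    }
  }
}

// Hypothetical in-memory stand-in for a journal DAO that stores JSON text instead of bytes.
final class InMemoryJsonJournal[A](implicit codec: JsonCodec[A]) {
  private var rows = Vector.empty[String]

  // Store the event as JSON text, using whichever codec was passed in.
  def append(event: A): Unit = rows :+= codec.encode(event)

  // The raw JSON rows, as a json/jsonb column would hold them.
  def rawRows: Vector[String] = rows

  // Decode every stored row back into an event.
  def replay: Vector[A] = rows.map(codec.decode)
}
```

With this shape, the journal never imports a JSON library itself; swapping json4s for circe would only mean supplying a different `JsonCodec` instance.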
@otobrglez, because you mentioned Postgres I imagine you're planning to use slick-pg and Postgres support for jsonb. Did you have that in mind?
Oh, I just realised one thing. The current implementation doesn't have a dedicated column to store the payload + the manifest. It serializes a PersistentRepr in which the payload for the event is serialized using whatever serializer you have configured for it.
So, whatever you end up using, it won't be saved as raw JSON, but as something wrapped by PersistentRepr.
In Lagom, for instance, we do have JSON serializers for events and snapshots, but when it comes to the format saved in the DB, it's just a byte array inside PersistentRepr.
Indeed, the current implementation serializes the complete PersistentRepr, which includes the manifest (and a bit more). The consequence of this is that SELECT convert_from(message, 'UTF-8')::json FROM journal; is not going to work.
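A small sketch of why the cast fails, using a deliberately simplified envelope. The `wrap` function and its layout are assumptions made up for illustration; the real PersistentRepr wire format is different (Akka encodes it with protobuf), but the effect is the same: the JSON payload is embedded in a larger binary message, so the column as a whole is not a JSON document.

```scala
import java.io.{ByteArrayOutputStream, DataOutputStream}
import java.nio.charset.StandardCharsets.UTF_8

object EnvelopeDemo {
  // Simplified, hypothetical envelope. NOT the real PersistentRepr wire format.
  def wrap(persistenceId: String, sequenceNr: Long, manifest: String, payload: Array[Byte]): Array[Byte] = {
    val out  = new ByteArrayOutputStream()
    val data = new DataOutputStream(out)
    data.writeUTF(persistenceId)  // binary length prefix + text
    data.writeLong(sequenceNr)    // 8 raw bytes
    data.writeUTF(manifest)
    data.writeInt(payload.length)
    data.write(payload)           // the JSON bytes, embedded somewhere in the middle
    data.flush()
    out.toByteArray
  }

  val payloadJson: String = """{"name":"alice"}"""

  // What would end up in the bytea column under this simplified layout.
  val stored: Array[Byte] =
    wrap("user-1", 1L, "com.foo.domain.Actor$SomeEvent", payloadJson.getBytes(UTF_8))

  // Roughly what convert_from(message, 'UTF-8') would be handed.
  val storedAsText: String = new String(stored, UTF_8)
}
```

The JSON text is present inside `storedAsText`, but it is preceded by binary header fields, so parsing the whole value as JSON cannot succeed.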
However it should be possible to bypass this mechanism completely by making a custom table definition and dao implementations for this as suggested in my previous comment.
I agree with Renato that if we were to support this feature in this plugin, we should not depend on any specific JSON library (we should make the JSON serialization mechanism configurable somehow).
Also, if possible, I would like to avoid a dependency on slick-pg (it is okay in the tests, however). If Slick cannot help us in all cases, we can always fall back to the sql and sqlu interpolator syntax.
To be honest, I fear that we will add lots of complexity to support this: custom table + different DAOs. But I may be wrong. That's just my first impression.
You could be right, perhaps we can publish this in a separate project.
FYI, it seems that this is being implemented in https://github.com/d2aborg/akka-persistence-jdbc-jackson.
What's the story here today?
With akka-persistence-jdbc 5 I have an event_payload_column which contains a byte array (pg bytea) of the JSON of the event. I guess the fact that it is JSON is because I have told Akka to serialize as JSON, and from the point of view of akka-persistence-jdbc it could be "anything" in there.
What steps, if any, could I take to have that column be of pg type json instead of bytea?
My use case is to be able to do custom event streams with more granular filtering directly in Postgres than what is available with Akka Persistence Query (which today would require me to pull all events and filter client side).
@PerWiklander, it's not enough to change the column to json; when persisting, the plugin must also keep the payload as JSON.
I think you can achieve what you want by defining your own DAO for Postgres using the slick-pg extension and defining the column type as jsonb.