akka-persistence-dynamodb
Deserialization of DynamoDB event's binary messages
Hello, I am trying to write a Lambda that reacts to a DynamoDB trigger on the Journal
table created with akka-persistence-dynamodb. I tried deserializing the binary message from record.getDynamodb.getNewImage.get("message").getB
with Apache Commons SerializationUtils, but it fails with the following exception:
java.io.StreamCorruptedException: invalid stream header: 0AEE0108: org.apache.commons.lang3.SerializationException
This is my lambda function:
package org.acme.project.sessionquery

import com.amazonaws.services.lambda.runtime.events.DynamodbEvent
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import org.acme.project.sessionquery.api._
import org.apache.commons.lang3.SerializationUtils

import scala.annotation.unused
import scala.jdk.CollectionConverters.CollectionHasAsScala

@unused
class Service extends RequestHandler[DynamodbEvent, Response] {
  override def handleRequest(event: DynamodbEvent, context: Context): Response = {
    val records = event.getRecords.asScala
    records.foreach { record =>
      record.getEventName match {
        case "INSERT" =>
          // Raw bytes of the "message" attribute written by the journal plugin
          val messageBuffer = record.getDynamodb.getNewImage.get("message").getB
          // This is the call that throws the StreamCorruptedException
          val deserializedObject = SerializationUtils.deserialize(messageBuffer.array())
          println(s"Deserialized object: $deserializedObject")
        case _ =>
          // Ignore MODIFY and REMOVE events instead of failing with a MatchError
          ()
      }
    }
    Ok
  }
}
I guess it has something to do with how akka-persistence-dynamodb
serializes the event before storing it in the Journal table, but I have no idea where to look for a reference. Any ideas on how to tackle this?
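For what it's worth, here is a minimal sketch of a sanity check on the header bytes from the exception (0AEE0108). `HeaderCheck` and `looksLikeJavaSerialization` are names I made up for illustration, not part of any library. It only compares the first two bytes against the Java serialization magic number from `java.io.ObjectStreamConstants`, and it says nothing definitive about what format the plugin actually writes.

```scala
import java.io.ObjectStreamConstants

// Hypothetical helper for illustration only; not part of akka-persistence-dynamodb.
object HeaderCheck {
  // The four header bytes reported in the exception message: 0AEE0108.
  val observed: Array[Byte] = Array(0x0A, 0xEE, 0x01, 0x08).map(_.toByte)

  // A Java serialization stream always starts with the magic number 0xACED.
  def looksLikeJavaSerialization(bytes: Array[Byte]): Boolean =
    bytes.length >= 2 &&
      (((bytes(0) & 0xFF) << 8) | (bytes(1) & 0xFF)) ==
        (ObjectStreamConstants.STREAM_MAGIC & 0xFFFF)

  def main(args: Array[String]): Unit =
    println(s"Java serialization header: ${looksLikeJavaSerialization(observed)}")
}
```

The check comes back false for these bytes, so the payload does not appear to be a plain Java-serialized object, which would explain why SerializationUtils rejects it. The leading 0x0A byte would also fit a protobuf message whose first field is length-delimited, but that is only a guess on my part and I have not verified it against the plugin's source.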