akka-persistence-dynamodb icon indicating copy to clipboard operation
akka-persistence-dynamodb copied to clipboard

Deserialization of DynamoDB event's binary messages

Open LukeKeywalker opened this issue 2 years ago • 0 comments

Hello, I try to wrtie a lambda that reacts to DynamoDB trigger on the Journal table created with akka-persistence-dynamodb. I tried deserialising the binary message from record.getDynamodb.getNewImage.get("message").getB with Apache Commons SerializationUtils but it results with the following exception:

java.io.StreamCorruptedException: invalid stream header: 0AEE0108: org.apache.commons.lang3.SerializationException

This is my lambda function:

package org.acme.project.sessionquery

import com.amazonaws.services.lambda.runtime.events.DynamodbEvent
import com.amazonaws.services.lambda.runtime.{Context, RequestHandler}
import org.acme.project.sessionquery.api._
import org.apache.commons.lang3.SerializationUtils

import scala.annotation.unused
import scala.jdk.CollectionConverters.CollectionHasAsScala

@unused
class Service extends RequestHandler[DynamodbEvent, Response] {
  override def handleRequest(event: DynamodbEvent, context: Context): Response = {
    val records = event.getRecords.asScala
    records.foreach { record =>
      record.getEventName match {
        case "INSERT" =>
          val messageBuffer = record.getDynamodb.getNewImage.get("message").getB
          val deserializedObject = SerializationUtils.deserialize(messageBuffer.array())
          println(s"Deserialized object: $deserializedObject")
      }
    }
    Ok
  }
}

I guess it has something to do with how the akka-persistence-dynamodb serializes the event before storing it in the Journal table, although I have no idea where to look for the reference. Any ideas how to tackle this?

LukeKeywalker avatar Dec 01 '22 19:12 LukeKeywalker