powertools-lambda-typescript

Feature request: parse DynamoDB Stream events via Kinesis Data Stream

Open dreamorosi opened this issue 1 year ago • 0 comments

Use case

When Amazon DynamoDB Streams events are delivered through Amazon Kinesis Data Streams and consumed by an AWS Lambda function, customers want to parse and validate the events before processing them.

flowchart LR
    A[DynamoDB Table] -->|DynamoDB Stream| B[Kinesis Data Stream]
    B -->|triggers| C[Lambda Function]

In these cases, the change event from the DynamoDB Stream gets encoded and wrapped into a Kinesis Data Stream event that acts as an envelope. The current schemas we have available for both DynamoDB Stream and Kinesis Data Stream are not enough to support this use case.

The current DynamoDBStreamSchema assumes that events come in a shape that looks like this:

{
  "Records": [
     {
        "eventID": "1",
        "eventVersion": "1.0",
        "dynamodb": {
          "ApproximateCreationDateTime": 1693997155.0,
          "Keys": {
            "Id": {
              "N": "101"
            }
          },
          "NewImage": {
            "Message": {
              "S": "New item!"
            },
            "Id": {
              "N": "101"
            }
          },
          "StreamViewType": "NEW_AND_OLD_IMAGES",
          "SequenceNumber": "111",
          "SizeBytes": 26
        },
        "awsRegion": "us-west-2",
        "eventName": "INSERT",
        "eventSourceARN": "eventsource_arn",
        "eventSource": "aws:dynamodb"
      }
  ]
}

On the other hand, when these same events come through a Kinesis Data Stream, they look like this (note that the data field actually arrives base64-encoded; in the example below I am presenting it decoded for easier understanding):

{
  Records: [
    {
      eventSource: 'aws:kinesis',
      eventVersion: '1.0',
      eventID:
        'shardId-000000000000:49656632116218945776331460018176327016585772817654480898',
      eventName: 'aws:kinesis:record',
      invokeIdentityArn:
        'arn:aws:iam::123456789012:role/KinesisddbStack-MyFunctionServiceRole3C357FF2-bxvXci1V8a2G',
      eventSourceARN:
        'arn:aws:kinesis:eu-west-1:123456789012:stream/KinesisddbStack-MyDataStream2006A1E4-BJi822bWFiFV',
      kinesis: {
        kinesisSchemaVersion: '1.0',
        partitionKey: '508B17441EAB608C8643A4479FCEF4A5',
        sequenceNumber:
          '49656632116218945776331460018176327016585772817654480898',
        approximateArrivalTimestamp: 1728572253.015,
        data: {
          awsRegion: 'eu-west-1',
          eventID: 'ec61129b-46af-4e89-b5d7-500aa6b9eeda',
          eventName: 'INSERT',
          userIdentity: null,
          recordFormat: 'application/json',
          tableName: 'MyTable',
          dynamodb: {
            ApproximateCreationDateTime: 1728572252034,
            Keys: {
              id: {
                S: 'foo',
              },
            },
            NewImage: {
              id: {
                S: 'foo',
              },
            },
            SizeBytes: 24,
          },
          eventSource: 'aws:dynamodb',
        },
        // data: 'eyJ...ZGIifQ==',
      },
    },
  ],
}

This means it's not possible to use DynamoDBStreamSchema to parse the data attribute due to the two structures being incompatible.
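
To make the mismatch concrete, here is a minimal sketch that decodes the base64 `data` field and compares its top-level keys with the direct DynamoDB Stream example above. The `decodeKinesisData` helper and the trimmed `KinesisRecord` type are hypothetical names used only for illustration, and the sample record is built by encoding the payload from the example rather than taken from a real invocation.

```typescript
import { Buffer } from 'node:buffer';

// Minimal shape of a Kinesis record as delivered to Lambda; only the
// field used in this sketch is typed (hypothetical, trimmed type).
type KinesisRecord = {
  kinesis: { data: string };
};

// Hypothetical helper: decode the base64 `data` field and parse it as JSON.
const decodeKinesisData = (record: KinesisRecord): unknown =>
  JSON.parse(Buffer.from(record.kinesis.data, 'base64').toString('utf8'));

// Payload copied from the decoded example in this issue.
const payload = {
  awsRegion: 'eu-west-1',
  eventID: 'ec61129b-46af-4e89-b5d7-500aa6b9eeda',
  eventName: 'INSERT',
  userIdentity: null,
  recordFormat: 'application/json',
  tableName: 'MyTable',
  dynamodb: {
    ApproximateCreationDateTime: 1728572252034,
    Keys: { id: { S: 'foo' } },
    NewImage: { id: { S: 'foo' } },
    SizeBytes: 24,
  },
  eventSource: 'aws:dynamodb',
};

// Encode the payload the way it would appear inside the Kinesis envelope.
const sampleRecord: KinesisRecord = {
  kinesis: { data: Buffer.from(JSON.stringify(payload)).toString('base64') },
};

const decoded = decodeKinesisData(sampleRecord) as Record<string, unknown>;

// Top-level keys in the direct DynamoDB Stream example that are absent here.
const missing = ['eventVersion', 'eventSourceARN'].filter(
  (key) => !(key in decoded)
);
// Top-level keys that only appear in the Kinesis-delivered example.
const extra = ['tableName', 'recordFormat', 'userIdentity'].filter(
  (key) => key in decoded
);

console.log({ missing, extra });
```

The comparison only covers the two examples shown in this issue; the nested `dynamodb` object differs as well (for instance, `StreamViewType` and `SequenceNumber` are absent from the Kinesis-delivered example).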

We should work to add support for a new schema specific to this type of integration.

Solution/User Experience

The schema below worked in earlier tests:

import { parser } from '@aws-lambda-powertools/parser/middleware';
import { KinesisEnvelope } from '@aws-lambda-powertools/parser/envelopes';
import middy from '@middy/core';
import { z } from 'zod';

const DynamoDBStreamEvent = z.object({
  awsRegion: z.string(),
  eventID: z.string(),
  eventName: z.enum(['INSERT', 'MODIFY', 'REMOVE']),
  userIdentity: z.null(),
  recordFormat: z.string(),
  tableName: z.string(),
  dynamodb: z.object({
    ApproximateCreationDateTime: z.number(),
    Keys: z.any().optional(),
    NewImage: z.any().optional(),
    SizeBytes: z.number(),
  }),
});

export const handler = middy(async (event: unknown) => {
  return {
    statusCode: 200,
    body: JSON.stringify('Hello, World!'),
  };
}).use(
  parser({
    schema: DynamoDBStreamEvent,
    envelope: KinesisEnvelope,
  }) 
);
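
To try a handler like the one above locally, a test event needs its payloads base64-encoded into `kinesis.data`. The sketch below shows one way to build such a fixture. `makeKinesisEvent` is a hypothetical helper, the ARNs and keys are placeholders, and `awsRegion` is added on the assumption that real Kinesis events carry it even though the trimmed example in this issue does not show it. Whether this minimal event satisfies the envelope's Kinesis schema has not been verified here.

```typescript
import { Buffer } from 'node:buffer';

// Hypothetical helper for local testing: wraps each payload in a minimal
// Kinesis-shaped record, base64-encoding it into `kinesis.data`.
const makeKinesisEvent = (payloads: unknown[]) => ({
  Records: payloads.map((item, index) => ({
    eventSource: 'aws:kinesis',
    eventVersion: '1.0',
    eventID: `shardId-000000000000:${index}`,
    eventName: 'aws:kinesis:record',
    // Placeholder ARNs, not real resources.
    invokeIdentityArn: 'arn:aws:iam::123456789012:role/placeholder-role',
    awsRegion: 'eu-west-1', // assumption: not shown in the issue's example
    eventSourceARN:
      'arn:aws:kinesis:eu-west-1:123456789012:stream/placeholder-stream',
    kinesis: {
      kinesisSchemaVersion: '1.0',
      partitionKey: 'placeholder-partition-key',
      sequenceNumber: String(index),
      approximateArrivalTimestamp: 1728572253.015,
      data: Buffer.from(JSON.stringify(item)).toString('base64'),
    },
  })),
});

// Payload shaped like the decoded example in this issue.
const changeEvent = {
  awsRegion: 'eu-west-1',
  eventID: 'ec61129b-46af-4e89-b5d7-500aa6b9eeda',
  eventName: 'INSERT',
  userIdentity: null,
  recordFormat: 'application/json',
  tableName: 'MyTable',
  dynamodb: {
    ApproximateCreationDateTime: 1728572252034,
    Keys: { id: { S: 'foo' } },
    NewImage: { id: { S: 'foo' } },
    SizeBytes: 24,
  },
};

const testEvent = makeKinesisEvent([changeEvent]);

// Round-trip check: decoding `data` should give back the original payload.
const roundTripped = JSON.parse(
  Buffer.from(testEvent.Records[0].kinesis.data, 'base64').toString('utf8')
);
console.log(roundTripped.eventName, roundTripped.tableName);
```

The resulting `testEvent` could then be passed to the handler in a unit test, together with a stub Lambda context.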

Alternative solutions

No response

Acknowledgment

Future readers

Please react with 👍 and your use case to help us understand customer demand.

dreamorosi • Oct 11 '24 13:10