bref icon indicating copy to clipboard operation
bref copied to clipboard

Kinesis non json data value is getting json_decoded, and throwing error

Open kapyaar opened this issue 9 months ago • 2 comments

I have the basic code

<?php

// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0

use Bref\Context\Context;
use Bref\Event\Kinesis\KinesisEvent;
use Bref\Event\Kinesis\KinesisHandler;
use Bref\Logger\StderrLogger;

require __DIR__ . '/vendor/autoload.php';

class Handler extends KinesisHandler
{
    private StderrLogger $logger;

    public function __construct(StderrLogger $logger)
    {
        $this->logger = $logger;
    }

    /**
     * This function will process each record in the Kinesis event.
     *
     * @param KinesisEvent $event
     * @param Context $context
     */
    public function handleKinesis(KinesisEvent $event, Context $context): void
    {
        $this->logger->info('Processing Kinesis Event...');
        
        // Loop through each record in the Kinesis event
        foreach ($event->getRecords() as $record) {
            $data = base64_decode($record->getData());
        }
    }
}

// Create a logger instance
$logger = new StderrLogger();

// Return the handler instance
return new Handler($logger);

Once deployed, I am testing with the following data (sample provided by aws)

{
  "Records": [
    {
      "kinesis": {
        "partitionKey": "partitionKey-03",
        "kinesisSchemaVersion": "1.0",
        "data": "SGVsbG8sIHRoaXMgaXMgYSB0ZXN0IDEyMy4=",
        "sequenceNumber": "49545115243490985018280067714973144582180062593244200961",
        "approximateArrivalTimestamp": 1428537600
      },
      "eventSource": "aws:kinesis",
      "eventID": "shardId-000000000000:49545115243490985018280067714973144582180062593244200961",
      "invokeIdentityArn": "arn:aws:iam::EXAMPLE",
      "eventVersion": "1.0",
      "eventName": "aws:kinesis:record",
      "eventSourceARN": "arn:aws:kinesis:EXAMPLE",
      "awsRegion": "us-east-1"
    }
  ]
}

And I get the following result

Status: Failed
Test Event Name: (unsaved) test event

Response:
{
  "errorType": "JsonException",
  "errorMessage": "Syntax error",
  "stackTrace": [
    "#0 /var/task/vendor/bref/bref/src/Event/Kinesis/KinesisRecord.php(31): json_decode('Hello, this is ...', true, 512, 4194304)",
    "#1 /var/task/lambdaConsumerForKinesis.php(207): Bref\\Event\\Kinesis\\KinesisRecord->getData()",
    "#2 /var/task/vendor/bref/bref/src/Event/Kinesis/KinesisHandler.php(15): Handler->handleKinesis(Object(Bref\\Event\\Kinesis\\KinesisEvent), Object(Bref\\Context\\Context))",
    "#3 /var/task/vendor/bref/bref/src/Runtime/Invoker.php(24): Bref\\Event\\Kinesis\\KinesisHandler->handle(Array, Object(Bref\\Context\\Context))",
    "#4 /var/task/vendor/bref/bref/src/Runtime/LambdaRuntime.php(94): Bref\\Runtime\\Invoker->invoke(Object(Handler), Array, Object(Bref\\Context\\Context))",
    "#5 /var/task/vendor/bref/bref/src/FunctionRuntime/Main.php(40): Bref\\Runtime\\LambdaRuntime->processNextEvent(Object(Handler))",
    "#6 /opt/bref/bootstrap.php(21): Bref\\FunctionRuntime\\Main::run()",
    "#7 {main}"
  ]
}

Function Logs:
// Copyright Amazon.com, Inc. or its affiliates. All Rights Reserved.
// SPDX-License-Identifier: Apache-2.0
START RequestId: 27866f85-c48d-42f3-bc1d-d73ff5ea33be Version: $LATEST
27866f85-c48d-42f3-bc1d-d73ff5ea33be	Invoke Error	{"errorType":"JsonException","errorMessage":"Syntax error","stack":["#0 \/var\/task\/vendor\/bref\/bref\/src\/Event\/Kinesis\/KinesisRecord.php(31): json_decode('Hello, this is ...', true, 512, 4194304)","#1 \/var\/task\/lambdaConsumerForKinesis.php(207): Bref\\Event\\Kinesis\\KinesisRecord->getData()","#2 \/var\/task\/vendor\/bref\/bref\/src\/Event\/Kinesis\/KinesisHandler.php(15): Handler->handleKinesis(Object(Bref\\Event\\Kinesis\\KinesisEvent), Object(Bref\\Context\\Context))","#3 \/var\/task\/vendor\/bref\/bref\/src\/Runtime\/Invoker.php(24): Bref\\Event\\Kinesis\\KinesisHandler->handle(Array, Object(Bref\\Context\\Context))","#4 \/var\/task\/vendor\/bref\/bref\/src\/Runtime\/LambdaRuntime.php(94): Bref\\Runtime\\Invoker->invoke(Object(Handler), Array, Object(Bref\\Context\\Context))","#5 \/var\/task\/vendor\/bref\/bref\/src\/FunctionRuntime\/Main.php(40): Bref\\Runtime\\LambdaRuntime->processNextEvent(Object(Handler))","#6 \/opt\/bref\/bootstrap.php(21): Bref\\FunctionRuntime\\Main::run()","#7 {main}"]}

If I change the "data" to base64encoded valid json (For eg: 'eyJhIjoxfQ==' for {"a":1}), then everthing is fine.

Wonder how this should be handled. I am using the layer

php-84 | 18 | provided.al2 | - | arn:aws:lambda:us-east-1:534081306603:layer:php-84:18

kapyaar avatar Mar 02 '25 22:03 kapyaar

Hi, if the data does not always contain JSON then that's a design issue indeed 😓

I have never used Kinesis myself, the feature was contributed in #640. I think this needs to be fixed, but we need to avoid breaking changes.

In the meantime you can also use this:

$data = base64_decode($record->getRawData());

mnapoli avatar Mar 03 '25 10:03 mnapoli

@mnapoli Thanks a lot. I tested getRawData(), and that works just fine. And yes, kinesis supports non json data/ blob, so a default json_decode on getData() did nt make sense. AWS docs examples by itself recommends using getData(), and testing with non json data.

$records = $event->getRecords();
foreach ($records as $record) {
   $data = $record->getData();
   $this->logger->info(json_encode($data));
   // TODO: Do interesting work based on the new data
   // Any exception thrown will be logged and the invocation will be marked as failed
}

Here, they do json_encode at will. I searched issues here, and the only thing I could find was #640. Did nt know getRawData() was available. Thanks again!!!!

kapyaar avatar Mar 03 '25 16:03 kapyaar