
RFC (BatchProcessor): Addition of BatchProcessing Utilities for EventSource Mapping triggers (akin to python utility)

Open walmsles opened this issue 1 year ago • 1 comments

BatchProcessing Utility

Adding a BatchProcessing utility with the feature set from Python is needed to simplify coding and developer experience around processing data from EventSource mappings.

  • RFC PR: (leave this empty)
  • Related issue(s), if known:
  • Area: BatchProcessor (new)
  • Meet tenets: Yes
  • Approved by: ''
  • Reviewed by: ''

Summary

Motivation

This utility IMO allows teams to move faster and not make silly mistakes in batch processing - it provides a safety net for new serverless developers who make similar mistakes over and over, which I can prevent by making them use Python 😬. The automated handling of Batch Errors in Python is Out of the Box and dead simple to use, so Devs can get it right 100% of the time without thinking too deeply about it.

Combined with Idempotency across functions (when available), this is a killer feature and is what provides the "Power" in Powertools for me.

Proposal

I am not a Node Typescript expert, so leaving a more detailed proposal to the wider Typescript contributors, maintainers and community.

User Experience

Drawbacks

Rationale and alternatives

Unresolved questions

walmsles avatar Aug 31 '22 02:08 walmsles

Hi @walmsles, thank you for opening this RFC.

Batch processing is definitely one of the use cases that we want to look into and consider for the future; however, at the moment we cannot commit to any timeline, as we are already working on Idempotency, Parameters, and other topics.

That said, I think this is a valuable RFC and I would be keen to hear what the community & other maintainers have to say in terms of design, DX, etc.

dreamorosi avatar Aug 31 '22 09:08 dreamorosi

+1

Jemesson avatar Jun 14 '23 19:06 Jemesson

I will be picking up the implementation for this -- design details are in the works!

erikayao93 avatar Jun 26 '23 15:06 erikayao93

Hi everyone, here is a design proposal. Would appreciate any feedback, especially on alternative solutions listed for some of the extra features.

Design Proposal (Request for comments)

1 Summary

The goal of this document is to propose the scope and design of the batch processing utility for Powertools for AWS (TypeScript). The utility has been implemented in the Python and Java repositories. We will use the current Python implementation (https://docs.powertools.aws.dev/lambda/python/latest/utilities/batch/) as a baseline, and describe the design decisions we will make in TypeScript.

2 Motivation

Batch processing is a utility that customers currently use frequently in Python. Typically, when a batch of messages is sent to a Lambda function for processing, an error on one message causes the entire batch to return to the queue and be processed again. The batch processing utility allows customers using SQS, Kinesis, or DynamoDB Streams as a Lambda event source to ensure that only the failed messages are re-run through the Lambda function.
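To make the mechanism concrete, here is a minimal sketch of the idea, not the proposed API. It assumes the partial batch response shape that Lambda accepts for these event sources (a batchItemFailures list of itemIdentifier entries); the collectFailures helper and the SqsRecordLike type are hypothetical names used only for illustration.

```typescript
// Minimal local types so the sketch is self-contained; in a real project these
// would come from the "aws-lambda" type package.
type SqsRecordLike = { messageId: string; body: string };
type BatchResponse = { batchItemFailures: { itemIdentifier: string }[] };

// Hypothetical helper (not the proposed API): run the handler on every record
// and report only the records whose handler threw.
const collectFailures = (
  records: SqsRecordLike[],
  handleRecord: (record: SqsRecordLike) => void
): BatchResponse => {
  const batchItemFailures: { itemIdentifier: string }[] = [];
  for (const record of records) {
    try {
      handleRecord(record);
    } catch (_error) {
      // Only the failed record is reported back for a retry.
      batchItemFailures.push({ itemIdentifier: record.messageId });
    }
  }
  return { batchItemFailures };
};

const partialResponse = collectFailures(
  [
    { messageId: 'a', body: '{"ok":true}' },
    { messageId: 'b', body: 'not json' },
  ],
  (record) => {
    JSON.parse(record.body); // throws on the second record
  }
);
console.log(JSON.stringify(partialResponse));
// prints {"batchItemFailures":[{"itemIdentifier":"b"}]}
```

Only message "b" is reported as failed, so message "a" would not be processed again.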

To maintain parity with the Python version and provide this useful functionality to customers using the TypeScript repository, we provide the following design proposal.

3 Utility Interface

There are two primary usages of the batch processing utility we will aim to support:

  1. a batch processing function that can be called by the handler (aka Higher-order function / function wrapper)
  2. at the handler level, via Middy middleware

Both usages will support batch processing from SQS, Kinesis, and DynamoDB Streams.

3.1 Batch Processing Function

This is an example of how the batch processing function would be invoked for processing records from SQS:

import {
    BatchProcessor,
    EventType,
    processPartialResponse
} from '@aws-lambda-powertools/batch';
import type { SQSRecord, SQSBatchResponse } from 'aws-lambda';

const processor = new BatchProcessor({ eventType: EventType.SQS });

const recordHandler = async (record: SQSRecord): Promise<void> => {
    // example of record handling logic, this is provided by customers
    const item = JSON.parse(record.body);
    console.log(item);
};

// return the partial batch response so that only failed records are retried
const lambdaHandler = async (_event: any, _context: any): Promise<SQSBatchResponse> => {
    return processPartialResponse(_event, recordHandler, processor);
};

The processPartialResponse() function will be implemented so that it can process records both synchronously and asynchronously.
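One way to support both kinds of record handler is to await whatever the handler returns. The sketch below is an assumption about how that could look, not the planned implementation; findFailedIds, RecordLike and RecordHandler are hypothetical names, and records are processed one at a time for simplicity.

```typescript
type RecordLike = { messageId: string; body: string };
// A handler may be synchronous or asynchronous.
type RecordHandler = (record: RecordLike) => void | Promise<void>;

// Hypothetical helper, not the proposed API. Awaiting the handler's return
// value covers both cases: `await` on a non-promise value resolves immediately,
// and a synchronous throw is caught by the same try/catch as a rejection.
const findFailedIds = async (
  records: RecordLike[],
  handler: RecordHandler
): Promise<string[]> => {
  const failedIds: string[] = [];
  for (const record of records) {
    try {
      await handler(record);
    } catch (_error) {
      failedIds.push(record.messageId);
    }
  }
  return failedIds;
};

const sampleRecords: RecordLike[] = [
  { messageId: '1', body: 'ok' },
  { messageId: '2', body: 'bad' },
];

// Synchronous handler that throws on a bad record.
const syncHandler: RecordHandler = (record) => {
  if (record.body === 'bad') throw new Error('sync failure');
};

// Asynchronous handler that rejects on a bad record.
const asyncHandler: RecordHandler = async (record) => {
  if (record.body === 'bad') throw new Error('async failure');
};

findFailedIds(sampleRecords, syncHandler).then((ids) => console.log('sync:', ids));
findFailedIds(sampleRecords, asyncHandler).then((ids) => console.log('async:', ids));
```

Both calls report message "2" as the only failure. A real implementation might prefer to run asynchronous handlers concurrently rather than sequentially.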

3.2 Batch Processing Handler

import {
  BatchProcessor,
  EventType,
  makeBatchHandler
} from '@aws-lambda-powertools/batch';
import type { SQSRecord, Context } from 'aws-lambda';
import middy from '@middy/core';

const processor = new BatchProcessor({ eventType: EventType.SQS });

// the handler receives one record at a time (the parameter is typed as SQSRecord)
const lambdaHandler = async (record: SQSRecord, _context: Context): Promise<void> => {
    /* ...processor logic here... */
    const item = JSON.parse(record.body);
    console.log(item);
};

export const handler = middy(lambdaHandler)
  .use(makeBatchHandler({
    processor: processor
  }));

4 Additional Features

These features may not be released as part of the first release, but should be implemented soon thereafter.

4.1 Bring your own processor

The BatchProcessor class will be implemented such that users will be able to implement their own batch processing class to override success and failure handling logic if necessary.

class MyProcessor extends BatchProcessor {
    // no explicit `self` parameter: TypeScript methods use the implicit `this`
    public successHandler(record, result): SuccessResponse {
        // custom success handling logic
    }
    public failureHandler(record, exception): FailureResponse {
        // custom failure handling logic
    }
}
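As a rough, self-contained illustration of the override pattern: SketchProcessor below is a hypothetical stand-in for the proposed BatchProcessor, and the tuple shapes used for SuccessResponse and FailureResponse are assumptions rather than the final types.

```typescript
type RecordLike = { messageId: string; body: string };
// Assumed response shapes: [status, result or error message, record].
type SuccessResponse = ['success', unknown, RecordLike];
type FailureResponse = ['fail', string, RecordLike];

// Hypothetical stand-in for the proposed BatchProcessor base class.
class SketchProcessor {
  public successHandler(record: RecordLike, result: unknown): SuccessResponse {
    return ['success', result, record];
  }

  public failureHandler(record: RecordLike, error: Error): FailureResponse {
    return ['fail', error.message, record];
  }

  // Calls the handler for each record and routes the outcome to the two hooks.
  public process(
    records: RecordLike[],
    handler: (record: RecordLike) => unknown
  ): (SuccessResponse | FailureResponse)[] {
    return records.map((record) => {
      try {
        return this.successHandler(record, handler(record));
      } catch (error) {
        return this.failureHandler(record, error as Error);
      }
    });
  }
}

// Custom processor: keeps the default behaviour and adds a failure counter.
class CountingProcessor extends SketchProcessor {
  public failureCount = 0;

  public failureHandler(record: RecordLike, error: Error): FailureResponse {
    this.failureCount += 1; // e.g. emit a metric here
    return super.failureHandler(record, error);
  }
}

const countingProcessor = new CountingProcessor();
const outcomes = countingProcessor.process(
  [
    { messageId: 'a', body: '1' },
    { messageId: 'b', body: 'oops' },
  ],
  (record) => JSON.parse(record.body)
);
console.log(outcomes.map(([outcome]) => outcome), countingProcessor.failureCount);
```

Calling super.failureHandler keeps the default bookkeeping, so the subclass only adds behaviour instead of replacing it.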

4.2 Access processed messages

This will allow users to access a list of all returned values from the recordHandler function.

import {
  BatchProcessor,
  EventType,
} from '@aws-lambda-powertools/batch';
import type { SQSEvent, SQSRecord, Context, SQSBatchResponse } from 'aws-lambda';

const processor = new BatchProcessor({ eventType: EventType.SQS });

const recordHandler = async (record: SQSRecord): Promise<void> => {
    // example of record handling logic, this is provided by customers
    const item = JSON.parse(record.body);
    console.log(item);
};

export const handler = async (event: SQSEvent, context: Context): Promise<SQSBatchResponse> => {
    const batch = event.Records;

    processor.register({ records: batch, handler: recordHandler });
    // await is harmless if process() turns out to be synchronous
    const processedMessages = await processor.process();

    // for...of iterates over the entries; for...in would iterate over the indices
    for (const message of processedMessages) {
        const [status, record] = message;

        console.log(status, record);
    }

    return processor.response();
};

This implementation would require the definition of an additional register method, which is not contained in the Python batch processing implementation. However, the additional method will allow the process() method to maintain identical syntax to its Python counterpart. Use of the new register method would be similar to the register method used in other functionality, like idempotency.

4.3 FIFO queues for SQS

In addition to the base BatchProcessor provided, there will also be an SqsFifoPartialProcessor provided out of the box that will preserve the order of the messages in the queue. This processor will stop processing messages after the first failure, and return all failed and unprocessed messages in batchItemFailures.

import {
    SqsFifoPartialProcessor,
    processPartialResponse
} from '@aws-lambda-powertools/batch';
import type { SQSRecord, SQSBatchResponse } from 'aws-lambda';

const processor = new SqsFifoPartialProcessor();

const recordHandler = async (record: SQSRecord): Promise<void> => {
    // example of record handling logic, this is provided by customers
    const item = JSON.parse(record.body);
    console.log(item);
};

const lambdaHandler = async (_event: any, _context: any): Promise<SQSBatchResponse> => {
    return processPartialResponse(_event, recordHandler, processor);
};
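The stop-at-first-failure behaviour described for FIFO queues can be sketched as follows. This is an illustration under assumptions, not the SqsFifoPartialProcessor implementation; processFifo and the local types are hypothetical names.

```typescript
type FifoRecordLike = { messageId: string; body: string };
type FifoBatchResponse = { batchItemFailures: { itemIdentifier: string }[] };

// Hypothetical helper: process records in order and stop at the first failure.
// The failed record and every record after it are reported as failures, so
// they are retried in their original order.
const processFifo = (
  records: FifoRecordLike[],
  handleRecord: (record: FifoRecordLike) => void
): FifoBatchResponse => {
  for (let index = 0; index < records.length; index++) {
    try {
      handleRecord(records[index]);
    } catch (_error) {
      const remaining = records.slice(index); // failed + unprocessed records
      return {
        batchItemFailures: remaining.map((record) => ({
          itemIdentifier: record.messageId,
        })),
      };
    }
  }
  return { batchItemFailures: [] };
};

const handledIds: string[] = [];
const fifoResponse = processFifo(
  [
    { messageId: '1', body: 'ok' },
    { messageId: '2', body: 'bad' },
    { messageId: '3', body: 'ok' },
  ],
  (record) => {
    if (record.body === 'bad') throw new Error('cannot process');
    handledIds.push(record.messageId);
  }
);
console.log(handledIds, JSON.stringify(fifoResponse));
```

Here only message "1" is handled; messages "2" and "3" are both returned in batchItemFailures, even though "3" was never attempted.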

4.4 Accessing Lambda context

In the use case where customers may need to access Lambda context while handling records, the recordHandler can optionally be defined to take in a lambdaContext parameter. Then, Lambda context can be injected into the recordHandler as shown below.

import {
    BatchProcessor,
    EventType,
    processPartialResponse
} from '@aws-lambda-powertools/batch';
import type { SQSRecord, Context, SQSBatchResponse } from 'aws-lambda';

const processor = new BatchProcessor({ eventType: EventType.SQS });

const recordHandler = async (record: SQSRecord, lambdaContext?: Context): Promise<void> => {
    lambdaContext.getRemaining...()  // access some Lambda context
};

const lambdaHandler = async (_event: any, _context: Context): Promise<SQSBatchResponse> => {
    return processPartialResponse(_event, recordHandler, processor, _context);
};

Alternative solution for accessing lambda context:

Rather than accessing Lambda context directly from a lambdaContext parameter, we can consider defining a BatchProcessingOptions type, which will contain the Lambda context.

// Option 2
type BatchProcessingOptions = {
  context: Context;
}

const recordHandler = async (record: SQSRecord, options: BatchProcessingOptions) => {
  const { context } = options;
  context.getRemaining...()
}

This typing will allow us to provide additional parameters for the recordHandler in future feature implementations if we need to.
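A small sketch of why the options object extends well, under stated assumptions: ContextLike is a local stub carrying a single method of the Lambda context object (getRemainingTimeInMillis), and runHandler is a hypothetical dispatcher rather than part of the proposal.

```typescript
type OptRecordLike = { messageId: string; body: string };
// Local stub with one method of the Lambda context object.
type ContextLike = { getRemainingTimeInMillis: () => number };
// New fields can be added here later without changing handler signatures.
type BatchProcessingOptions = { context: ContextLike };
type OptRecordHandler<T> = (
  record: OptRecordLike,
  options?: BatchProcessingOptions
) => T;

// Hypothetical dispatcher: forwards the same options object to every call.
const runHandler = <T>(
  records: OptRecordLike[],
  handler: OptRecordHandler<T>,
  options?: BatchProcessingOptions
): T[] => records.map((record) => handler(record, options));

const stubOptions: BatchProcessingOptions = {
  context: { getRemainingTimeInMillis: () => 1500 },
};

// A handler that does not need the context can simply omit the parameter.
const ignoresOptions = (record: OptRecordLike): string => record.messageId;

// A handler that needs the context destructures it from the options.
const usesContext = (
  record: OptRecordLike,
  options?: BatchProcessingOptions
): string =>
  `${record.messageId}:${options?.context.getRemainingTimeInMillis() ?? 'n/a'}`;

const oneRecord: OptRecordLike[] = [{ messageId: 'm1', body: '{}' }];
console.log(runHandler(oneRecord, ignoresOptions, stubOptions)); // [ 'm1' ]
console.log(runHandler(oneRecord, usesContext, stubOptions)); // [ 'm1:1500' ]
```

Both handlers are accepted by the same dispatcher, which is the property that would let future options be added without breaking existing record handlers.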

5 Out of Scope

Python has an integration with Pydantic & Event Source Data Classes. For TypeScript, for now, we will rely on types only. This will ensure type safety but not runtime safety/validation. We will revisit this and consider an integration when we address the Parser utility.

6 Other Discussions

6.1 Processor decorator

The processor decorator functionality is now considered legacy in Python, as customers often felt it was too complex, and the processor function was created instead to reduce boilerplate. As a result, for this design, we will skip implementation for the decorator and focus on the processor function from the start.

erikayao93 avatar Jun 27 '23 16:06 erikayao93

⚠️ COMMENT VISIBILITY WARNING ⚠️

Comments on closed issues are hard for our team to see. If you need more assistance, please either tag a team member or open a new issue that references this one. If you wish to keep having a conversation with other community members under this issue feel free to do so.

github-actions[bot] avatar Jul 25 '23 10:07 github-actions[bot]

This is now released under v1.12.1 version!

github-actions[bot] avatar Jul 25 '23 13:07 github-actions[bot]