powertools-lambda-java

RFC: New batch processor for new native partial response (SQS, DynamoDB, Kinesis, Aurora Stream(?))

Open: pankajagrawal16 opened this issue 3 years ago

Key information

  • RFC PR: (leave this empty)
  • Related issue(s), if known:
  • Area: (e.g. Tracer, Metrics, Logger)
  • Meet tenets: (Yes/no)

Summary

A new generic batch processing utility that can process records from SQS, Kinesis Data Streams, and DynamoDB Streams, and report batch item failures.

Motivation

With the launch of partial batch response support for Lambda with SQS, the event source mapping can now natively handle partial failures in a batch, removing the need to call the SQS delete API for successfully processed messages. The same support already exists for Kinesis and DynamoDB Streams.
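For reference, when an event source mapping has `ReportBatchItemFailures` enabled in its `FunctionResponseTypes`, the function signals a partial failure by returning a JSON object whose `batchItemFailures` array lists the identifier of each failed record (the message ID for SQS, the sequence number for Kinesis and DynamoDB Streams). The sketch below builds that payload by hand, with no AWS dependencies, purely to show its shape; `PartialBatchResponse` and `toJson` are illustrative names, not part of any library.

```java
import java.util.List;
import java.util.stream.Collectors;

// Illustrative only: builds the JSON a function returns when its event source
// mapping has ReportBatchItemFailures enabled. Not part of any library.
final class PartialBatchResponse {
    private PartialBatchResponse() {
    }

    // failedIds are SQS message IDs, or Kinesis/DynamoDB sequence numbers.
    // They are assumed to need no JSON escaping.
    static String toJson(List<String> failedIds) {
        String items = failedIds.stream()
                .map(id -> "{\"itemIdentifier\":\"" + id + "\"}")
                .collect(Collectors.joining(","));
        return "{\"batchItemFailures\":[" + items + "]}";
    }

    public static void main(String[] args) {
        // An empty list tells Lambda that every record in the batch succeeded
        System.out.println(toJson(List.of()));
        // Prints {"batchItemFailures":[{"itemIdentifier":"msg-2"},{"itemIdentifier":"msg-5"}]}
        System.out.println(toJson(List.of("msg-2", "msg-5")));
    }
}
```

For SQS, Lambda deletes the messages that are not listed, and only the listed ones become visible on the queue again; for streams, the listed sequence numbers determine where Lambda resumes.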

Proposal

TBD, but along the lines of https://github.com/awslabs/aws-lambda-powertools-roadmap/issues/64

Explain the design in enough detail for somebody familiar with Powertools to understand it, and for somebody familiar with the implementation to implement it.

This should get into specifics and corner-cases, and include examples of how the feature is used. Any new terminology should be defined here.

Drawbacks

Why should we not do this?

Do we need additional dependencies? Impact performance/package size?

Rationale and alternatives

  • What other designs have been considered? Why not them?
  • What is the impact of not doing this?

Unresolved questions

Optional, stash area for topics that need further development e.g. TBD

pankajagrawal16, Mar 16 '22

@machafer will start looking at the implementation and the UX.

pankajagrawal16, Mar 16 '22

@machafer, what's the status on this please?

jeromevdl, Nov 16 '22

As I recently worked on the code here, I'll pick this up!

scottgerring, Jun 21 '23

Have you seen issue #596, and is it something to consider as part of this RFC?

Regarding the proposition, I'm not sure how you can return this List<String> with the BatchMessageHandlerBuilder. Also, according to the documentation, there is already an SQSBatchResponse class with a BatchItemFailure type. We should probably not reinvent the wheel, and use this instead:

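import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;
import java.util.ArrayList;
import java.util.List;

public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {
    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
            // Read the ID before processing, so a failure is reported against this message
            String messageId = message.getMessageId();
            try {
                // process your message
            } catch (Exception e) {
                // Add the failed message identifier to the batchItemFailures list
                batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(messageId));
            }
        }
        return new SQSBatchResponse(batchItemFailures);
    }
}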

The same applies to Kinesis and DynamoDB: StreamsEventResponse.BatchItemFailure.
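For Kinesis and DynamoDB Streams the item identifier is the record's sequence number, and ordering matters: when several sequence numbers are reported, Lambda checkpoints at the lowest one and retries every record from there. A stream processor can therefore stop at the first failure. The sketch below uses a hypothetical `StreamRecord` stand-in (not the `aws-lambda-java-events` type) so it compiles on its own; `StreamBatchSketch.process` is likewise an illustrative name.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical stand-in for a Kinesis or DynamoDB stream record
final class StreamRecord {
    final String sequenceNumber;
    final String data;

    StreamRecord(String sequenceNumber, String data) {
        this.sequenceNumber = sequenceNumber;
        this.data = data;
    }
}

final class StreamBatchSketch {
    private StreamBatchSketch() {
    }

    // Processes records in order and returns the sequence numbers to report as
    // batch item failures. Stops at the first failure, because Lambda retries from
    // the lowest reported sequence number and re-delivers the later records anyway.
    static List<String> process(List<StreamRecord> records, Consumer<StreamRecord> handler) {
        List<String> failures = new ArrayList<>();
        for (StreamRecord record : records) {
            try {
                handler.accept(record);
            } catch (RuntimeException e) {
                failures.add(record.sequenceNumber);
                break;
            }
        }
        return failures;
    }

    public static void main(String[] args) {
        List<StreamRecord> batch = List.of(
                new StreamRecord("100", "ok"),
                new StreamRecord("101", "boom"),
                new StreamRecord("102", "ok"));
        List<String> failed = process(batch, r -> {
            if (r.data.equals("boom")) {
                throw new IllegalStateException("cannot process " + r.sequenceNumber);
            }
        });
        System.out.println(failed); // prints [101]; record 102 is never attempted
    }
}
```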

➡️ I would definitely use the built-in events (I just saw solution 4.2). But in that case, what will the developer experience be, and what value will the module add? Can you elaborate on this?

Regarding your questions:

  • Events v4: we need to gather information internally to understand the status, the differences, and the ETA.
  • I agree this module should supersede the old SQS one, which will be marked as deprecated here and deleted in v2. Regarding the common code, I don't know if we are talking about 1 class or much more. We can probably live with duplicated code if it's not too big...

jeromevdl, Jun 22 '23

Can we also take inspiration from python and see if we can have a similar "experience".

I'm wondering if we could have something like this (for SQS), using interfaces and default implementations:

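import com.amazonaws.services.lambda.runtime.Context;
import com.amazonaws.services.lambda.runtime.RequestHandler;
import com.amazonaws.services.lambda.runtime.events.SQSBatchResponse;
import com.amazonaws.services.lambda.runtime.events.SQSEvent;

public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse>, BatchProcessor<SQSEvent, SQSBatchResponse> {

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        // processBatch is implemented as a default in the interface; it catches exceptions thrown by processItem and adds the item to the BatchItemFailure list
        return this.processBatch(sqsEvent); // we may need to pass the context too...?
    }

    // This method comes from the BatchProcessor interface; developers override the one for their event source.
    // Interface methods are implicitly public, so the override must be public too (protected would not compile).
    @Override
    public void processItem(SQSEvent.SQSMessage message) {
        // do some stuff with this item
    }
}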

With the BatchProcessor:

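import com.amazonaws.services.lambda.runtime.events.*;
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

public interface BatchProcessor<I, O> {
    // Depending on the input type (SQS/Kinesis/DynamoDB), process each item and build
    // the matching partial batch response from the identifiers of the items that threw
    @SuppressWarnings("unchecked")
    default O processBatch(I input) {
        List<String> failed;
        if (input instanceof SQSEvent) {
            failed = failedIds(((SQSEvent) input).getRecords(), this::processItem, SQSEvent.SQSMessage::getMessageId);
            List<SQSBatchResponse.BatchItemFailure> items = new ArrayList<>();
            failed.forEach(id -> items.add(new SQSBatchResponse.BatchItemFailure(id)));
            return (O) new SQSBatchResponse(items);
        } else if (input instanceof KinesisEvent) {
            failed = failedIds(((KinesisEvent) input).getRecords(), this::processItem, r -> r.getKinesis().getSequenceNumber());
        } else if (input instanceof DynamodbEvent) {
            failed = failedIds(((DynamodbEvent) input).getRecords(), this::processItem, r -> r.getDynamodb().getSequenceNumber());
        } else {
            throw new IllegalArgumentException("Unsupported event type: " + input.getClass());
        }
        List<StreamsEventResponse.BatchItemFailure> items = new ArrayList<>();
        failed.forEach(id -> items.add(new StreamsEventResponse.BatchItemFailure(id)));
        return (O) new StreamsEventResponse(items);
    }

    // Runs the handler on each record and returns the identifiers of the records that threw
    static <T> List<String> failedIds(List<T> records, Consumer<T> handler, Function<T, String> id) {
        List<String> failed = new ArrayList<>();
        for (T record : records) {
            try {
                handler.accept(record);
            } catch (Exception e) {
                failed.add(id.apply(record));
            }
        }
        return failed;
    }

    default void processItem(SQSEvent.SQSMessage message) { System.out.println(message.getMessageId()); }

    default void processItem(KinesisEvent.KinesisEventRecord record) { System.out.println(record.getEventID()); }

    default void processItem(DynamodbEvent.DynamodbStreamRecord record) { System.out.println(record.getEventID()); }
}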

With this, we could add support for new streaming services through new interface defaults without breaking anything.
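The interface-default idea can be tried out in isolation. In this sketch, `QueueMessage` and `StreamEvent` are hypothetical stand-ins for the real event types, and `BatchItemProcessor` and `OrderHandler` are illustrative names rather than a proposed API. An implementer overrides only the overload it needs, and a `default` overload added later for a new source leaves existing implementers compiling unchanged. Unlike the print-only defaults in the comment, these defaults throw, so a handler that forgets to override its overload reports every item as failed instead of silently dropping it; that is one possible choice, not a settled one.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-ins for SQSEvent.SQSMessage and a stream record
final class QueueMessage {
    final String messageId;

    QueueMessage(String messageId) { this.messageId = messageId; }
}

final class StreamEvent {
    final String sequenceNumber;

    StreamEvent(String sequenceNumber) { this.sequenceNumber = sequenceNumber; }
}

interface BatchItemProcessor {
    // Implementers override the overload for their event source; unhandled sources fail loudly
    default void processItem(QueueMessage message) {
        throw new UnsupportedOperationException("processItem(QueueMessage) not implemented");
    }

    // Imagine this overload was added later for a new source: existing implementers still compile
    default void processItem(StreamEvent event) {
        throw new UnsupportedOperationException("processItem(StreamEvent) not implemented");
    }

    // Shared batch loop: returns the IDs of the messages whose processing threw
    default List<String> processQueueBatch(List<QueueMessage> messages) {
        List<String> failed = new ArrayList<>();
        for (QueueMessage message : messages) {
            try {
                processItem(message);
            } catch (RuntimeException e) {
                failed.add(message.messageId);
            }
        }
        return failed;
    }
}

// A handler only overrides what it needs
final class OrderHandler implements BatchItemProcessor {
    @Override
    public void processItem(QueueMessage message) {
        if (message.messageId.startsWith("bad")) {
            throw new IllegalArgumentException("poison message " + message.messageId);
        }
    }
}
```

For example, `new OrderHandler().processQueueBatch(List.of(new QueueMessage("m1"), new QueueMessage("bad-2")))` returns a list containing only `bad-2`.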

jeromevdl, Jun 23 '23

Thanks @jeromevdl for the considered feedback!

Regarding the proposition, I'm not sure how you can return this List<String>

Good catch. I think the base class / base builder will have to return Object, as there is no relationship between the various event-specific batch response types.

I've linked the examples and existing partial response types into the RFC. I've also updated the section at the top explaining why we should do this. It's important we get this part right.

I don't know if we are talking about 1 class or much more. We can probably live with duplicated code if it's not too big...

My feeling is that how to handle this will become clear during the implementation; we just have to be attentive to it.

I would definitely use the built-in events

I tend to agree. I don't think mapping to another model adds much value, and it means a lot of extra code to maintain. Realistically, the presented use case ("in some cases you can move message handlers between event sources without changing a type in your code") is pretty tenuous.

scottgerring, Jun 25 '23

Can we also take inspiration from python and see if we can have a similar "experience".

Looks like another option; I hadn't thought of using interface defaults. Let me have a play with it. I've started hacking around on a branch to try and get a feel for the ergonomics of different options. We've already narrowed down the solution space in this discussion; I will mock up some variants on the branch and we can discuss again 💪

scottgerring, Jun 25 '23

I've added a reasonably complete example using extension and a very rough sketch using default implementations. I think it would be helpful to jump on a call together in the next week to discuss.

scottgerring, Jun 25 '23

  • We cannot use abstract classes as it's become too restrictive (we got that feedback once), which is why I proposed interfaces. I've never really played with default methods, though... Also, with this, developers don't have control over the handleRequest method (here).
  • I like the idea of the generic U, but we could go even further and let the developer have the inner type of the message (what's in the body). They offer this in Python. What you suggest here
  • I think you overcomplicate things, but I may miss something...
    • Ex: Why a builder ? We're not in the SDK 😉
    • Also we don't want to let this to the developers, this is actually why we build this module (the response only consists in PartialBatchFailure, they don't need to return anything)...
@Override
protected SQSBatchResponse writeResponse(Iterable<MessageProcessingResult<SQSEvent.SQSMessage>> results) {
    // Here we map up the SQS-specific response for the batch based on the success of the individual messages
    throw new NotImplementedException();
}

But we have a good basis to discuss on Thursday.

jeromevdl, Jun 27 '23

We cannot use abstract classes as it's become too restrictive

This is a pretty strong statement; do you have some more details? I think it's important to know why we're discarding this option. I struggled to make the default-interface approach work, but if we put our minds to it we can probably get somewhere. I can't think of a way of extending this that would cause problems with the fairly classic inheritance structure I've used, so a concrete counterexample ("if we add feature X, it will break the public interface") would be helpful.

I like the idea of the generic U, but we could go even further and let the developer have the inner type of the message (what's in the body).

I've got this in both variants now. For SQS, SQSEvent is the batch and SQSEvent.SQSMessage is the individual message:

https://github.com/aws-powertools/powertools-lambda-java/blob/6241014095e45801d23501398868e76d5ad01245/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/examples/ExampleBatchRequestHandler.java#L12-L14

https://github.com/aws-powertools/powertools-lambda-java/blob/6241014095e45801d23501398868e76d5ad01245/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/examples/ExampleMessageHandlerBuilder.java#L17-L20

Because of the way the ABC (abstract base class) is extended, it's easy to provide a per-type mapping from the batch down to its records. Here's the SQS batch handler:

https://github.com/aws-powertools/powertools-lambda-java/blob/6241014095e45801d23501398868e76d5ad01245/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/SQSBatchRequestHandler.java#L15-L17

I think you overcomplicate things, but I may miss something... - Ex: Why a builder ? We're not in the SDK 😉

This may well be the case! My reasoning is that if we want to add a tunable, we should be able to do it without breaking the interface. With the existing SQS batch handling implementation you can't, because the whole implementation is a series of public overloads taking a huge list of possible parameters, e.g.:

https://github.com/aws-powertools/powertools-lambda-java/blob/9afd274272a54221dac36522d883893df98c5f19/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java#L481-L485

I came across this as part of #1183, where I would have liked to add a "use FIFO batch behaviour" switch but can no longer change the interface (in this case we can avoid it because we can infer that we are on a FIFO queue, but that's beside the point).
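To illustrate the point about tunables: with a builder, an option such as FIFO handling can be added later with a safe default, and callers that never set it keep compiling. For FIFO queues, the AWS documentation asks the function to stop at the first failure and report that message plus every unprocessed one, which preserves ordering. All names here (`SqsBatchSketch`, `withHandler`, `withFifoBehaviour`) are hypothetical, and messages are represented only by their IDs.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical batch handler: messages are represented by their IDs only
final class SqsBatchSketch {
    private final Consumer<String> handler;
    private final boolean fifo;

    private SqsBatchSketch(Builder builder) {
        this.handler = builder.handler;
        this.fifo = builder.fifo;
    }

    // Returns the IDs to put in batchItemFailures
    List<String> process(List<String> messageIds) {
        List<String> failed = new ArrayList<>();
        for (int i = 0; i < messageIds.size(); i++) {
            String id = messageIds.get(i);
            try {
                handler.accept(id);
            } catch (RuntimeException e) {
                if (fifo) {
                    // FIFO: report this message and every unprocessed one, then stop
                    failed.addAll(messageIds.subList(i, messageIds.size()));
                    break;
                }
                failed.add(id);
            }
        }
        return failed;
    }

    static final class Builder {
        private Consumer<String> handler = id -> { };
        private boolean fifo = false; // new option with a safe default

        Builder withHandler(Consumer<String> handler) {
            this.handler = handler;
            return this;
        }

        // Added later without touching existing callers
        Builder withFifoBehaviour(boolean fifo) {
            this.fifo = fifo;
            return this;
        }

        SqsBatchSketch build() {
            return new SqsBatchSketch(this);
        }
    }
}
```

Existing callers that only use `withHandler(...)` keep the standard-queue behaviour; opting into FIFO is one extra builder call.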

I'm not confident about this being the right way and am keen to discuss.

Also we don't want to let this to the developers, this is actually why we build this module (the response only consists in PartialBatchFailure, they don't need to return anything)...

This is a small part of the implementation I started to flesh out, not a user-facing thing: the user's code returns nothing or throws, like the inline examples above. I appreciate it's hard to decode intent from a big dump of uncommented PoC code :)

scottgerring, Jun 28 '23

The idempotency library also integrates with the SQS utility; we must retain this functionality also, and it gives another example of an extension point.
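One way to keep idempotency as an extension point is for the batch processor to accept any per-message handler, and for the idempotency integration to wrap that handler. The sketch below uses an in-memory `Set` of completed keys as a stand-in for the Powertools idempotency persistence layer (which, in practice, stores records durably, for example in DynamoDB); `IdempotentHandlers.wrap` is a hypothetical name, not an existing API.

```java
import java.util.Set;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical helper: makes any per-message handler idempotent
final class IdempotentHandlers {
    private IdempotentHandlers() {
    }

    // Skips messages whose key was already processed successfully. The Set stands in
    // for a durable persistence store and is not thread-safe.
    static <T> Consumer<T> wrap(Consumer<T> handler, Function<T, String> key, Set<String> completed) {
        return message -> {
            String k = key.apply(message);
            if (completed.contains(k)) {
                return; // duplicate delivery: already handled
            }
            handler.accept(message);
            // Record success only after the handler returns, so a failed message stays retryable
            completed.add(k);
        };
    }
}
```

With this shape, the batch processor only needs to accept a `Consumer`, and the idempotency module can supply the wrapper without the batch module depending on it.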

scottgerring, Jun 28 '23

If we go down the builder route, we should implement it so that the batch handler is created and configured once with a builder and then invoked from handleRequest. Here is some sample code showing how the interface might look:

public class SqsExampleWithBuilder implements RequestHandler<SQSEvent, SQSBatchResponse> {

    BatchMessageHandler handler;

    public SqsExampleWithBuilder(){
        handler = new BatchMessageHandler.Builder()
                .withSource(SourceType.SQS)
                .withFailureHandler(msg -> System.out.println("Whoops: " + msg.getMessageId()))
                .withRawMessageHandler(this::processWithIdempotency)
                .build();
    }

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        return handler.handle(sqsEvent, context);

    }

    @Idempotent
    @SqsLargeMessage
    private void processWithIdempotency(@IdempotencyKey SQSEvent.SQSMessage sqsMessage, Context context) {
    }

}

mriccia, Jul 12 '23

Thanks for the comment @mriccia

It's more verbose than the interfaces, but probably more customizable, I admit...

How do you handle the inner content of a message (body deserialization)?

The FailureHandler is optional, right (as we need to handle the failure and add items to the partialBatchFailure anyway)?

I'm not sure about the source... can't we infer it instead of asking for it?

Otherwise, I kinda like it...

Something like this ?

BatchMessageHandler<SQSEvent> handler = new BatchMessageHandler.Builder(SQSEvent.class)
        .withFailureHandler(msg -> System.out.println("Whoops: " + msg.getMessageId()))
        .withDeserializedMessageHandler(this::processDeserialized, Basket.class)
        .build();

private void processDeserialized(Basket message, Context context) {
}
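On the deserialization question, one option is for the handler to run a deserializer before calling the user's code and to treat a deserialization error like any other processing error, so that the message ends up in the batch item failures. In this sketch the deserializer is a plain `Function<String, T>` (a real implementation would more likely use a JSON library such as Jackson), a constructor stands in for the builder to keep it short, and `Message` and `DeserializingBatchHandler` are hypothetical names. The optional failure callback is only a notification hook: the message is reported as failed either way.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;
import java.util.function.Function;

// Hypothetical message stand-in: just an ID and a raw body
final class Message {
    final String id;
    final String body;

    Message(String id, String body) {
        this.id = id;
        this.body = body;
    }
}

// Built once (e.g. in the Lambda handler's constructor), then reused for each invocation
final class DeserializingBatchHandler<T> {
    private final Function<String, T> deserializer;
    private final Consumer<T> handler;
    private final Consumer<Message> onFailure;

    DeserializingBatchHandler(Function<String, T> deserializer, Consumer<T> handler, Consumer<Message> onFailure) {
        this.deserializer = deserializer;
        this.handler = handler;
        this.onFailure = onFailure;
    }

    // Returns the IDs of messages that failed to deserialize or to process
    List<String> handle(List<Message> batch) {
        List<String> failed = new ArrayList<>();
        for (Message message : batch) {
            try {
                handler.accept(deserializer.apply(message.body));
            } catch (RuntimeException e) {
                failed.add(message.id);
                onFailure.accept(message); // notification only
            }
        }
        return failed;
    }
}
```

Passing `m -> { }` as the failure callback gives the "optional" behaviour: failures are still collected for the partial batch response.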

jeromevdl, Jul 12 '23

The RFC looks good now. When it comes to testing this, let's verify that it works with messages passed across multiple services, for example:

  • SNS -> SQS
  • S3 notification -> SNS -> SQS
  • etc.

mriccia, Jul 19 '23

RFC is good now, let's build it!

For the ...->SNS->SQS case, users will have to unwrap the payload from the SQSMessage manually...

jeromevdl, Jul 19 '23