RFC: New batch processor for new native partial response (SQS, DynamoDB, Kinesis, Aurora Stream(?))
Key information
- RFC PR: (leave this empty)
- Related issue(s), if known:
- Area: (i.e. Tracer, Metrics, Logger, etc.)
- Meet tenets: (Yes/no)
Summary
A new generic batch processing utility, which can process records from SQS, Kinesis Data Streams, and DynamoDB streams, and handle reporting batch failures.
Motivation
With the launch of support for partial batch responses for Lambda/SQS, the event source mapping can now natively handle partial failures in a batch, removing the need for explicit calls to the SQS delete API. This support already exists for Kinesis and DynamoDB streams.
Proposal
TBD, but along the lines of https://github.com/awslabs/aws-lambda-powertools-roadmap/issues/64
Explain the design in enough detail for somebody familiar with Powertools to understand it, and for somebody familiar with the implementation to implement it.
This should get into specifics and corner-cases, and include examples of how the feature is used. Any new terminology should be defined here.
Drawbacks
Why should we not do this?
Do we need additional dependencies? Impact performance/package size?
Rationale and alternatives
- What other designs have been considered? Why not them?
- What is the impact of not doing this?
Unresolved questions
Optional, stash area for topics that need further development e.g. TBD
@machafer will start to look at the implementations and the UX.
@machafer, what's the status on this please?
As I recently worked on the code here, I'll pick this up!
Have you seen issue #596, and is it something to consider as part of this RFC?
Regarding the proposition, I'm not sure how you can return this List<String> with the BatchMessageHandlerBuilder. Also, from the documentation, there is already an SQSBatchResponse class with a BatchItemFailure. We should probably not reinvent the wheel and use this instead:
public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse> {

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        List<SQSBatchResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        for (SQSEvent.SQSMessage message : sqsEvent.getRecords()) {
            try {
                // process your message
            } catch (Exception e) {
                // Add the failed message identifier to the batchItemFailures list
                batchItemFailures.add(new SQSBatchResponse.BatchItemFailure(message.getMessageId()));
            }
        }
        return new SQSBatchResponse(batchItemFailures);
    }
}
Same for Kinesis/DynamoDB: StreamsEventResponse.BatchItemFailure.
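For illustration, a Kinesis handler following the same pattern might look roughly like this (a sketch assuming the StreamsEventResponse type from aws-lambda-java-events; for Kinesis the item identifier is the record's sequence number):
public class ProcessKinesisRecordBatch implements RequestHandler<KinesisEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(KinesisEvent kinesisEvent, Context context) {
        List<StreamsEventResponse.BatchItemFailure> batchItemFailures = new ArrayList<>();
        for (KinesisEvent.KinesisEventRecord record : kinesisEvent.getRecords()) {
            try {
                // process your record
            } catch (Exception e) {
                // report the failing record by its sequence number
                batchItemFailures.add(new StreamsEventResponse.BatchItemFailure(
                        record.getKinesis().getSequenceNumber()));
            }
        }
        return new StreamsEventResponse(batchItemFailures);
    }
}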
:arrow_right: I would definitely use the built-in events (I've just seen solution 4.2). But in that case, what will the developer experience be, and what will the added value of the module be? Can you elaborate on this?
Regarding your questions:
- Events v4: We need to gather information internally to understand the status / differences / ETA.
- I agree this module should supersede the old SQS one, which will be marked deprecated here and deleted in v2. Regarding the common code, I don't know if we are talking about one class or much more. We can probably live with duplicated code if it's not too big...
Can we also take inspiration from Python and see if we can have a similar "experience"?
Wondering if we could have something like this (for SQS), using interfaces and default implementations:
public class ProcessSQSMessageBatch implements RequestHandler<SQSEvent, SQSBatchResponse>, BatchProcessor<SQSEvent, SQSBatchResponse> {

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        // processBatch has a default implementation in the interface; it catches exceptions
        // thrown by processItem and adds the failing item to the BatchItemFailure list
        return this.processBatch(sqsEvent); // we may need to pass context too... ?
    }

    // this method comes from the BatchProcessor interface, developers override the appropriate one
    @Override
    public void processItem(SQSEvent.SQSMessage message) {
        // do some stuff with this item
    }
}
With the BatchProcessor:
public interface BatchProcessor<I, O> {

    default O processBatch(I input) {
        // depending on the input type (SQS/Kinesis/Dynamo/...), create the appropriate response,
        // then browse the list of items and, for each item:
        //     try {
        //         processItem(item);
        //     } catch (Throwable t) {
        //         // put the item in the item failure list
        //     }
        // finally, return the response carrying the item failure list
    }

    default void processItem(SQSEvent.SQSMessage message) {
        System.out.println(message.getMessageId());
    }

    default void processItem(KinesisEvent.KinesisEventRecord record) {
        System.out.println(record.getEventID());
    }

    default void processItem(DynamodbEvent.DynamodbStreamRecord record) {
        System.out.println(record.getEventID());
    }
}
With this, we could add support for new streaming services via new interface defaults without breaking anything.
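As an illustration of that (a sketch only, reusing the hypothetical BatchProcessor interface above), a DynamoDB streams handler would simply pick the matching default:
public class ProcessDynamoDbBatch implements RequestHandler<DynamodbEvent, StreamsEventResponse>,
        BatchProcessor<DynamodbEvent, StreamsEventResponse> {

    @Override
    public StreamsEventResponse handleRequest(DynamodbEvent dynamodbEvent, Context context) {
        return this.processBatch(dynamodbEvent);
    }

    @Override
    public void processItem(DynamodbEvent.DynamodbStreamRecord record) {
        // business logic for a single DynamoDB stream record
    }
}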
Thanks @jeromevdl for the considered feedback!
Regarding the proposition, I'm not sure how you can return this List<String>
Good catch. I think the base class / base builder will have to return Object, as there is no relationship between the various event-specific batch response types.
I've linked the examples and existing partial response types into the RFC. I've also updated the section at the top explaining why we should do this. It's important we get this part right.
I don't know if we are talking about 1 class or much more. We can probably live with duplicated code if it's not too big...
My feeling is that how to handle this will come out as part of the implementation; we just have to be attentive to it.
I would definitely use the built-in events
I tend to agree. I don't think there is much value add, and a lot of extra code to maintain, in mapping to another model. Realistically the presented use case - "in some cases you can move message handlers between event sources without changing a type in your code" - is pretty tenuous.
Can we also take inspiration from python and see if we can have a similar "experience".
Looks like another option - I hadn't thought of using interface defaults. Let me have a play with it - I've started hacking around on a branch to try and get a feel for the ergonomics of different options. We've narrowed down the solution space in this discussion already; I will mock up some variants on the branch and we can discuss again :muscle:
I've added a reasonably complete example using extension and a very rough sketch using default impls. I think it would be helpful to jump on a call together in the next week to discuss.
- We cannot use abstract classes as it becomes too restrictive (we had that feedback once); that's why I proposed interfaces. I never really played with defaults though... Also, with this, developers don't have control over the handleRequest method (here).
- I like the idea of the generic U, but we could go even further and let the developer have the inner type of the message (what's in the body). They propose it in Python - similar to what you suggest here.
- I think you overcomplicate things, but I may be missing something...
- Ex: Why a builder? We're not in the SDK 😉
- Also, we don't want to leave this to the developers; this is actually why we build this module (the response only consists of PartialBatchFailure, they don't need to return anything)...
@Override
protected SQSBatchResponse writeResponse(Iterable<MessageProcessingResult<SQSEvent.SQSMessage>> results) {
    // Here we map up the SQS-specific response for the batch based on the success of the individual messages
    throw new NotImplementedException();
}
But we have a good basis to discuss on Thursday.
We cannot use abstract classes as it becomes too restrictive
This is a pretty strong statement - do you have some more details? I think it's important to know why we're discarding this. I struggled to make the default-interface approach work, but if we turn our brains to it we can probably get somewhere. I can't think of a way of extending this that would cause problems with the fairly classic inheritance structure I've used, so a good counter-example - "if we add feature X, it will break the public interface" - would be helpful.
I like the idea of the generic U, but we could go even further and let the developer have the inner type of the message (what's in the body).
I've got this in both variants now - for SQS, SQSEvent is the batch, SQSEvent.SQSMessage is the individual message:
https://github.com/aws-powertools/powertools-lambda-java/blob/6241014095e45801d23501398868e76d5ad01245/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/examples/ExampleBatchRequestHandler.java#L12-L14
https://github.com/aws-powertools/powertools-lambda-java/blob/6241014095e45801d23501398868e76d5ad01245/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/examples/ExampleMessageHandlerBuilder.java#L17-L20
Because of the way the ABC is extended, it's easy to provide a per-type mapping from the batch down to its records - here's the SQS batch handler:
https://github.com/aws-powertools/powertools-lambda-java/blob/6241014095e45801d23501398868e76d5ad01245/powertools-batch/src/main/java/software/amazon/lambda/powertools/batch/SQSBatchRequestHandler.java#L15-L17
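For readers who don't want to follow the links, a very rough sketch of the shape being discussed (names are illustrative, not the actual code on the branch; imports and the MessageProcessingResult helper with its success/failure factories are assumed):
// Illustrative only - not the code on the linked branch.
// E = the batch event type, M = the per-record message type, R = the partial-batch response type.
public abstract class AbstractBatchRequestHandler<E, M, R> implements RequestHandler<E, R> {

    @Override
    public R handleRequest(E event, Context context) {
        List<MessageProcessingResult<M>> results = new ArrayList<>();
        for (M message : extractMessages(event)) {
            try {
                processMessage(message, context);
                results.add(MessageProcessingResult.success(message));
            } catch (Exception e) {
                results.add(MessageProcessingResult.failure(message, e));
            }
        }
        return writeResponse(results);
    }

    // provided per event source: maps the batch down to its records
    protected abstract List<M> extractMessages(E event);

    // supplied by the user: business logic for a single message
    protected abstract void processMessage(M message, Context context);

    // provided per event source: builds the event-specific partial batch response
    protected abstract R writeResponse(Iterable<MessageProcessingResult<M>> results);
}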
I think you overcomplicate things, but I may miss something... - Ex: Why a builder ? We're not in the SDK 😉
This may well be the case! My reasoning is: if we want to add a tuneable, we should be able to do it without breaking the interface. With the existing SQS batch handling impl you can't, because the whole implementation is a series of public overloads taking a huge list of possible params, e.g.:
https://github.com/aws-powertools/powertools-lambda-java/blob/9afd274272a54221dac36522d883893df98c5f19/powertools-sqs/src/main/java/software/amazon/lambda/powertools/sqs/SqsUtils.java#L481-L485
I came across this as part of #1183, where I would've liked to add a "use FIFO batch behaviour" switch but could no longer change the interface (in this case we can avoid it because we can infer we are on a FIFO queue, but that's kind of beside the point).
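To make the argument concrete, here's a hypothetical sketch (names invented for illustration) of how a builder would let us add a new tuneable later without touching any existing signature:
BatchMessageHandler<SQSEvent> handler = new BatchMessageHandlerBuilder()
        .withSqsBatchHandler()
        .withMessageHandler(this::processMessage)
        // a hypothetical future option: added as one more builder method,
        // existing call sites keep compiling unchanged
        .withFifoBatchBehaviour(true)
        .build();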
I'm not confident about this being the right way and am keen to discuss.
Also we don't want to let this to the developers, this is actually why we build this module (the response only consists in PartialBatchFailure, they don't need to return anything)...
This is a small part of the impl I started to flesh out and not a user facing thing - the user's code returns nothing or throws like the examples inline above. Appreciate it's hard to decode intent from a big dump of uncommented PoC code :)
The idempotency library also integrates with the SQS utility; we must retain this functionality, and it gives another example of an extension point.
If we go down the Builder route, we should implement it in a way that the batch handler is created and configured once with a builder, and then invoked during handleRequest.
Here is some sample code of how the interface might look:
public class SqsExampleWithBuilder implements RequestHandler<SQSEvent, SQSBatchResponse> {

    private final BatchMessageHandler handler;

    public SqsExampleWithBuilder() {
        handler = new BatchMessageHandler.Builder()
                .withSource(SourceType.SQS)
                .withFailureHandler(msg -> System.out.println("Whoops: " + msg.getMessageId()))
                .withRawMessageHandler(this::processWithIdempotency)
                .build();
    }

    @Override
    public SQSBatchResponse handleRequest(SQSEvent sqsEvent, Context context) {
        return handler.handle(sqsEvent, context);
    }

    @Idempotent
    @SqsLargeMessage
    private void processWithIdempotency(@IdempotencyKey SQSEvent.SQSMessage sqsMessage, Context context) {
    }
}
Thanks for the comment @mriccia
It's more verbose than the interfaces, but probably more customizable, I admit...
How do you handle the inner content of a message (body deserialization)?
FailureHandler is optional, right? (As we handle the failure and add items to partialBatchFailure anyway.)
Not sure about the source... can't we guess it instead of asking for it?
Otherwise, I kinda like it...
Something like this?
BatchMessageHandler<SQSEvent> handler = new BatchMessageHandler.Builder(SQSEvent.class)
        .withFailureHandler(msg -> System.out.println("Whoops: " + msg.getMessageId()))
        .withDeserializedMessageHandler(this::processDeserialized, Basket.class)
        .build();

private void processDeserialized(Basket message, Context context) {
}
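On the deserialization question, a rough sketch of what withDeserializedMessageHandler could do internally (assuming a Jackson ObjectMapper; the dispatch method name and signature are illustrative):
private <T> void dispatch(SQSEvent.SQSMessage message, Class<T> payloadType,
                          BiConsumer<T, Context> userHandler, Context context) throws IOException {
    // map the raw SQS body onto the requested payload type before invoking the user's handler
    ObjectMapper mapper = new ObjectMapper();
    T payload = mapper.readValue(message.getBody(), payloadType);
    userHandler.accept(payload, context);
}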
RFC looks good now. When it comes to testing this, let's verify that it works with messages passed across multiple services, for example:
- SNS -> SQS
- S3 notification -> SNS -> SQS
- etc.
RFC is good now, let's build it!
For the ...->SNS->SQS, users will have to unwrap manually from the SQSMessage...
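For illustration, that manual unwrapping could look roughly like this (a sketch assuming Jackson and the hypothetical Basket payload type from the earlier example):
private Basket unwrapFromSns(SQSEvent.SQSMessage sqsMessage) throws IOException {
    // Without raw message delivery, the SQS body is the SNS notification envelope;
    // the original payload sits in its "Message" field.
    ObjectMapper mapper = new ObjectMapper();
    JsonNode snsEnvelope = mapper.readTree(sqsMessage.getBody());
    String innerPayload = snsEnvelope.get("Message").asText();
    return mapper.readValue(innerPayload, Basket.class);
}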