Feature request: Event parser envelope (for Kinesis) with sequence number
Use case
We want to handle Kinesis streaming events with Pydantic models via event_parser, AND be able to report partial batch failures.
If you use KinesisDataStreamEnvelope you only get a list of models, so you lose the sequence numbers needed for partial failures.
Solution/User Experience
An envelope that would return, for example, List[Tuple[str, Optional[Model]]]. This would keep handling of the actual model data just as easy, while also making it possible to implement partial batch failures. A similar envelope would probably be useful for other services that support partial batch failures.
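A dependency-free sketch of the proposed envelope shape (the function name `parse_with_sequence` is hypothetical; a real implementation would subclass Powertools' BaseEnvelope and parse into a Pydantic model, and this sketch assumes JSON payloads):

```python
import base64
import json
from typing import Any, Callable, List, Optional, Tuple


def parse_with_sequence(
    event: dict,
    parse_model: Callable[[dict], Any],
) -> List[Tuple[str, Optional[Any]]]:
    """Parse each Kinesis record while keeping its sequence number.

    Returns (sequence_number, model) pairs; model is None when parsing
    fails, so the caller can report that record as a partial batch failure.
    """
    results: List[Tuple[str, Optional[Any]]] = []
    for record in event["Records"]:
        seq = record["kinesis"]["sequenceNumber"]
        try:
            payload = json.loads(base64.b64decode(record["kinesis"]["data"]))
            results.append((seq, parse_model(payload)))
        except Exception:
            results.append((seq, None))
    return results
```

The caller keeps the easy model access of an envelope, but a failed record no longer raises away the sequence numbers of the whole batch.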
Alternative solutions
Make Powertools batch processing able to hand (sub-)batches, not single items, to custom code. Also, support failing fast when the first record fails instead of processing all records regardless of errors.
Acknowledgment
- [X] This feature request meets Powertools for AWS Lambda (Python) Tenets
- [ ] Should this be considered in other Powertools for AWS Lambda languages? i.e. Java, TypeScript, and .NET
hey @jarikujansuu thank you for taking the time to fill this feature request - could you expand on how you'd like to use Batch processing with an entire batch?
If you could expand more on the processing experience for Kinesis with any code snippets it'd be super helpful.
While we could fail fast like we do with SqsFifoPartialProcessor, Kinesis can only guarantee ordering within the shard, which means this can create side effects in your processing -- e.g., lowest sequence number will be used as the checkpoint upon multiple failures.
This info will give us a better picture of what you're trying to do, the authoring experience you desire, and how partial failure support would work if your function would receive the entire batch in a single call.
Thanks a lot!
I perhaps had two separate issues in here. My squad produces data to a Kinesis stream, and I am looking into how we could help our customers simplify their code, and why they are not using event_parser. We ourselves mainly consume data from DynamoDB Streams, and batch processing from Powertools didn't work for us. Also, for our use case we only care about ordering within a single partition key.
But lets start...
Why should event parser also return the sequence number?
- A Lambda listening on a Kinesis stream receives a batch containing events from a single shard only
- On failure you can return batchItemFailures with the lowest failed sequence number. If the response contains multiple sequence numbers, only the lowest of them is used

So any processing done after the first failure is wasted and will result in duplicates downstream. I think the only possible benefit of processing everything even after the first failure is if you are not going to retry anymore (or don't use partial batch failures at all).
But to use partial batch failures at all you can't use event_parser, as it removes the required information - unless you implement a custom envelope like the one I described. I was originally planning to implement such an envelope in our own libraries, but to me it sounds like something that could be useful for many, and of course it would be simpler for us too :)
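To illustrate the partial-failure side: given (sequence_number, maybe-model) pairs like the envelope above would return, the handler response is small. Since Lambda checkpoints at the lowest reported sequence number anyway, reporting the first failure is sufficient (the helper name `batch_item_failures` is hypothetical, a sketch of the idea):

```python
from typing import Any, List, Optional, Tuple


def batch_item_failures(parsed: List[Tuple[str, Optional[Any]]]) -> dict:
    """Build a Lambda partial-batch-failure response.

    Records in a Kinesis batch are ordered within the shard, and Lambda
    retries from the lowest reported sequence number, so the first
    failed record is the only one worth reporting.
    """
    for seq, model in parsed:
        if model is None:
            return {"batchItemFailures": [{"itemIdentifier": seq}]}
    return {"batchItemFailures": []}
```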
There is a similar issue with at least DynamoDBStreamEnvelope: any processing done after the first failed sequence number is wasted.
Why should batch processing be able to pass (smaller) batches to custom code?
Currently in batch processing there is record_handler, which takes a single record at a time, so if your Lambda receives a batch of 100 events, you process them without knowing anything about the other events.
We have a use case where we take data from a DynamoDB Stream and send it to an event bus. If we process events one by one we can only send a single event per put_events call, but if we processed them 10 at a time we could send 10 in a single call.
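A minimal sketch of that batching, assuming EventBridge's PutEvents limit of 10 entries per call (the `chunks` helper and the commented client call are illustrative, not part of Powertools):

```python
from typing import Any, Iterable, Iterator, List


def chunks(items: Iterable[Any], size: int = 10) -> Iterator[List[Any]]:
    """Yield successive chunks of at most `size` items.

    PutEvents accepts up to 10 entries per call, so processing
    10 records at a time cuts network calls roughly tenfold.
    """
    batch: List[Any] = []
    for item in items:
        batch.append(item)
        if len(batch) == size:
            yield batch
            batch = []
    if batch:
        yield batch


# Hypothetical usage with a boto3 EventBridge client:
# for batch in chunks(parsed_models):
#     events_client.put_events(Entries=[to_entry(m) for m in batch])
```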
That said, I see it as a valid design decision to keep batch processing simple - process things one event at a time - and let more complex cases be implemented with the help of event_parser.
Why should batch processing fail fast with DynamoDB Streams and Kinesis?
Because Lambda takes the first failed sequence number and retries everything after it, there is no way to fail only events 2, 7 and 10: returning those IDs would fail 2 and everything after it.
There are of course some caveats. For example, if this is your last retry attempt, should you try to save at least some events? We bisect the batch down to a point and then DLQ handling takes care of the rest.
quick note to say I haven't forgotten and will respond today
Thank you so much for clarifying and sharing more details @jarikujansuu - it's clear now.
Splitting into work streams to make it easier to have actions on them, please let me know if I missed or misinterpreted any area.
Fail fast for Streams in Batch Processing
That is something we want to do; it's a valid concern. It's something I've wanted to do for a while but haven't prioritized, so having a customer-driven need makes this easier -- we welcome a PR and can guide you to make this faster; otherwise we can look into it after we complete other major work items in early September.
SqsFifoPartialProcessor works that way with one caveat: it reports all unprocessed items as failures to prevent them from being deleted (not the case with Kinesis and DynamoDB due to checkpointing).
[Action] We should create a specialized Batch Processor for Kinesis and one for DynamoDB that stop on first failure by default. We should update the documentation to recommend the new ones, explain why under a Legacy section, and inform customers about Report Batch Item Failures along with the benefit of the Bisect on Error configuration.
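Not the actual Powertools implementation, but the stop-on-first-failure semantics can be sketched as follows. Reporting the failed record and everything after it mirrors the SqsFifoPartialProcessor caveat above; for Kinesis/DynamoDB, reporting only the first would also work since Lambda checkpoints at the lowest (the function name and record shape are assumptions for illustration):

```python
from typing import Any, Callable, List


def process_fail_fast(records: List[dict], handler: Callable[[dict], Any]) -> dict:
    """Process stream records in order; stop at the first failure.

    The failed record and all subsequent ones are reported as
    batchItemFailures, so Lambda retries from that point and no
    work is wasted on records that would be retried anyway.
    """
    failures: List[dict] = []
    for i, record in enumerate(records):
        try:
            handler(record)
        except Exception:
            failures = [
                {"itemIdentifier": r["kinesis"]["sequenceNumber"]}
                for r in records[i:]
            ]
            break
    return {"batchItemFailures": failures}
```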
Receive entire batch
As of now, I'm not inclined, as it's a trade-off. I concur with the need to optimize networking calls like in your case, but the average person would then have to incur additional boilerplate to handle processing, unless I've missed something? My reservation with this model is that there is not much of a value add, and that's where things go off the rails too quickly, as we've seen first hand (the reason we created this model).
Async Batch Processor wouldn't work here either (all items at once!) because your need is to efficiently call a downstream service once you've buffered enough info.
[Action] Feel free to create an RFC. We would then wait for customer demand (10+) to validate the need and shape the direction. For now, extending the batch processor is an alternative solution for savvy customers.
Keeping sequence number in Kinesis Envelope
As of now, I'm not inclined, due to existing customer feedback that led us to create the Envelope mechanism. Customers use Envelope to get rid of all metadata and map directly to the DTO or Model they have codified - regardless of whether they use event_parser or simply parse in a standalone function.
For Batch, you can access the sequence number in the current model, hence why we didn't use an envelope. However, I'm aware that doesn't work for you, as you'd want the entire batch.
[Action] No action needed.
I think fail fast for Streams in Batch Processing would simplify our current client squads' code nicely. If I have understood correctly, the same SqsFifoPartialProcessor logic could be used for Kinesis and DynamoDB, as for them you can include all failed items and Lambda just takes the lowest. Of course, _short_circuit_processing can easily be changed to return just the first failure. :)
For our own service, the ability to use both the sequence number and multiple events at the same time is kind of a must, so I'll probably just implement such an envelope myself, as it seems like a trivial thing. And if only we need it, then sharing code isn't an issue :)
Or it might actually be quite simple to extend the batch processor to pass, for example, a list of 10 events as Pydantic models, and if that fails, send a partial failure with all their IDs.
Yeah (return first failed immediately) - let's do the new Kinesis and DynamoDB classes first like FIFO, and then we regroup on a call as I'd like to see what you've got. I've got the feeling I might be missing something here ;-)
We're adding this in V3, as we were unable to prioritize it earlier. If anyone would like to help create a new class for DynamoDB and one for Kinesis that fail fast like FIFO, please go ahead and we can help out.