kafka-connect-lambda
kafka-connect-lambda copied to clipboard
Sync mode should stop task when Lambda function fails
Thanks for a nice plugin!
When using sync mode and the invocation succeeds the response is seen as success even though the Lambda execution failed.
As long as the invocation succeeds, i.e the Lambda system could receive the event, the task just keep going, even though the actual Lambda function is throwing errors.
As I see it, the expected behaviour when the Lambda function fails and when using sync mode is to stop the task altogether so that no more messages are consumed from Kafka. Or at least have it as an option.
//Rickard
Here's some log output as well:
connect_1 | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "HTTP/1.1 200 OK[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "Date: Mon, 18 May 2020 20:34:33 GMT[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "Content-Type: application/json[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "Content-Length: 181[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "Connection: keep-alive[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "x-amzn-RequestId: 123-123-123-123c[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "X-Amz-Function-Error: Unhandled[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "x-amzn-Remapped-Content-Length: 0[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "X-Amz-Executed-Version: $LATEST[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "X-Amzn-Trace-Id: root=1-123123123123123;sampled=0[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "[\r][\n]" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "{"errorMessage": "ASDF", "errorType": "ValueError", "stackTrace": [" File \"/var/task/lambda_function.py\", line 5, in lambda_handler\n raise ValueError(\"ASDF\")\n"]}" (org.apache.http.wire)
connect_1 | [2020-05-18 20:34:33,181] DEBUG Received successful response: 200, AWS Request ID: 2313123-123123123-123123 (com.amazonaws.request)
so the request itself is successful and returns a 200 response as it should, while the function was failing.
What about a new property, e.g aws.lambda.function.failure.mode={STOP,DROP}
so that this behaviour can be configurable?
The response payload is non deterministic. X-Amz-Function-Error
appears to be the only indicator that the Lambda has not handled the message. Does that sound right?
I believe the response is deterministic. Just that upon function errors, it gets some little extra in the payload.
I'm not sure about the low levels of the aws client even though I added logs about it :) Here's part of the documentation: https://docs.aws.amazon.com/lambda/latest/dg/python-exceptions.html
So one has to examine the response payload. That's the only indication it sends. Luckily it is already available in the InvocationResponse: https://github.com/Nordstrom/kafka-connect-lambda/blob/master/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java#L205
which got it from here: https://github.com/Nordstrom/kafka-connect-lambda/blob/master/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java#L63
I made some code that should take care of this case: https://github.com/Nordstrom/kafka-connect-lambda/pull/36
Thanks, @RickardCardell.
This document is tells us that Lambda's signal for a function error is the header X-Amz-Function-Error
and it gives no guarantees about how the payload will be formatted.
https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html
Hi
I'm not fully sure what you imply so I may have understood. From the link you added:
If the function returns an error, Lambda indicates this by including a header named X-Amz-Function-Error, and a JSON-formatted response with the error message and other details
So it should be a JSON according to the spec. The code I added also just UTF-8 encodes it. Is there some other formatting that you refer to?
Any Lambda can return a JSON payload with these fields. The signal from AWS that it's actually an error is in the response header.
Okay, I think I see what you mean.
The content of X-Amz-Function-Error
ends up in InvokeResult.getFunctionError()
.
The error message, the json, goes into the response payload:
https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-lambda/src/main/java/com/amazonaws/services/lambda/model/InvokeResult.java#L40
Here's where it is accessed in the connector: https://github.com/Nordstrom/kafka-connect-lambda/blob/master/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java#L63
Aha! Thank you for being patient with me. @SgtPepperLHCB or I can carry on any other discussion items in the PR.
Great! That PR is mostly for proof of concept. I.e that is one way of solving it, so I don't mind if it is completely rewritten or thrown away :)