kafka-connect-lambda

Sync mode should stop task when Lambda function fails

Open Cardell opened this issue 4 years ago • 10 comments

Thanks for a nice plugin!

When using sync mode, a successful invocation is treated as a success even if the Lambda function itself failed.

As long as the invocation succeeds, i.e. the Lambda service could receive the event, the task just keeps going, even though the actual Lambda function is throwing errors.

As I see it, the expected behaviour in sync mode when the Lambda function fails is to stop the task altogether so that no more messages are consumed from Kafka, or at least to have that as an option.

//Rickard

Cardell avatar May 18 '20 21:05 Cardell

Here's some log output as well:

connect_1          | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "HTTP/1.1 200 OK[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "Date: Mon, 18 May 2020 20:34:33 GMT[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "Content-Type: application/json[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "Content-Length: 181[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "Connection: keep-alive[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,178] DEBUG http-outgoing-0 << "x-amzn-RequestId: 123-123-123-123c[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "X-Amz-Function-Error: Unhandled[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "x-amzn-Remapped-Content-Length: 0[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "X-Amz-Executed-Version: $LATEST[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "X-Amzn-Trace-Id: root=1-123123123123123;sampled=0[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "[\r][\n]" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,179] DEBUG http-outgoing-0 << "{"errorMessage": "ASDF", "errorType": "ValueError", "stackTrace": ["  File \"/var/task/lambda_function.py\", line 5, in lambda_handler\n    raise ValueError(\"ASDF\")\n"]}" (org.apache.http.wire)
connect_1          | [2020-05-18 20:34:33,181] DEBUG Received successful response: 200, AWS Request ID: 2313123-123123123-123123 (com.amazonaws.request)

So the request itself is successful and returns a 200 response, as it should, even though the function failed.

What about a new property, e.g. aws.lambda.function.failure.mode={STOP,DROP}, so that this behaviour is configurable?
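To make the proposal concrete, here is a minimal sketch of how such a property could be parsed and applied. Everything in it is hypothetical: the class and method names, the case-insensitive parsing, and the choice of STOP as the default are assumptions for illustration and are not taken from the connector.

```java
import java.util.Locale;

/** Hypothetical sketch of the proposed aws.lambda.function.failure.mode property. */
final class FailureModeSketch {
    private FailureModeSketch() {}

    enum FailureMode {
        STOP, // fail the task so no further records are consumed
        DROP  // skip the failed record and continue with the next one
    }

    /** Parses the property value case-insensitively; defaulting to STOP is an assumption. */
    static FailureMode fromProperty(String raw) {
        if (raw == null || raw.trim().isEmpty()) {
            return FailureMode.STOP;
        }
        // valueOf throws IllegalArgumentException for unknown values
        return FailureMode.valueOf(raw.trim().toUpperCase(Locale.ROOT));
    }

    /** Returns true when the task should keep consuming after an invocation. */
    static boolean shouldContinue(FailureMode mode, boolean functionFailed) {
        if (!functionFailed) {
            return true;
        }
        return mode == FailureMode.DROP;
    }
}
```

With this shape, a value of "drop" keeps the task running after a failed function, while "stop" (or an unset property, under the assumed default) halts it.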

Cardell avatar May 19 '20 09:05 Cardell

The response payload is non-deterministic. X-Amz-Function-Error appears to be the only indicator that the Lambda has not handled the message. Does that sound right?

dylanmei avatar May 19 '20 12:05 dylanmei

I believe the response is deterministic. It's just that on function errors, the payload gets a little extra.

I'm not sure about the low levels of the aws client even though I added logs about it :) Here's part of the documentation: https://docs.aws.amazon.com/lambda/latest/dg/python-exceptions.html

So one has to examine the response payload. That's the only indication it sends. Luckily it is already available in the InvocationResponse: https://github.com/Nordstrom/kafka-connect-lambda/blob/master/src/main/java/com/nordstrom/kafka/connect/lambda/LambdaSinkTask.java#L205

which got it from here: https://github.com/Nordstrom/kafka-connect-lambda/blob/master/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java#L63

Cardell avatar May 19 '20 12:05 Cardell

I made some code that should take care of this case: https://github.com/Nordstrom/kafka-connect-lambda/pull/36

RickardCardell avatar May 29 '20 10:05 RickardCardell

Thanks, @RickardCardell.

This document tells us that Lambda's signal for a function error is the header X-Amz-Function-Error, and it gives no guarantees about how the payload will be formatted.

https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html

dylanmei avatar May 29 '20 12:05 dylanmei

Hi

I'm not fully sure what you imply, so I may have misunderstood. From the link you added:

If the function returns an error, Lambda indicates this by including a header named X-Amz-Function-Error, and a JSON-formatted response with the error message and other details

So it should be a JSON according to the spec. The code I added also just UTF-8 encodes it. Is there some other formatting that you refer to?

RickardCardell avatar May 29 '20 13:05 RickardCardell

Any Lambda can return a JSON payload with these fields. The signal from AWS that it's actually an error is in the response header.

dylanmei avatar May 29 '20 13:05 dylanmei

Okay, I think I see what you mean. The content of X-Amz-Function-Error ends up in InvokeResult.getFunctionError(). The error message, the json, goes into the response payload: https://github.com/aws/aws-sdk-java/blob/master/aws-java-sdk-lambda/src/main/java/com/amazonaws/services/lambda/model/InvokeResult.java#L40

Here's where it is accessed in the connector: https://github.com/Nordstrom/kafka-connect-lambda/blob/master/src/main/java/com/nordstrom/kafka/connect/lambda/InvocationClient.java#L63
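A rough sketch of the check this implies: treat the invocation as failed only when the function-error value is present, and use the payload purely as a diagnostic message. To stay dependency-free, the method takes plain values that stand in for InvokeResult.getFunctionError() and InvokeResult.getPayload(). The class name, method name, and TaskStoppedException are hypothetical; in a real sink task the exception would presumably be Kafka Connect's ConnectException.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

/** Hypothetical sketch: decide success or failure from the function-error value, not the payload. */
final class FunctionErrorCheck {
    private FunctionErrorCheck() {}

    /** Stand-in for Kafka Connect's ConnectException so the sketch has no dependencies. */
    static final class TaskStoppedException extends RuntimeException {
        TaskStoppedException(String message) {
            super(message);
        }
    }

    /**
     * functionError stands in for InvokeResult.getFunctionError() (null when the
     * X-Amz-Function-Error header is absent); payload stands in for InvokeResult.getPayload().
     * Returns the decoded payload on success and throws when the function failed.
     */
    static String checkInvocation(String functionError, ByteBuffer payload) {
        // Decode from a read-only duplicate so the caller's buffer position is untouched
        String body = payload == null
                ? ""
                : StandardCharsets.UTF_8.decode(payload.asReadOnlyBuffer()).toString();
        if (functionError != null) {
            throw new TaskStoppedException(
                    "Lambda function error (" + functionError + "): " + body);
        }
        // A payload that merely contains "errorMessage" is not treated as a failure
        return body;
    }
}
```

Note that a payload containing an errorMessage field passes through untouched when no function error is reported, which matches the point that any Lambda can return such JSON.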

RickardCardell avatar May 29 '20 13:05 RickardCardell

Aha! Thank you for being patient with me. @SgtPepperLHCB or I can carry on any other discussion items in the PR.

dylanmei avatar May 29 '20 13:05 dylanmei

Great! That PR is mostly a proof of concept, i.e. it is one way of solving it, so I don't mind if it is completely rewritten or thrown away :)

RickardCardell avatar May 29 '20 14:05 RickardCardell