alpakka

Need to retry when the Elasticsearch server responds with 429 Too Many Requests

Open YouXiang-Wang opened this issue 2 years ago • 2 comments

Versions used

Akka version: 2.6.x

Problem description

I am using Akka and akka-stream-alpakka-elasticsearch to insert data into Elasticsearch (v6.3.x). When the Elasticsearch server answers a /_bulk request with HTTP status 429 (Too Many Requests), the running stream fails and stops.

[ElasticsearchSimpleFlowStage$StageLogic(akka://####)] Received error from elastic after having already processed 0 documents. Error: java.lang.RuntimeException: Request failed for POST https://####.us-west-2.es.amazonaws.com/_bulk, got 429 Too Many Requests with body: 429 Too Many Requests /_bulk

Expected Behavior

The failed request should be retried according to a retry policy.
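A retry policy for this case has two parts: which responses count as transient, and how long to wait before the next attempt. Below is a minimal sketch, assuming 429 and 504 are the only retryable statuses and that a capped exponential backoff is wanted. RetryPolicy, isRetryable, shouldRetry and delayFor are hypothetical names, not part of Alpakka.

```scala
import scala.concurrent.duration._

// Hypothetical retry policy; not an Alpakka API.
final case class RetryPolicy(
    maxRetries: Int,
    minBackoff: FiniteDuration,
    maxBackoff: FiniteDuration
) {
  // Assumption: 429 Too Many Requests and 504 Gateway Timeout are transient.
  def isRetryable(status: Int): Boolean = status == 429 || status == 504

  // attempt is 1-based: retry number `attempt` is allowed while attempt <= maxRetries.
  def shouldRetry(status: Int, attempt: Int): Boolean =
    attempt <= maxRetries && isRetryable(status)

  // Delay before retry number `attempt`: minBackoff * 2^(attempt - 1), capped at maxBackoff.
  def delayFor(attempt: Int): FiniteDuration = {
    require(attempt >= 1, "attempt is 1-based")
    val shift = math.min(attempt - 1, 30) // keeps the Long multiplication from overflowing
    val millis = minBackoff.toMillis * (1L << shift)
    math.min(millis, maxBackoff.toMillis).millis
  }
}
```

For example, RetryPolicy(5, 100.millis, 2.seconds) waits 100 ms, 200 ms and 400 ms before the first three retries and never more than 2 s. Adding random jitter to the delay would spread out retries from parallel streams hitting the same cluster.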

Actual Behavior

The request simply returns Future.failed(exception), which fails the whole stream.
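Until the stage retries on its own, a caller could retry a failed bulk call from the outside. The sketch below uses only the Scala standard library, with a ScheduledExecutorService providing the delays. TransientHttpError and Retry.retryWithBackoff are hypothetical names; inside an Akka application, akka.pattern.RetrySupport offers a similar retry helper.

```scala
import java.util.concurrent.{Executors, ScheduledExecutorService, TimeUnit}
import scala.concurrent.{ExecutionContext, Future, Promise}
import scala.concurrent.duration._

// Hypothetical error type carrying the HTTP status of a failed _bulk request.
final case class TransientHttpError(status: Int) extends RuntimeException(s"HTTP $status")

object Retry {
  // Re-runs `op` while it fails with a 429 or 504 TransientHttpError and retries remain,
  // doubling the delay after each attempt up to maxBackoff.
  def retryWithBackoff[T](
      op: () => Future[T],
      retriesLeft: Int,
      delay: FiniteDuration,
      maxBackoff: FiniteDuration,
      scheduler: ScheduledExecutorService
  )(implicit ec: ExecutionContext): Future[T] =
    op().recoverWith {
      case TransientHttpError(status) if retriesLeft > 0 && (status == 429 || status == 504) =>
        // Complete a promise after `delay` instead of blocking a thread.
        val wakeUp = Promise[Unit]()
        scheduler.schedule(
          new Runnable { def run(): Unit = wakeUp.success(()) },
          delay.toMillis,
          TimeUnit.MILLISECONDS
        )
        val nextDelay = if (delay * 2 < maxBackoff) delay * 2 else maxBackoff
        wakeUp.future.flatMap(_ => retryWithBackoff(op, retriesLeft - 1, nextDelay, maxBackoff, scheduler))
    }
}
```

A scheduler can be created with Executors.newSingleThreadScheduledExecutor() and shut down when the application stops. Retrying a whole bulk request resends every document in it, so this is only safe when the writes are idempotent, for example when indexing with explicit document IDs.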

Relevant logs

[ElasticsearchSimpleFlowStage$StageLogic(akka://fcasb)] Received error from elastic after having already processed 0 documents. Error: java.lang.RuntimeException: Request failed for POST https://vpc-global-site-data-lsubqatc27bcxsywnwxcvvrohe.us-west-2.es.amazonaws.com/_bulk, got 429 Too Many Requests with body: 429 Too Many Requests /_bulk

Reproducible Test Case

Please provide a PR with a failing test.

If the issue is more complex or requires configuration, please provide a link to a project that reproduces the issue.

YouXiang-Wang, Jun 11 '22 01:06

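I think these errors need to be handled and retried, so that the upstream and downstream keep running. The change below turns 429 and 504 responses into a synthetic _bulk response whose item is marked as failed, so they go through the normal response handling instead of failing the stream.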

Proposed change in ElasticsearchSimpleFlowStage:

            case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
              Unmarshal(responseEntity)
                .to[String]
                .map(json =>
                  responseHandler.invoke((messages, resultsPassthrough, json))
                )

            // 429 Too Many Requests: report the batch as failed items instead of failing the stream.
            case HttpResponse(StatusCodes.TooManyRequests, _, responseEntity, _) =>
              responseEntity.discardBytes() // body is unused, but akka-http expects it to be consumed
              val json = """{"took":0,"errors":true,"items":[{"update":{"_index":"","_type":"_doc","_id":"","status":429,"error":{"type":"too_many_client","reason":"429 Too Many Requests /_bulk"}}}]}"""
              responseHandler.invoke((messages, resultsPassthrough, json))
              scala.concurrent.Future.successful(()) // same Future[Unit] result type as the other branches

            // 504 Gateway Timeout: handled the same way as 429.
            case HttpResponse(StatusCodes.GatewayTimeout, _, responseEntity, _) =>
              responseEntity.discardBytes()
              val json = """{"took":0,"errors":true,"items":[{"update":{"_index":"","_type":"_doc","_id":"","status":504,"error":{"type":"gateway_timeout","reason":"Gateway Timeout /_bulk"}}}]}"""
              responseHandler.invoke((messages, resultsPassthrough, json))
              scala.concurrent.Future.successful(())

            case HttpResponse(status, _, responseEntity, _) =>
              Unmarshal(responseEntity).to[String].map { body =>
                log.error(s"Request failed for POST $uri, got $status with body: $body")
                // The raw body may contain quotes, so it is only logged; the JSON uses the
                // numeric status code and the standard status line so that it stays valid.
                val json = s"""{"took":0,"errors":true,"items":[{"update":{"_index":"","_type":"_doc","_id":"","status":${status.intValue},"error":{"type":"http_error","reason":"${status.value}"}}}]}"""
                responseHandler.invoke((messages, resultsPassthrough, json))
              }
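The synthetic JSON in the 429 and 504 cases contains a single item, while a bulk request usually carries many messages. If the response handler pairs response items with messages one to one (an assumption about the stage internals), each message needs its own failed item, and any text copied into the JSON must be escaped. Below is a self-contained sketch. SyntheticBulkResponse, escape and syntheticBulkFailure are hypothetical helpers, and commands stands for each message's bulk action (for example "index" or "update").

```scala
// Hypothetical helpers; not Alpakka code.
object SyntheticBulkResponse {
  // Minimal JSON string escaping: quotes, backslashes and control characters.
  def escape(s: String): String = {
    val sb = new StringBuilder
    s.foreach {
      case '"'          => sb.append("\\\"")
      case '\\'         => sb.append("\\\\")
      case '\n'         => sb.append("\\n")
      case '\r'         => sb.append("\\r")
      case '\t'         => sb.append("\\t")
      case c if c < ' ' => sb.append("\\u%04x".format(c.toInt))
      case c            => sb.append(c)
    }
    sb.toString
  }

  // Builds a _bulk-style response body with one failed item per message,
  // keyed by that message's bulk action.
  def syntheticBulkFailure(commands: Seq[String], status: Int, errorType: String, reason: String): String = {
    val error = s"""{"type":"${escape(errorType)}","reason":"${escape(reason)}"}"""
    commands
      .map(cmd => s"""{"$cmd":{"_index":"","_type":"_doc","_id":"","status":$status,"error":$error}}""")
      .mkString("""{"took":0,"errors":true,"items":[""", ",", "]}")
  }
}
```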

YouXiang-Wang, Jun 11 '22 02:06

@ennru Hi Runne, would you please take a look at this one?

YouXiang-Wang, Jun 14 '22 06:06