alpakka
alpakka copied to clipboard
Need to make retry when 429 occurs from ElasticSearch server side.
Versions used
Akka version: 2.6.x
Problem description
I am using akk and akka-stream-alpakka-elasticsearch to insert data into ElasticSearch (V6.3.x). If ElasticSearch server responds http response with code 429 (Too Many Requests /_bulk ), the running stream would be stopped as the failure.
[ElasticsearchSimpleFlowStage$StageLogic(akka://####)] Received error from elastic after having already processed 0 documents. Error: java.lang.RuntimeException: Request failed for POST https://####.us-west-2.es.amazonaws.com/_bulk, got 429 Too Many Requests with body: 429 Too Many Requests /_bulk
Expected Behavior
Need to make retry based on retry policy
Actual Behavior
Just return Future.failed(exception)
and fail the whole stream.
Relevant logs
[ElasticsearchSimpleFlowStage$StageLogic(akka://fcasb)] Received error from elastic after having already processed 0 documents. Error: java.lang.RuntimeException: Request failed for POST https://vpc-global-site-data-lsubqatc27bcxsywnwxcvvrohe.us-west-2.es.amazonaws.com/_bulk, got 429 Too Many Requests with body: 429 Too Many Requests /_bulk
Reproducible Test Case
Please provide a PR with a failing test.
If the issue is more complex or requires configuration, please provide a link to a project that reproduces the issue.
I think we need to handle these errors and make retries to guarantee the upstream and downstream keep running.
ElasticsearchSimpleFlowStage
case HttpResponse(StatusCodes.OK, _, responseEntity, _) =>
Unmarshal(responseEntity)
.to[String]
.map(json =>
responseHandler.invoke((messages, resultsPassthrough, json))
)
case HttpResponse(StatusCodes.TooManyRequests, _, _, _) =>
val json = """{"took":0,"errors":true,"items":[{"update":{"_index":"","_type":"_doc","_id":"","status":429,"error":{"type":"too_many_client","reason":"429 Too Many Requests /_bulk"}}}]}"""
responseHandler.invoke((messages, resultsPassthrough, json))
case HttpResponse(StatusCodes.GatewayTimeout, _, _, _) =>
val json = """{"took":0,"errors":true,"items":[{"update":{"_index":"","_type":"_doc","_id":"","status":504,"error":{"type":"gateway_timeout","reason":"Gateway Timeout /_bulk"}}}]}"""
responseHandler.invoke((messages, resultsPassthrough, json))
case HttpResponse(status, _, responseEntity, _) =>
log.error("")
Unmarshal(responseEntity).to[String].map { body =>
log.error(s"Request failed for POST $uri, got $status with body: $body")
val json = s"""{"took":0,"errors":true,"items":[{"update":{"_index":"","_type":"_doc","_id":"","status":${status},"error":{"type":"${status}","reason":got $status with body: $body}}}]}"""
responseHandler.invoke((messages, resultsPassthrough, json))
}
@ennru Hi Runne, would you please take a look this one?