
ElasticsearchIO: handle org.elasticsearch.client.ResponseException gracefully

Open andyzhangdialpad opened this issue 1 year ago • 3 comments

Issue

When org.elasticsearch.client.ResponseException is thrown in org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BulkIO.BulkIOBaseFn#flushBatch, it is rethrown as-is, which causes the pipeline to retry the failing batch indefinitely.

Example of an org.elasticsearch.client.ResponseException that was not handled gracefully before this change: one document in the batch has an id longer than 512 bytes, which Elasticsearch rejects. https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html
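The 512-byte limit applies to the UTF-8 encoding of the id, not its character count, so ids containing multi-byte characters hit it sooner. A minimal sketch of such a check; IdLengthCheck is a hypothetical helper, not part of ElasticsearchIO or the Elasticsearch client.

```java
import java.nio.charset.StandardCharsets;

/** Hypothetical helper: checks a document id against Elasticsearch's 512-byte _id limit. */
final class IdLengthCheck {
  // The limit is on the UTF-8 encoded size of the id, not on its number of characters.
  static final int MAX_ID_BYTES = 512;

  private IdLengthCheck() {}

  /** Returns true if the id's UTF-8 encoding fits within the limit. */
  static boolean isValidId(String id) {
    return id.getBytes(StandardCharsets.UTF_8).length <= MAX_ID_BYTES;
  }

  public static void main(String[] args) {
    String ascii = "a".repeat(512);         // 512 characters, 512 bytes
    String accented = "\u00e9".repeat(300); // 300 characters, 600 bytes in UTF-8
    System.out.println(isValidId(ascii));    // true
    System.out.println(isValidId(accented)); // false
  }
}
```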

Fix

This change catches the exception and creates a response for each of the original input documents, with withHasError(true) set and the corresponding error message set via withResponseItemJson(). These responses are then routed to the FAILED_WRITES tag.
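The catch-and-route behavior described above can be sketched without Beam or the Elasticsearch client. This is a hypothetical model, not the actual ElasticsearchIO code: Doc is a stand-in for ElasticsearchIO's document type (only the fields used here), BulkCall stands in for the bulk request, and a generic Exception stands in for ResponseException.

```java
import java.util.ArrayList;
import java.util.List;

/** Hypothetical sketch of the catch-and-route behavior; not the actual ElasticsearchIO code. */
final class FlushSketch {
  /** Stand-in for the ElasticsearchIO document type; models only the fields this sketch uses. */
  static final class Doc {
    final String inputDoc;
    final boolean hasError;
    final String responseItemJson;

    Doc(String inputDoc, boolean hasError, String responseItemJson) {
      this.inputDoc = inputDoc;
      this.hasError = hasError;
      this.responseItemJson = responseItemJson;
    }

    Doc withHasError(boolean hasError) {
      return new Doc(inputDoc, hasError, responseItemJson);
    }

    Doc withResponseItemJson(String json) {
      return new Doc(inputDoc, hasError, json);
    }
  }

  /** Stand-in for the bulk request; an exception means the whole request failed. */
  interface BulkCall {
    List<Doc> send(List<Doc> batch) throws Exception;
  }

  /** Sends the batch; on a request-level failure, marks every input doc as failed. */
  static List<Doc> flushBatch(List<Doc> batch, BulkCall call) {
    try {
      return call.send(batch);
    } catch (Exception e) {
      // There are no per-item results, so every doc carries the same request-level error.
      String errorJson = "{\"error\":\"" + escapeJson(String.valueOf(e.getMessage())) + "\"}";
      List<Doc> failed = new ArrayList<>(batch.size());
      for (Doc doc : batch) {
        failed.add(doc.withHasError(true).withResponseItemJson(errorJson));
      }
      return failed;
    }
  }

  /** Minimal escaping of backslashes and quotes only; control characters are not handled. */
  private static String escapeJson(String s) {
    return s.replace("\\", "\\\\").replace("\"", "\\\"");
  }
}
```

Downstream, docs whose hasError flag is true would be emitted to FAILED_WRITES and the rest to SUCCESSFUL_WRITES, so a single bad id no longer blocks the pipeline.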

andyzhangdialpad, May 01 '24 19:05

cc @egalpin

andyzhangdialpad, May 01 '24 19:05

Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers

github-actions[bot], May 01 '24 20:05

Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:

R: @kennknowles for label java. R: @shunping for label io.

Available commands:

  • stop reviewer notifications - opt out of the automated review tooling
  • remind me after tests pass - tag the comment author after tests pass
  • waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)

The PR bot will only process comments in the main thread (not review comments).

github-actions[bot], May 01 '24 21:05

Reminder, please take a look at this pr: @kennknowles @shunping

github-actions[bot], May 09 '24 12:05

Added a test. It currently writes two docs: the valid one goes to SUCCESSFUL_WRITES and the one with the long id goes to FAILED_WRITES.

@egalpin I was trying to force the Elasticsearch client to flush all docs in a single batch and assert that every doc in that batch is routed to FAILED_WRITES.

I've tried a couple of different setups (setting withMaxBatchSize and withMaxBatchSizeBytes), but the docs still seem to be flushed in separate batches. Any tips on how to force all docs into one batch?

andyzhangdialpad, May 09 '24 23:05

@andyzhangdialpad there might be a way to create bundles via primitives for the sake of testing, but the shortest/easiest path in this case might be to try withStatefulBatching. Let's see if that elicits the desired test behaviour that you're after.
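A toy model shows why raising withMaxBatchSize or withMaxBatchSizeBytes alone does not merge the docs, assuming (as in the non-stateful write path) that a partially filled batch is also flushed when the bundle finishes: size limits only cap a batch, they cannot combine docs the runner delivered in different bundles. Stateful batching, which buffers across bundles, is not modeled. BundleBatcher and its methods are hypothetical.

```java
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.List;

/**
 * Hypothetical toy model of bundle-scoped batching (not the ElasticsearchIO implementation):
 * a batch is flushed when it reaches maxBatchSize docs or maxBatchSizeBytes bytes,
 * and also whenever the bundle finishes.
 */
final class BundleBatcher {
  private final long maxBatchSize;
  private final long maxBatchSizeBytes;
  private final List<String> buffer = new ArrayList<>();
  private long bufferedBytes = 0;

  /** Every batch that was flushed, in order, so callers can inspect how docs were grouped. */
  final List<List<String>> flushed = new ArrayList<>();

  BundleBatcher(long maxBatchSize, long maxBatchSizeBytes) {
    this.maxBatchSize = maxBatchSize;
    this.maxBatchSizeBytes = maxBatchSizeBytes;
  }

  /** Buffers a doc and flushes early if either size limit is reached. */
  void add(String doc) {
    buffer.add(doc);
    bufferedBytes += doc.getBytes(StandardCharsets.UTF_8).length;
    if (buffer.size() >= maxBatchSize || bufferedBytes >= maxBatchSizeBytes) {
      flush();
    }
  }

  /** End of bundle: whatever is buffered is flushed, however small. */
  void finishBundle() {
    flush();
  }

  private void flush() {
    if (buffer.isEmpty()) {
      return;
    }
    flushed.add(new ArrayList<>(buffer));
    buffer.clear();
    bufferedBytes = 0;
  }

  public static void main(String[] args) {
    BundleBatcher batcher = new BundleBatcher(1000, 5L * 1024 * 1024);
    // Suppose the runner delivers each test doc in its own bundle.
    batcher.add("{\"id\":\"valid\"}");
    batcher.finishBundle();
    batcher.add("{\"id\":\"long-id\"}");
    batcher.finishBundle();
    System.out.println(batcher.flushed.size()); // 2
  }
}
```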

egalpin, May 10 '24 18:05

@egalpin addressed comments. Could you take a look again? Thank you!

andyzhangdialpad, May 13 '24 22:05

It looks like the test is failing (https://github.com/apache/beam/pull/31151/checks?check_run_id=24958553387) because org.elasticsearch.client.ResponseException is also thrown for [HTTP/1.1 503 Service Unavailable].

It looks like Beam expects this to throw an exception so that it can retry the Elasticsearch client connection.

One way to deal with this is to still catch ResponseException but only handle 4xx response codes, rethrowing the rest.

For example, I believe the long-id issue throws ResponseException with status line [HTTP/1.1 400 Bad Request]. We could extract the status code via ResponseException.getResponse().getStatusLine() and handle only the 4xx codes.

ResponseException https://github.com/elastic/elasticsearch/blob/main/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java
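The 4xx-only proposal boils down to a classification on the status code. A minimal sketch; StatusRouting and Action are hypothetical names, and with the real client the code would be read from the caught exception via getResponse().getStatusLine().getStatusCode() (not shown, to keep the sketch dependency-free).

```java
/** Hypothetical classifier for the "only handle 4xx" proposal. */
final class StatusRouting {
  enum Action { ROUTE_TO_FAILED_WRITES, RETHROW }

  private StatusRouting() {}

  /** Decides what to do with a ResponseException, given its HTTP status code. */
  static Action classify(int statusCode) {
    // 4xx: the request itself is bad (e.g. 400 for an oversized id), so retrying cannot fix it.
    if (statusCode >= 400 && statusCode <= 499) {
      return Action.ROUTE_TO_FAILED_WRITES;
    }
    // Anything else (e.g. 503) is rethrown so the existing retry path handles it.
    return Action.RETHROW;
  }

  public static void main(String[] args) {
    System.out.println(classify(400)); // ROUTE_TO_FAILED_WRITES
    System.out.println(classify(503)); // RETHROW
  }
}
```

Codes outside the 4xx range, including 5xx and any unexpected values, keep the pre-change rethrow-and-retry behavior under this rule.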

@egalpin thoughts?

andyzhangdialpad, May 14 '24 16:05

On second thought, we could probably:

  • throw an exception for HTTP 5xx codes except 501 (I found this approach in https://github.com/hashicorp/go-retryablehttp)
  • route everything else to FAILED_WRITES
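Taken together, the two bullets reduce to a single predicate on the status code. A minimal sketch; RetryPolicy and shouldRetry are hypothetical names, and how the code is obtained from the exception is left out.

```java
/** Hypothetical retry predicate: retry 5xx except 501, route everything else to FAILED_WRITES. */
final class RetryPolicy {
  private static final int NOT_IMPLEMENTED = 501;

  private RetryPolicy() {}

  /** True: rethrow so the request is retried. False: route the batch to FAILED_WRITES. */
  static boolean shouldRetry(int statusCode) {
    boolean is5xx = statusCode >= 500 && statusCode <= 599;
    // 5xx (e.g. 503 Service Unavailable) usually signals a transient server-side problem,
    // but 501 Not Implemented will not change on retry.
    return is5xx && statusCode != NOT_IMPLEMENTED;
  }

  public static void main(String[] args) {
    for (int code : new int[] {400, 429, 500, 501, 503}) {
      System.out.println(code + " -> " + (shouldRetry(code) ? "retry" : "FAILED_WRITES"));
    }
  }
}
```

One consequence worth weighing: 429 Too Many Requests is a 4xx code, so under this rule a throttled bulk request would be routed to FAILED_WRITES rather than retried.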

andyzhangdialpad, May 14 '24 16:05

I will add handling for 5xx codes other than 501 in isRetryableClientException().
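If the rule lives in isRetryableClientException(), one detail worth checking is whether the ResponseException reaches that method directly or wrapped in another exception. A minimal sketch that walks the cause chain, assuming it may be wrapped; FakeResponseException and the existingCheck parameter are hypothetical stand-ins (for the real ResponseException and for whatever the method already checks), not the method's actual signature.

```java
import java.io.IOException;
import java.util.Collections;
import java.util.IdentityHashMap;
import java.util.Set;
import java.util.function.Predicate;

/** Hypothetical sketch of adding a status-code branch to a retryability check. */
final class RetryableCheck {
  /** Stand-in for org.elasticsearch.client.ResponseException, reduced to its HTTP status code. */
  static final class FakeResponseException extends IOException {
    private static final long serialVersionUID = 1L;
    final int statusCode;

    FakeResponseException(int statusCode) {
      super("status " + statusCode);
      this.statusCode = statusCode;
    }
  }

  private RetryableCheck() {}

  /**
   * Walks the cause chain; if an HTTP status is found, retries only 5xx other than 501.
   * Otherwise defers to existingCheck, which stands in for the method's current logic.
   */
  static boolean isRetryableClientException(Throwable t, Predicate<Throwable> existingCheck) {
    // Identity set guards against cyclic cause chains.
    Set<Throwable> seen = Collections.newSetFromMap(new IdentityHashMap<>());
    for (Throwable cur = t; cur != null && seen.add(cur); cur = cur.getCause()) {
      if (cur instanceof FakeResponseException) {
        int code = ((FakeResponseException) cur).statusCode;
        return code >= 500 && code <= 599 && code != 501;
      }
    }
    return existingCheck.test(t);
  }
}
```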

andyzhangdialpad, May 14 '24 16:05