ElasticsearchIO: handle org.elasticsearch.client.ResponseException gracefully
Issue
When org.elasticsearch.client.ResponseException is thrown in org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.BulkIO.BulkIOBaseFn#flushBatch, it is rethrown as-is, which causes infinite retries in the pipeline.
Example of an org.elasticsearch.client.ResponseException that was not handled gracefully before this change: one document in the batch has a document id longer than 512 bytes. https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-id-field.html
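For illustration only (not from the PR), one way to produce an id that trips that limit:

```java
// Illustrative only: 513 ASCII characters encode to 513 bytes in UTF-8,
// exceeding Elasticsearch's 512-byte _id limit; indexing a document with
// this id causes the bulk request to surface a ResponseException.
class OverlongIdExample {
  static String overlongId() {
    StringBuilder sb = new StringBuilder();
    for (int i = 0; i < 513; i++) {
      sb.append('a');
    }
    return sb.toString();
  }
}
```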
Fix
This change catches the exception and creates responses from the original input documents, setting withHasError(true) and the corresponding error message in withResponseItemJson(). These responses are eventually directed to the FAILED_WRITES tag.
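A minimal sketch of the idea, not the actual Beam internals: Document below is a stand-in for ElasticsearchIO's document type (only the two builder methods mentioned above are modeled), and markBatchAsFailed is a hypothetical helper showing how a caught ResponseException could be mapped onto per-document error results that end up under FAILED_WRITES.

```java
import java.util.ArrayList;
import java.util.List;
import org.elasticsearch.client.ResponseException;

class FlushBatchErrorHandlingSketch {
  // Stand-in for ElasticsearchIO's document type; only the two builder
  // methods mentioned in this PR are modeled here.
  interface Document {
    Document withHasError(boolean hasError);
    Document withResponseItemJson(String json);
  }

  // Hypothetical helper: instead of rethrowing the ResponseException from
  // flushBatch, mark every document of the failed batch as an error so that
  // downstream routing sends it to FAILED_WRITES.
  static List<Document> markBatchAsFailed(List<Document> batch, ResponseException e) {
    List<Document> failed = new ArrayList<>(batch.size());
    for (Document doc : batch) {
      failed.add(doc.withHasError(true).withResponseItemJson(e.getMessage()));
    }
    return failed;
  }
}
```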
cc @egalpin
Checks are failing. Will not request review until checks are succeeding. If you'd like to override that behavior, comment assign set of reviewers
Assigning reviewers. If you would like to opt out of this review, comment assign to next reviewer:
R: @kennknowles for label java. R: @shunping for label io.
Available commands:
- stop reviewer notifications - opt out of the automated review tooling
- remind me after tests pass - tag the comment author after tests pass
- waiting on author - shift the attention set back to the author (any comment or push by the author will return the attention set to the reviewers)
The PR bot will only process comments in the main thread (not review comments).
Reminder, please take a look at this pr: @kennknowles @shunping
Added test code. Currently the test writes two docs: the valid one goes to SUCCESSFUL_WRITES and the one with the long id goes to FAILED_WRITES.
@egalpin I was trying to force the Elasticsearch client to flush the whole batch and assert that all docs in the batch are directed to FAILED_WRITES.
I've tried a couple of different setups (setting withMaxBatchSize and withMaxBatchSizeBytes), but it seems like it's still flushing in separate batches. Any tips on how to force all docs into a single batch?
@andyzhangdialpad there might be a way to create bundles via primitives for the sake of testing, but the shortest/easiest path in this case might be to try withStatefulBatching. Let's see if that elicits the desired test behaviour that you're after.
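For reference, a hedged sketch of what that test configuration might look like, assuming the stateful-batching option referred to above is the withUseStatefulBatches setter on ElasticsearchIO.Write; the host, index name, and type here are placeholders.

```java
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO;
import org.apache.beam.sdk.io.elasticsearch.ElasticsearchIO.ConnectionConfiguration;

class SingleBatchWriteSketch {
  // Force both test documents (the valid one and the long-id one) into a
  // single bulk request by batching statefully with a batch size of 2.
  static ElasticsearchIO.Write singleBatchWrite() {
    return ElasticsearchIO.write()
        .withConnectionConfiguration(
            ConnectionConfiguration.create(
                new String[] {"http://localhost:9200"}, "test-index", "_doc"))
        .withMaxBatchSize(2)
        .withUseStatefulBatches(true);
  }
}
```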
@egalpin addressed comments. Could you take a look again? Thank you!
Looks like the test is failing (https://github.com/apache/beam/pull/31151/checks?check_run_id=24958553387) because org.elasticsearch.client.ResponseException is also thrown for [HTTP/1.1 503 Service Unavailable].
Looks like Beam expects this to throw an exception so that it can retry the ES client connection.
One way to deal with this is to catch ResponseException and only handle 4XX response codes.
E.g. I believe the long id issue throws ResponseException with status line [HTTP/1.1 400 Bad Request]. We could extract the status code from ResponseException.getResponse().getStatusLine() and handle the 4XX ones.
ResponseException https://github.com/elastic/elasticsearch/blob/main/client/rest/src/main/java/org/elasticsearch/client/ResponseException.java
@egalpin thoughts?
On second thought, we can probably:
- throw an exception when the HTTP code is 5xx (except 501); I found this approach at https://github.com/hashicorp/go-retryablehttp
- route to FAILED_WRITES for the rest
Will add the 5xx-except-501 handling in isRetryableClientException().
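For illustration only, a hedged sketch of that rule (the method name and wiring here are assumptions, not the actual isRetryableClientException() implementation): extract the status code from the exception's Response and treat only 5xx codes other than 501 as retryable.

```java
import org.elasticsearch.client.ResponseException;

class RetryDecisionSketch {
  // 5xx responses (except 501 Not Implemented) are treated as transient and
  // rethrown so the existing retry behaviour kicks in; everything else
  // (e.g. 400 Bad Request for an over-long document id) would be routed
  // to FAILED_WRITES instead.
  static boolean isRetryableResponse(ResponseException e) {
    int statusCode = e.getResponse().getStatusLine().getStatusCode();
    return statusCode >= 500 && statusCode != 501;
  }
}
```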