elasticsearch-action-updatebyquery

EsRejectedExecutionException received during update by query operation

Open consulthys opened this issue 10 years ago • 10 comments

Occasionally, my update by query requests are failing with the following output:

{
  "ok": true,
  "took": 69,
  "total": 0,
  "updated": 0,
  "indices": [
    {
      "logs_2007": {
        "0": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@29f475cb]"
        },
        "3": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@4f4b0c16]"
        },
        "4": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@41f7d64d]"
        }
      }
    },
    {
      "logs_2008": {}
    },
    {
      "logs_2009": {
        "2": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@37077cdd]"
        }
      }
    },
    {
      "logs_2010": {
        "2": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@72b29869]"
        },
        "3": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@4d7ba774]"
        },
        "4": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@755fd15]"
        }
      }
    },
    {
      "logs_2011": {
        "0": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@36d8e218]"
        }
      }
    },
    {
      "logs_2012": {
        "3": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@5b1dc281]"
        },
        "4": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@69f89cd0]"
        }
      }
    },
    {
      "logs_2013": {}
    },
    {
      "logs_2014": {
        "3": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@85635b4]"
        },
        "4": {
          "error": "EsRejectedExecutionException[rejected execution (queue capacity 50) on org.elasticsearch.action.updatebyquery.TransportShardUpdateByQueryAction$1@270a4455]"
        }
      }
    }
  ]
}

What I've been doing so far is simply to increase the queue_size of the bulk queue, knowing it's neither ideal nor a good idea, since it will only "hide" a problem that is bound to resurface later.

This morning I came across the latest ES blog post on performance considerations during indexing, in which Michael mentions that when EsRejectedExecutionExceptions are thrown, it usually means that the client is sending too many concurrent requests at the same time, which makes sense now that I've read it.

The README file mentions that the action.updatebyquery.bulk_size option can be set in the elasticsearch configuration file. However, it would be nice to also mention that this option defaults to 1000, and that if someone starts seeing EsRejectedExecutionException in the response, the way to proceed is to set it to at most the queue_size (default 50) of the bulk queue of their ES installation.

There's also a typo to fix: the equals sign below should be a colon, since the elasticsearch config file is YAML. action.updatebyquery.bulk_size=2500 should read action.updatebyquery.bulk_size: 2500

consulthys avatar Sep 04 '14 07:09 consulthys

@consulthys We are looking to find out whether you still have issues, and which ES and plugin versions you used. We don't use the plugin yet, but we are actively considering it, so a recent update on how it performs would be handy.

pentium10 avatar Oct 17 '14 14:10 pentium10

@pentium10 we've added the following setting action.updatebyquery.bulk_size: 50 (i.e. same size as the queue_size of the bulk queue) to the elasticsearch.yml config file and it works "ok" now.

Another way of fixing this issue is to set the queue_size of the bulk queue to 1000 since that's the default bulk size being used by this plugin.

We're using ES 1.3.2 and version 2.2.0 of this plugin.
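For reference, the two workarounds described above map to elasticsearch.yml settings along these lines. The bulk_size value comes from this thread; threadpool.bulk.queue_size is the ES 1.x setting name for the bulk queue capacity, and you would normally pick only one of the two:

```yaml
# Workaround 1: cap the plugin's batch size at the bulk queue's capacity
action.updatebyquery.bulk_size: 50

# Workaround 2 (alternative): grow the bulk queue to the plugin's default batch size
threadpool.bulk.queue_size: 1000
```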

consulthys avatar Oct 18 '14 02:10 consulthys

Why would setting the bulk_size to the bulk queue_size help? The former is about the number of documents within a single bulk request, whereas the latter is about the number of bulk requests waiting to be processed.

The performance considerations during indexing blog post linked in the issue description suggests that the client is sending too many concurrent requests. If the client waits for the response before sending another update-by-query request, so that the bulk queue doesn't go over capacity, things should get better.

ofavre avatar Nov 11 '14 20:11 ofavre

The plugin works as follows:

  • Upon reception of a new request, the involved shards are listed and the per-shard request is either being processed locally or remotely. All this without involving the bulk thread pool, if executed from a REST HTTP call.
  • A search is performed synchronously, over all segments of the current shard.
  • A first batch is filled with bulk_size documents (1000 by default).
  • The batch is executed within the same thread. (I don't think the bulk items themselves are threaded, it would make little sense.)
  • Upon completion of the batch, if there still are documents to process, a second iteration is queued in the bulk thread pool.
  • The current thread is done with the request.

Only one thread from the bulk thread pool is ever running at a given time for a single update-by-query request. Per-shard iterations are chained and single threaded.

This reading of the code suggests another way of saturating the bulk queue: having too many primary shards on a node. If at most one thread at a time is involved per request and per shard, and the clients are performing only a small constant number of requests concurrently, then it must be that, on a given node, too many shards are involved in the update-by-query operation.

Do you think this is sensible?

As for why reducing the bulk_size solved your problem, I have no clue. I think that no matter the bulk size, if more than one iteration is required for each shard, the total number of concurrent bulk requests should equal the number of involved primary shards on a given node. Increasing the bulk size should help reduce the number of subsequent iterations queued on the bulk thread pool queue. Decreasing it could only bring the number of needed bulk requests closer to the number of primary shards on a given node.
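A back-of-the-envelope sketch of that reasoning, using hypothetical numbers (8 involved primary shards on a node, one million matching documents per shard; neither figure comes from this thread):

```java
// Rough arithmetic for the shard/iteration argument above, with made-up numbers.
public class BulkMath {
    // At most one bulk task per shard is running or queued at a time, so the
    // number of concurrent bulk tasks is bounded by the shard count, not bulk_size.
    static int maxConcurrentBulkTasks(int primaryShardsOnNode) {
        return primaryShardsOnNode;
    }

    // bulk_size only controls how many times each shard's task re-queues itself.
    static int iterationsPerShard(int docsPerShard, int bulkSize) {
        return (docsPerShard + bulkSize - 1) / bulkSize; // ceiling division
    }

    public static void main(String[] args) {
        System.out.println(maxConcurrentBulkTasks(8));           // 8, regardless of bulk_size
        System.out.println(iterationsPerShard(1_000_000, 1000)); // 1000 iterations
        System.out.println(iterationsPerShard(1_000_000, 1));    // 1000000 with bulk_size 1
    }
}
```

This is why the extreme values suggested below (1 and "total hits") are interesting: they only move the iteration count, not the per-shard concurrency bound.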

Could you try extreme values, such as a bulk_size of 1 and a value as large as the total number of updated documents?

ofavre avatar Nov 11 '14 22:11 ofavre

I've integrated your remarks into the README, thanks! Your advice to lower the bulk_size has been left out, as I think it's a bad idea performance-wise.

I hope we'll sort this out properly.

ofavre avatar Nov 11 '14 22:11 ofavre

Lowering the bulk_size may help in some scenarios. But I still see no direct relation between the value of this plugin's bulk_size and Elasticsearch's bulk queue_size.

Explanation: adding one long task to a contended thread pool occupies one thread for a long time, reducing the pool's throughput for a long time and possibly accumulating tasks in the queue, causing rejections. By splitting the long task into smaller tasks that re-queue themselves until the total work is done, we barely affect the pool's throughput and let other tasks be processed. Our long task occupies one slot in the queue until sufficient resources become available and it terminates.
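A minimal, self-contained model of that pattern using a plain JDK thread pool; this is an illustration of the re-queueing idea, not the plugin's actual code:

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class RequeueDemo {
    // Pushes totalDocs through batches of batchSize, one self-requeueing task
    // at a time, and returns the number of documents processed.
    static int process(int totalDocs, int batchSize) throws InterruptedException {
        // One worker and a small bounded queue, like the bulk pool's queue_size.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
            1, 1, 0L, TimeUnit.SECONDS, new ArrayBlockingQueue<>(50));
        AtomicInteger processed = new AtomicInteger();
        CountDownLatch done = new CountDownLatch(1);

        Runnable batch = new Runnable() {
            @Override public void run() {
                // Process one batch, occupying the worker only briefly...
                processed.addAndGet(Math.min(batchSize, totalDocs - processed.get()));
                if (processed.get() < totalDocs) {
                    pool.execute(this); // ...then take just one queue slot again.
                } else {
                    done.countDown();
                }
            }
        };
        pool.execute(batch);
        done.await();
        pool.shutdown();
        return processed.get();
    }

    public static void main(String[] args) throws Exception {
        System.out.println(process(5000, 1000)); // prints 5000
    }
}
```

The long job never monopolizes the worker: each batch holds one queue slot, runs briefly, and re-submits itself, so other tasks can interleave between iterations.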

ofavre avatar Mar 11 '15 15:03 ofavre

Thanks @ofavre for your inputs. My apologies for not having answered sooner. I'm going to re-ingest all this and provide whatever feedback I come up with.

consulthys avatar Mar 30 '15 14:03 consulthys

@consulthys Could you test with a lower value for bulk_size? Like I previously suggested, 1 and "total hits" are two extreme values that may help to better grasp the inner workings. If you don't have an easy recreation to perform the tests with, then don't bother. P.S.: I now see those errors in my cluster too, after having upgraded ES.

ofavre avatar Mar 31 '15 08:03 ofavre

Updating 20+ million documents using update-by-query makes my node go down.

  1. I am updating an id on matching documents (5 to 50 million records) using the _update_by_query API.

  2. While updating matching data (5 to 10 million records) using the update_by_query API, my ES nodes are going down; it looks like heavy load on the machines. We are using 5 nodes, and when the update reaches 20 million, one of the nodes goes down. Note: we are also using a load balancer, but still face this issue.

How can I solve this issue? Please suggest or help.

#############

API details:

#############

This API updates 5 to 30 million records in one request. Is there any limit I have to set here? I have already done this in the config file: "action.updatebyquery.bulk_size=25000"

UpdateByQueryClient updateByQueryClient = new UpdateByQueryClientWrapper(client);

UpdateByQueryResponse response = updateByQueryClient.prepareUpdateByQuery()
    .setIndices(props.getProperty("index"))
    .setTypes(props.getProperty("type"))
    .setTimeout(TimeValue.timeValueHours(24))
    .setIncludeBulkResponses(BulkResponseOption.ALL)
    .setScript("if (ctx._source.containsKey(\"segIds\")) { if (ctx._source.segIds.contains(idExist)) { ctx.op = \"none\" } else { ctx._source.segIds += segObject } } else { ctx._source.segIds = segObject }")
    .setScriptParams(scriptParams)
    .setQuery(query)
    .execute()
    .actionGet();

I have already filed a ticket; someone please help me:

https://discuss.elastic.co/t/updatebyqueryresponse-throwing-timeout/29176

Praveen82 avatar Oct 30 '15 14:10 Praveen82

The following variables are set in the config file:

script.disable_dynamic: false
action.updatebyquery.bulk_size: 2500

Praveen82 avatar Oct 30 '15 14:10 Praveen82