[BUG] `delete_by_query` (or `update_by_query`, etc.) with slices creates too many scroll contexts
Describe the bug
I was contacted by a user of Amazon OpenSearch Service who manages a multi-tenant cluster. When a tenant leaves the cluster, their data is deleted from the cluster using a delete_by_query.
In order to complete this in a timely manner, they've tried setting a slice count greater than 1, to let multiple threads drive the deletion. Unfortunately, each slice opens and holds a scroll context on every shard. With something like 36 shards and 10 slices, this ends up being 360 open scroll contexts.
The cluster imposes a default max limit of 500 open scroll contexts, so this is consuming most of them. If two tenants leave at the same time, there aren't enough available scroll contexts to go around.
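To make the arithmetic concrete, here is a small sketch of the numbers above. It assumes, as described in this report, that every slice holds one scroll context on every shard. The function and constant names are made up for illustration and are not OpenSearch APIs.

```python
# Illustrative arithmetic only; names here are hypothetical, not OpenSearch APIs.
DEFAULT_MAX_OPEN_SCROLL_CONTEXTS = 500  # default limit cited in this issue


def scroll_contexts_needed(num_shards: int, num_slices: int, concurrent_requests: int = 1) -> int:
    """Assumes each slice opens and holds one scroll context on every shard."""
    return num_shards * num_slices * concurrent_requests


one_tenant = scroll_contexts_needed(36, 10)
two_tenants = scroll_contexts_needed(36, 10, concurrent_requests=2)

print(one_tenant, one_tenant <= DEFAULT_MAX_OPEN_SCROLL_CONTEXTS)    # 360 True
print(two_tenants, two_tenants <= DEFAULT_MAX_OPEN_SCROLL_CONTEXTS)  # 720 False
```

With 51 shards and 10 slices, a single request already needs 510 contexts, which is over the limit.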
Related component
Search:Query Capabilities
To Reproduce
- Set up a cluster with (say) 51 shards.
- Add a bunch of data.
- Execute a `delete_by_query` with 10 slices.
- Get an error saying `Trying to create too many scroll contexts. Must be less than or equal to: [500]`
Expected behavior
We should be able to delete or update a lot of data without needing to open so many scroll contexts.
One option that I've considered is allowing users to run these operations without transactional semantics. Even though a scroll can run shard-by-shard, any scrolled search starts by opening and registering scroll contexts on every shard that will be held until the scroll is done.
Why? Say you're querying data starting at 1pm and it takes you 5 minutes to get through the first shard. You will only see documents published before 1pm on the first shard (since it's using one IndexReader for the whole traversal). If you open a new IndexReader (wrapped in a scroll context) on the second shard, it will have documents published up to 1:05pm. We open all the scroll contexts at once as a way to pretend to be a transactional system. (Of course, the scroll contexts aren't literally created all at once, because network calls take non-zero time, so consistency is an illusion.)
If I want to delete all documents where userid=12345, I don't necessarily care if it means "Delete all documents published prior to this instant (give or take network lag fanning out to shards) with userid=12345" or "Delete all documents where userid=12345 and if new ones get added while you're deleting, maybe delete those too".
So, maybe this is more of a feature request -- I would like delete_by_query to have an argument that disables (approximate) point-in-time consistency. Let it open a scroll on one shard, rip through it, deleting stuff and then open a scroll on the next shard, etc.
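As a rough illustration of what that would buy, the toy model below counts the peak number of scroll contexts a single slice holds under each strategy. It is not OpenSearch code: both function names are hypothetical, and the model ignores replicas, retries, and timing.

```python
def peak_contexts_all_at_once(num_shards: int) -> int:
    """Contexts are opened on every shard up front and held until the scroll ends."""
    open_contexts = 0
    peak = 0
    for _ in range(num_shards):
        open_contexts += 1  # register a context on this shard
        peak = max(peak, open_contexts)
    # Nothing is released until the whole scroll is done.
    return peak


def peak_contexts_shard_by_shard(num_shards: int) -> int:
    """Hypothetical mode: open a scroll on one shard, drain it, release it, move on."""
    open_contexts = 0
    peak = 0
    for _ in range(num_shards):
        open_contexts += 1  # open a scroll on this shard only
        peak = max(peak, open_contexts)
        open_contexts -= 1  # release it before touching the next shard
    return peak


num_shards, num_slices = 36, 10
print(peak_contexts_all_at_once(num_shards) * num_slices)     # 360
print(peak_contexts_shard_by_shard(num_shards) * num_slices)  # 10
```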
Additional Details
Plugins N/A
Screenshots N/A
Additional context None
[Search Triage] - @bowenlan-amzn Do you have any thoughts on this?
Note, this is kind of related to https://github.com/opensearch-project/OpenSearch/issues/12923.
I think the underlying problem here is that the coordinator is not slice-aware. All slicing logic happens at the shard level.
When there is a slice that hits a shard outside of the slice range, we just apply a MatchNoDocsQuery as the filter (here and here). Unfortunately, while the associated context will never return anything (since it's filtered to match nothing), we keep the context open.
Ideally, we would move this logic to the coordinator level to decide which shards to target with a particular slice and reduce fan-out.
Note that the existing slicing logic kind-of needs to run at the shard level because the coordinator node doesn't have any awareness of field types and you're allowed to slice based on a specific field.
That said, the main piece of logic is that you have exactly two cases:
- Number of slices >= number of shards: Shard is covered by a slice if `sliceId % numShards == shardId`.
- Number of slices < number of shards: Shard is covered by a slice if `shardId % numSlices == sliceId`.
We can resolve that logic at the coordinator node and should be able to prevent fanout to the uncovered shards.
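The two cases above can be sketched as a small predicate. This is a minimal illustration in Python rather than the actual Java in SliceBuilder. The function names are hypothetical, and the sketch ignores custom routing and field-based slicing.

```python
def shard_covered_by_slice(shard_id: int, slice_id: int, num_shards: int, num_slices: int) -> bool:
    """Apply the two-case rule from the comment above."""
    if num_slices >= num_shards:
        # More slices than shards: each slice maps to exactly one shard.
        return slice_id % num_shards == shard_id
    # Fewer slices than shards: each shard maps to exactly one slice.
    return shard_id % num_slices == slice_id


def target_shards(slice_id: int, num_shards: int, num_slices: int) -> list:
    """Shards a coordinator would need to contact for one slice."""
    return [s for s in range(num_shards)
            if shard_covered_by_slice(s, slice_id, num_shards, num_slices)]


def total_contexts(num_shards: int, num_slices: int) -> int:
    """Scroll contexts opened across all slices if uncovered shards are skipped."""
    return sum(len(target_shards(i, num_shards, num_slices)) for i in range(num_slices))


print(target_shards(3, 36, 10))  # [3, 13, 23, 33]
print(total_contexts(36, 10))    # 36
print(total_contexts(4, 10))     # 10
```

Under this model, the 36-shard, 10-slice case from the original report would need 36 contexts rather than 360.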
The best way to address my previous comment, I think, would be to add an optional SliceBuilder parameter to ClusterSearchShardsRequest. From TransportSearchAction, we can pass the SliceBuilder from the SearchRequest's source. Then it would be up to the cluster manager to resolve the slicing logic in TransportClusterSearchShardsAction. That should handle both the local and cross-cluster search use-cases.
As a bonus, we can confirm the behavior using the REST _search_shards API.
It's not quite as easy as I had hoped, since we also need to capture the logic to handle custom routing for search requests, which may reduce the "effective" number of shards.
I think we need to (carefully) add slice routing logic into the OperationRouting#computeTargetShards method. Of course, in order to replicate the correct logic, we would need to sort the ShardIterators before filtering them.
Maybe we could add the logic to OperationRouting#searchShards to filter the GroupShardsIterator before returning it? (Note that we would not want to filter the searchShards result called from SliceBuilder#buildShardIterator, since that logic relies on getting the full set of shards matching the routing preference.)
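One possible reading of "sort, then filter" is sketched below. It assumes that when custom routing narrows the request to a subset of shards, the slice rule is applied to each shard's position in the sorted list, with the size of that list as the "effective" number of shards. Plain integers stand in for ShardIterators, and `filter_shards_for_slice` is a hypothetical name, not an existing OperationRouting method.

```python
def filter_shards_for_slice(routed_shard_ids, slice_id: int, num_slices: int) -> list:
    """Keep only the routed shards that the given slice covers.

    Assumption: coverage is decided by a shard's ordinal in the sorted list
    of routed shards, not by its raw shard ID.
    """
    ordered = sorted(routed_shard_ids)
    effective_num_shards = len(ordered)
    selected = []
    for ordinal, shard_id in enumerate(ordered):
        if num_slices >= effective_num_shards:
            covered = slice_id % effective_num_shards == ordinal
        else:
            covered = ordinal % num_slices == slice_id
        if covered:
            selected.append(shard_id)
    return selected


routed = [17, 4, 29]  # made-up shard IDs left after custom routing
print(filter_shards_for_slice(routed, slice_id=0, num_slices=2))  # [4, 29]
print(filter_shards_for_slice(routed, slice_id=1, num_slices=2))  # [17]
print(filter_shards_for_slice(routed, slice_id=3, num_slices=5))  # [4]
```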
So the problem is that _update_by_query cannot (or does not) remove the scroll cursors it used?
I should think that scroll_current should be 0 if an _update_by_query has completed (and cleaned up its cursor resources), but instead the count keeps ticking up for each update until they finally fall off the keep-alive cliff.
I'm testing using the simple queries shown below. An index needs to exist obviously but doesn't actually need to have any data for the scroll_current to keep going up after every _update_by_query.
POST my_index/_update_by_query?slices=auto&scroll=1m
GET _nodes/stats/indices/search?filter_path=**.scroll_current
Is this really the expected / correct behavior to leak cursor resources and hope the scroll garbage collection can just keep up?
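For anyone repeating this check, a small helper can total `scroll_current` across nodes from the stats response. The sketch assumes the filtered response has a `nodes -> <node id> -> indices -> search -> scroll_current` shape. The sample payload and node IDs are made up for illustration.

```python
def total_scroll_current(nodes_stats: dict) -> int:
    """Sum scroll_current over every node in a parsed _nodes/stats response."""
    total = 0
    for node in nodes_stats.get("nodes", {}).values():
        total += node.get("indices", {}).get("search", {}).get("scroll_current", 0)
    return total


# Made-up sample payload, shaped like the filtered stats response.
sample = {
    "nodes": {
        "node-a": {"indices": {"search": {"scroll_current": 3}}},
        "node-b": {"indices": {"search": {"scroll_current": 0}}},
    }
}
print(total_scroll_current(sample))  # 3
```

Polling this value before and after an `_update_by_query` finishes is one way to tell whether contexts are actually being released.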
@btalbot -- that's definitely not the expected behavior. I need to better understand what's going on.
These operations all extend AbstractAsyncBulkByScrollAction. When one of those completes (whether successfully, with a failure, or by cancellation), it executes this method: https://github.com/opensearch-project/OpenSearch/blob/c0852f84f0647f78a45e71396d6370a39ba4922f/modules/reindex/src/main/java/org/opensearch/index/reindex/AbstractAsyncBulkByScrollAction.java#L544-L566
That call to scrollSource.close should clear the scroll. (See here and here.)
So, if the scrolls aren't getting cleared, something is wrong.
Sorry for the noise. I'm not able to reproduce the scroll leaking any longer. I must have run into some weird corner case or more likely just misunderstood what I was seeing.