k-NN
k-NN copied to clipboard
[Enhancement] Make Merge in nativeEngine can Abort
Description
When there is a scenarios:
- There is A Merge Task On
Node1#Index1#Shard1(long time running) - After merge task started, begin relocating from
Node1#Index1#Shard1TONode2#Index1#Shard1 - At the finalize step, source need do closeShard, but the merge task would take a long time, stack as following shows.
- The clusterApplierService would wait for about N minutes(long time running), and mark the node stale, and master let node1 left because node1 long time no response.
opensearch[datanode1][clusterApplierService#updateTask][T#1]" #41 daemon prio=5 os_prio=0 cpu=5183.70ms elapsed=93132.85s tid=0x00007f3f392509d0 nid=0x101 in Object.wait() [0x00007f3f6ddfb000]
java.lang.Thread.State: TIMED_WAITING (on object monitor)
at java.lang.Object.wait([email protected]/Native Method)
- waiting on <no object reference available>
at org.apache.lucene.index.IndexWriter.doWait(IndexWriter.java:5410)
- locked <0x0000001022b0abe8> (a org.apache.lucene.index.IndexWriter)
at org.apache.lucene.index.IndexWriter.abortMerges(IndexWriter.java:2721)
- locked <0x0000001022b0abe8> (a org.apache.lucene.index.IndexWriter)
at org.apache.lucene.index.IndexWriter.rollbackInternalNoCommit(IndexWriter.java:2469)
- locked <0x0000001022b0abe8> (a org.apache.lucene.index.IndexWriter)
at org.apache.lucene.index.IndexWriter.rollbackInternal(IndexWriter.java:2449)
- locked <0x0000001022bae6d0> (a java.lang.Object)
at org.apache.lucene.index.IndexWriter.rollback(IndexWriter.java:2441)
at org.opensearch.index.engine.InternalEngine.closeNoLock(InternalEngine.java:2370)
at org.opensearch.index.engine.Engine.close(Engine.java:2000)
at org.opensearch.index.engine.Engine.flushAndClose(Engine.java:1987)
at org.opensearch.index.shard.IndexShard.close(IndexShard.java:1907)
- locked <0x0000001022b07ea0> (a java.lang.Object)
at org.opensearch.index.IndexService.closeShard(IndexService.java:623)
at org.opensearch.index.IndexService.removeShard(IndexService.java:599)
- locked <0x0000001022a976a8> (a org.opensearch.index.IndexService)
at org.opensearch.index.IndexService.close(IndexService.java:374)
- locked <0x0000001022a976a8> (a org.opensearch.index.IndexService)
at org.opensearch.indices.IndicesService.removeIndex(IndicesService.java:993)
at org.opensearch.indices.cluster.IndicesClusterStateService.removeIndices(IndicesClusterStateService.java:446)
at org.opensearch.indices.cluster.IndicesClusterStateService.applyClusterState(IndicesClusterStateService.java:287)
- locked <0x000000100b7da520> (a org.opensearch.indices.cluster.IndicesClusterStateService)
at org.opensearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:606)
at org.opensearch.cluster.service.ClusterApplierService.callClusterStateAppliers(ClusterApplierService.java:593)
Proposal
i think we can introduce abort mechanism for long time merge task meanwhile close shard called.
i think we can introduce KNNMergeHelper class to check if merge aborted. and when build the graph, we can reuse faiss::InterruptCallback which is interrupt callback mechanism to check whether aborted or not
BUT ConcurrentMergeScheduler#MergeThread is a internal class, we can not call this directly. it throws org.apache.lucene.index.ConcurrentMergeScheduler$MergeThread is in unnamed module of loader 'app'
we can added this static method into OpenSearch Core like OneMergeHelper
Related Issues
Resolves #[Issue number to be closed when this PR is merged] #2530
Check List
- [ ] New functionality includes testing.
- [ ] New functionality has been documented.
- [ ] API changes companion pull request created.
- [ ] Commits are signed per the DCO using
--signoff. - [ ] Public documentation issue/PR created.
By submitting this pull request, I confirm that my contribution is made under the terms of the Apache 2.0 license. For more information on following Developer Certificate of Origin and signing off your commits, please check here.
@luyuncheng This is interesting - but can the interrupt callback with faiss be per graph or would it be for all graphs? In other words, would it cancel all graph builds happening on the node instead of just the one for the closed shard.
@luyuncheng thanks for creating the GH issue. and I think we ourselves have seen this problem in couple of places(ref: https://github.com/opensearch-project/OpenSearch/issues/14828, @kotwanikunal created this), and seeing a solution around this problem is really great. I looked through the code and I want to know how this code is even working? Because as per my understanding of the code you are checking if merge is aborted if somehow write fails and then eating up that exception.
Is this PR only to see if the merge is aborted and write to directory fails how we can handle the errors in case shard is not present on the node because it moved? Because if that is the case then in the 2.19 version of k-NN plugin we added the support writing index using IndexInput/Output. So if a rather than checking if merge is aborted or not, can we not see if IndexInput/Output is closed or not?
This is interesting - but can the interrupt callback with faiss be per graph or would it be for all graphs? In other words, would it cancel all graph builds happening on the node instead of just the one for the closed shard.
@jmazanec15 In Sample Code, https://github.com/opensearch-project/k-NN/blob/32e151a0e41b99688ebe219911734bdf58e05f3d/src/main/java/org/apache/lucene/index/KNNMergeHelper.java#L11-L16 which need include in OpenSearch repo, it shows that it only checked for the current merge thread like OS code https://github.com/opensearch-project/OpenSearch/blob/99a9a81da366173b0c2b963b26ea92e15ef34547/server/src/main/java/org/apache/lucene/index/OneMergeHelper.java#L65-L69
I looked through the code and I want to know how this code is even working? Because as per my understanding of the code you are checking if merge is aborted if somehow write fails and then eating up that exception.
@navneet1v i think the call chain is like following shows
- When faiss is building the graph, meanwhile, close shard triggered.
- Lucene would do
IndexWriter#rollbackInternalwhich would not commit current segment, and wait for current process end. - faiss checked abort using
InterruptCallback, and throwFaissException - we need catch
FaissExceptionfrom native code, and throwMergeAbortedExceptionto Lucene
we added the support writing index using IndexInput/Output. So if a rather than checking if merge is aborted or not, can we not see if IndexInput/Output is closed or not
Nice catch, we need to handle FaissException and do close indexinput/output manually, then throw MergeAbortedException to lucene
@luyuncheng right, but in the faiss InterruptCallback: https://github.com/facebookresearch/faiss/blob/657c563604c774461aed0394ae99210713145e03/faiss/impl/AuxIndexStructures.h#L135-L162, it operates globally. So, setting the instance in one thread, will set it in another thread.
So, for instance, here is the interrupt check in HNSW during graph build: https://github.com/facebookresearch/faiss/blob/657c563604c774461aed0394ae99210713145e03/faiss/IndexHNSW.cpp#L178-L180. If the interrupt gets set and another shard is building a graph on the node, wont this shard's graph build fail too?
it operates globally. So, setting the instance in one thread, will set it in another thread.
@jmazanec15 exactly right, AuxIndexStructures is an singleton struct.
So, for instance, here is the interrupt check in HNSW during graph build: https://github.com/facebookresearch/faiss/blob/657c563604c774461aed0394ae99210713145e03/faiss/IndexHNSW.cpp#L178-L180. If the interrupt gets set and another shard is building a graph on the node, wont this shard's graph build fail too?
every different thread call want_interrupt have different return because current thread context is different because we override want_interrupt. also https://github.com/facebookresearch/faiss/blob/657c563604c774461aed0394ae99210713145e03/faiss/IndexHNSW.cpp#L139 interrupt is a local variable, so different thread would not throw exception
so every graph build would go into check if (InterruptCallback::is_interrupted()) but only the current merge thread which aborted would return true.
also, multi thread would get into lock area which would reduce graph build performance, but check_period would help reduce the impacts.
Oh I see (https://github.com/facebookresearch/faiss/blob/657c563604c774461aed0394ae99210713145e03/faiss/impl/AuxIndexStructures.cpp#L223-L229). That makes sense.
Let me take look at this PR. This would also help address: https://github.com/opensearch-project/OpenSearch/issues/8590.
@luyuncheng are you still working on this?
@luyuncheng are you still working on this?
Yes, Sorry for the late. i am trying to find a way bypass the KNNMergeHelper for protect calling into OpenSearch. and trying to find a way for testing
@jmazanec15 i found a way using reflect, it would get Accessable into ConcurrentMergeScheduler.MergeThread.class like following code:
https://github.com/opensearch-project/k-NN/blob/952d3fb6a6ebccdcdab12a37b59a5d6eff9df697/src/main/java/org/apache/lucene/index/KNNMergeHelper.java#L15-L32
and call into reflect as following which using to get org.apache.lucene.index.MergePolicy.OneMerge in org.apache.lucene.index.ConcurrentMergeScheduler.MergeThread
https://github.com/opensearch-project/k-NN/blob/952d3fb6a6ebccdcdab12a37b59a5d6eff9df697/src/main/java/org/apache/lucene/index/LucenePackagePrivateCaller.java#L16-L29
so we can test it without modified OpenSearch Core.
Also i added tests in KNN80DocValuesConsumerTests.java
Hi @luyuncheng Sorry for delay in response. I looked into this PR and I think the solution looks pretty awesome to me. What would it take to make this code ready for merge?
Hi @luyuncheng Sorry for delay in response. I looked into this PR and I think the solution looks pretty awesome to me. What would it take to make this code ready for merge?
@navneet1v , i have talked with @jmazanec15 but he had some concern about it. and i would like to continue make this pr in progress.