User description
What type of PR is this?
- [ ] API-change
- [x] BUG
- [ ] Improvement
- [ ] Documentation
- [ ] Feature
- [ ] Test and CI
- [ ] Code Refactoring
Which issue(s) this PR fixes:
issue #22022
What this PR does / why we need it:
Bug fix: the index cache should retry when it sees an "index destroyed" error.
PR Type
Bug fix
Description
• Fix race condition in vector index cache between NewAlgo and Remove operations
• Replace mutex with condition variable for thread-safe synchronization
• Add LoadAndSearch method to handle concurrent load and search operations (see the sketch after this list)
• Add comprehensive test for concurrent search and delete scenarios
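To make the description above concrete, here is a minimal, self-contained sketch of the pattern, using simplified stand-in names (entry, statusNotInit, errDestroyed, and so on are illustrative, not the PR's actual identifiers): a waiter blocks on the condition variable until the entry is loaded or destroyed, and the caller retries with a fresh entry when it gets the destroyed error.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	statusNotInit int32 = iota
	statusLoaded
	statusDestroyed
)

var errDestroyed = errors.New("index destroyed")

// entry is a simplified stand-in for VectorIndexSearch: one index plus its load status.
type entry struct {
	mu     sync.RWMutex
	cond   *sync.Cond
	status atomic.Int32
}

func newEntry() *entry {
	e := &entry{}
	// Build the Cond on the read locker so Wait() pairs with RLock/RUnlock.
	e.cond = sync.NewCond(e.mu.RLocker())
	return e
}

// load plays the role of the loading half of LoadAndSearch: set the status
// under the write lock, then wake every goroutine blocked in search.
func (e *entry) load() {
	e.mu.Lock()
	e.status.Store(statusLoaded)
	e.mu.Unlock()
	e.cond.Broadcast()
}

// destroy plays the role of Remove()/HouseKeeping.
func (e *entry) destroy() {
	e.mu.Lock()
	e.status.Store(statusDestroyed)
	e.mu.Unlock()
	e.cond.Broadcast()
}

// search blocks until the entry leaves the not-initialized state, then fails
// with a retryable error if the entry was destroyed in the meantime.
func (e *entry) search() error {
	e.mu.RLock()
	defer e.mu.RUnlock()
	for e.status.Load() == statusNotInit {
		e.cond.Wait()
	}
	if e.status.Load() == statusDestroyed {
		return errDestroyed // the cache-level caller retries with a fresh entry
	}
	return nil // the real code would run Algo.Search here
}

func main() {
	// First attempt races with a destroy and has to be retried by the caller.
	e := newEntry()
	go e.destroy()
	if err := e.search(); errors.Is(err, errDestroyed) {
		e = newEntry() // the cache would LoadOrStore a fresh entry here
		go e.load()
	}
	fmt.Println("retry result:", e.search())
}
```

Building the Cond on RLocker() keeps Wait() compatible with the read lock held by searchers, and the destroyed error is the signal the cache-level retry loop keys on.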
Changes walkthrough 📝

| Category | Relevant files | Change summary | Lines |
|---|---|---|---|
| Bug fix | pkg/vectorindex/cache/cache.go (Replace mutex with condition variable for thread safety) | • Add sync.Cond field to VectorIndexSearch struct for better synchronization • Replace mutex locks with condition variable in Destroy() and Load() methods • Implement new LoadAndSearch() method combining load and search operations • Update Search() method to use condition variable Wait() instead of polling • Modify cache Search() to use LoadAndSearch for new entries | +42/-11 |
| Tests | pkg/vectorindex/cache/cache_test.go (Add concurrent search and delete test) | • Add TestCacheConcurrentNewSearchAndDelete test function • Test concurrent search operations with simultaneous cache removal • Verify the race condition fix with 2000 search iterations and 4000 remove operations (see the test sketch after this table) | +54/-0 |
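For orientation, here is a rough stand-alone sketch of the test's shape; toyCache and its methods are invented stand-ins, while the real test is TestCacheConcurrentNewSearchAndDelete and exercises the actual cache API.

```go
package cache

import (
	"sync"
	"testing"
)

// toyCache is an invented stand-in: LoadOrStore recreates the entry whenever
// a concurrent Remove has deleted it, mirroring the retry-on-destroy idea.
type toyCache struct{ m sync.Map }

func (c *toyCache) Search(key string) int {
	v, _ := c.m.LoadOrStore(key, 42)
	return v.(int)
}

func (c *toyCache) Remove(key string) { c.m.Delete(key) }

func TestConcurrentSearchAndRemove(t *testing.T) {
	c := &toyCache{}
	var wg sync.WaitGroup

	// Searchers: many iterations that must never observe a broken entry.
	for g := 0; g < 4; g++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			for i := 0; i < 2000; i++ {
				if got := c.Search("idx"); got != 42 {
					t.Errorf("unexpected value %d", got)
					return
				}
			}
		}()
	}

	// Remover: keeps deleting the entry underneath the searchers.
	wg.Add(1)
	go func() {
		defer wg.Done()
		for i := 0; i < 4000; i++ {
			c.Remove("idx")
		}
	}()

	wg.Wait()
}
```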
Need help?
Type /help how to ... in the comments thread for any questions about Qodo Merge usage. Check out the documentation for more information.
PR Reviewer Guide 🔍
Here are some key observations to aid the review process:
- ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
- 🧪 PR contains tests
- 🔒 No security concerns identified
⚡ Recommended focus areas for review
Synchronization Bug
The condition variable Cond is initialized with &s.Mutex but the code uses s.Mutex.RLock() in Search method while Cond.Wait() expects the associated mutex to be locked with Lock(), not RLock(). This creates a mismatch between read and write locks that could cause deadlocks or undefined behavior.
```go
s.Mutex.RLock()
defer s.Mutex.RUnlock()
for s.Status.Load() == 0 {
	s.Cond.Wait()
}
```
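For context, sync.Cond.Wait() calls Unlock() and Lock() on whatever Locker the Cond was constructed with, so the waiter must hold that exact locker. Below is a tiny stand-alone example (not the PR's code) of a consistent pairing built on RWMutex.RLocker(), which matches the RLocker comment visible in the later suggestion diffs.

```go
package main

import (
	"fmt"
	"sync"
)

func main() {
	var mu sync.RWMutex
	ready := false

	// The Cond is built over the read locker, so waiters hold RLock and
	// Wait() internally calls RUnlock()/RLock(), a consistent pairing.
	cond := sync.NewCond(mu.RLocker())

	go func() {
		mu.Lock()
		ready = true
		mu.Unlock()
		cond.Broadcast()
	}()

	mu.RLock()
	for !ready {
		cond.Wait()
	}
	fmt.Println("woken without mixing Lock and RLock")
	mu.RUnlock()
}
```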
Race Condition
In the Search method, after calling Cond.Wait(), the code continues without re-checking if the entry was destroyed by another thread. The LoadAndSearch method broadcasts to wake up waiting threads, but there's no guarantee the entry hasn't been removed from the cache between the wait and the subsequent operations.
```go
for s.Status.Load() == 0 {
	s.Cond.Wait()
}
status := s.Status.Load()
if status >= STATUS_DESTROYED {
	if status == STATUS_DESTROYED {
		return nil, nil, moerr.NewInternalErrorNoCtx("Index destroyed")
	} else {
```
Logic Error
The LoadAndSearch method calls Broadcast() after setting the status to LOADED, but then immediately calls UpdateConfig, which might set the status to OUTDATED. This could wake up waiting threads with an inconsistent state, and the method doesn't properly handle the case where UpdateConfig fails.
```go
// signal condition
s.Cond.Broadcast()
// if error mark as outdated
err = s.Algo.UpdateConfig(newalgo)
if err != nil {
	s.Status.Store(STATUS_OUTDATED)
}
s.extend(false)
return s.Algo.Search(proc, query, rt)
```
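One way to avoid waking waiters on a transient state, shown here only as a stand-alone illustration with simplified stand-in types rather than as the PR's actual fix, is to settle the final status, including the UpdateConfig failure path, before calling Broadcast().

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
)

const (
	statusNotInit int32 = iota
	statusLoaded
	statusOutdated
)

type loader struct {
	mu     sync.RWMutex
	cond   *sync.Cond
	status atomic.Int32
}

// finishLoad decides the final status first (LOADED or OUTDATED) and only
// then wakes the goroutines blocked in the wait loop below.
func (l *loader) finishLoad(updateConfig func() error) {
	l.mu.Lock()
	if err := updateConfig(); err != nil {
		l.status.Store(statusOutdated)
	} else {
		l.status.Store(statusLoaded)
	}
	l.mu.Unlock()
	l.cond.Broadcast()
}

func main() {
	l := &loader{}
	l.cond = sync.NewCond(l.mu.RLocker())

	// Simulate UpdateConfig failing; waiters should never observe LOADED.
	go l.finishLoad(func() error { return errors.New("update config failed") })

	l.mu.RLock()
	for l.status.Load() == statusNotInit {
		l.cond.Wait()
	}
	fmt.Println("waiter sees status:", l.status.Load()) // prints 2 (statusOutdated)
	l.mu.RUnlock()
}
```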
PR Code Suggestions ✨
Latest suggestions up to 45de5f3
Category: Incremental [*]
Add timeout to prevent deadlock
Add a timeout mechanism to prevent indefinite blocking when waiting for status changes. The current implementation could cause goroutines to wait forever if the status never changes from 0.
pkg/vectorindex/cache/cache.go [126-130]
```diff
 s.Cond.L.Lock()
 defer s.Cond.L.Unlock()
+
+timeout := time.NewTimer(30 * time.Second)
+defer timeout.Stop()
+
 for s.Status.Load() == 0 {
-    s.Cond.Wait()
+    done := make(chan struct{})
+    go func() {
+        s.Cond.Wait()
+        close(done)
+    }()
+
+    select {
+    case <-done:
+        // Continue loop to check status again
+    case <-timeout.C:
+        return nil, nil, moerr.NewInternalErrorNoCtx("Timeout waiting for index initialization")
+    }
 }
```
Suggestion importance [1-10]: 7
Why: The suggestion correctly identifies a potential issue where a goroutine could wait indefinitely on s.Cond.Wait(). Adding a timeout is a good practice for robustness in concurrent code to prevent stuck goroutines.
Impact: Medium
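Worth noting: the helper goroutine in the suggested diff is never reclaimed if Wait() does not return. An alternative shape, sketched below purely as an illustration with assumed names (waitForLoad is not a real function in the PR), is to let a timer broadcast on the Cond and have the waiter re-check an expiry flag.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// waitForLoad is a hypothetical helper: it waits until status becomes
// non-zero or the timeout fires, without leaving a helper goroutine behind.
func waitForLoad(mu *sync.RWMutex, cond *sync.Cond, status *atomic.Int32, d time.Duration) error {
	expired := false
	stop := time.AfterFunc(d, func() {
		// Set the flag under the write lock so the broadcast cannot slip in
		// between a waiter's check and its call to Wait().
		mu.Lock()
		expired = true
		mu.Unlock()
		cond.Broadcast()
	})
	defer stop.Stop()

	mu.RLock()
	defer mu.RUnlock()
	for status.Load() == 0 {
		if expired {
			return errors.New("timeout waiting for index initialization")
		}
		cond.Wait()
	}
	return nil
}

func main() {
	var mu sync.RWMutex
	var status atomic.Int32
	cond := sync.NewCond(mu.RLocker())
	// The status never changes in this demo, so the call times out.
	fmt.Println(waitForLoad(&mu, cond, &status, 100*time.Millisecond))
}
```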
Limit retry attempts to prevent loops
Add a retry limit to prevent infinite loops when the index is repeatedly destroyed. The current implementation could loop indefinitely if the index keeps getting destroyed by concurrent operations.
pkg/vectorindex/cache/cache.go [253-277]
```diff
-for {
+const maxRetries = 10
+for retryCount := 0; retryCount < maxRetries; retryCount++ {
     s := &VectorIndexSearch{Algo: newalgo}
     // use RLocker to let Cond.Wait() to use Rlock() and RUnlock()
     s.Cond = sync.NewCond(s.Mutex.RLocker())
     value, loaded := c.IndexMap.LoadOrStore(key, s)
-    ...
+    algo := value.(*VectorIndexSearch)
+    if !loaded {
+        // load model from database and if error during loading, remove the entry from gIndexMap
+        err := algo.Load(proc)
+        if err != nil {
+            c.IndexMap.Delete(key)
+            return nil, nil, err
+        }
+    }
+    keys, distances, err = algo.Search(proc, newalgo, query, rt)
+    if err != nil {
+        if moerr.IsMoErrCode(err, moerr.ErrInvalidState) {
+            // index destroyed by Remove() or HouseKeeping. Retry!
+            continue
+        }
+        return nil, nil, err
+    }
+
+    return keys, distances, nil
 }
+return nil, nil, moerr.NewInternalErrorNoCtx("Max retries exceeded for index search")
```
[To ensure code accuracy, apply this suggestion manually]
Suggestion importance [1-10]: 7
Why: The suggestion correctly points out that the for {} loop could run indefinitely under certain high-contention scenarios. Introducing a retry limit with a backoff is a standard and effective pattern to prevent such infinite loops and improve system stability.
Impact: Medium
Previous suggestions
Suggestions up to commit 45de5f3
Category: Incremental [*]
Add timeout to condition wait
Add a timeout mechanism to prevent indefinite blocking when waiting for status changes. The current implementation could cause goroutines to wait forever if the status never changes from STATUS_NOT_INIT.
pkg/vectorindex/cache/cache.go [126-130]
```diff
 s.Cond.L.Lock()
 defer s.Cond.L.Unlock()
+
+timeout := time.NewTimer(30 * time.Second)
+defer timeout.Stop()
+
 for s.Status.Load() == 0 {
-    s.Cond.Wait()
+    done := make(chan struct{})
+    go func() {
+        s.Cond.Wait()
+        close(done)
+    }()
+
+    select {
+    case <-done:
+        // Continue the loop to check status again
+    case <-timeout.C:
+        return nil, nil, moerr.NewInternalErrorNoCtx("Timeout waiting for index initialization")
+    }
 }
```
Suggestion importance [1-10]: 7
Why: The suggestion correctly identifies a potential for an indefinite wait if the index loading process hangs. Adding a timeout to s.Cond.Wait() is a good defensive programming practice that improves the robustness and resilience of the system by preventing goroutines from blocking forever.
Impact: Medium
Limit retry attempts in loop
Add a retry limit to prevent infinite loops when the index is repeatedly destroyed. The current implementation could loop indefinitely if the index keeps getting destroyed by concurrent operations.
pkg/vectorindex/cache/cache.go [253-277]
```diff
-for {
+const maxRetries = 10
+for retryCount := 0; retryCount < maxRetries; retryCount++ {
     s := &VectorIndexSearch{Algo: newalgo}
     // use RLocker to let Cond.Wait() to use Rlock() and RUnlock()
     s.Cond = sync.NewCond(s.Mutex.RLocker())
     value, loaded := c.IndexMap.LoadOrStore(key, s)
-    ...
+    algo := value.(*VectorIndexSearch)
+    if !loaded {
+        // load model from database and if error during loading, remove the entry from gIndexMap
+        err := algo.Load(proc)
+        if err != nil {
+            c.IndexMap.Delete(key)
+            return nil, nil, err
+        }
+    }
+    keys, distances, err = algo.Search(proc, newalgo, query, rt)
+    if err != nil {
+        if moerr.IsMoErrCode(err, moerr.ErrInvalidState) {
+            // index destroyed by Remove() or HouseKeeping. Retry!
+            continue
+        }
+        return nil, nil, err
+    }
+
+    return keys, distances, nil
 }
+return nil, nil, moerr.NewInternalErrorNoCtx("Max retries exceeded for index search")
```
Suggestion importance [1-10]: 7
Why: The suggestion correctly points out that the for {} loop for retrying a search could potentially run indefinitely under certain high-contention scenarios. Introducing a retry limit with a backoff mechanism is a standard pattern to prevent such infinite loops and resource starvation, making the system more stable.
Impact: Medium
Category: General
Add backoff to retry loop
The infinite retry loop without any backoff or maximum retry limit could lead to a busy-wait scenario consuming excessive CPU resources. Consider adding a retry limit or exponential backoff mechanism.
pkg/vectorindex/cache/cache.go [269-272]
```diff
 if moerr.IsMoErrCode(err, moerr.ErrInvalidState) {
-    // index destroyed by Remove() or HouseKeeping. Retry!
+    // index destroyed by Remove() or HouseKeeping. Retry with backoff!
+    time.Sleep(time.Millisecond)
     continue
 }
```
Suggestion importance [1-10]: 7
Why: The suggestion correctly identifies that the tight retry loop could lead to high CPU usage if an index is frequently destroyed and recreated. Adding a small delay or an exponential backoff is a good practice to prevent busy-waiting and yield CPU resources, improving overall system performance under contention.
Impact: Medium
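A stand-alone sketch of the exponential-backoff idea mentioned above, with invented names (searchWithBackoff, errInvalidState) standing in for the real cache.Search retry loop and moerr.ErrInvalidState check:

```go
package main

import (
	"errors"
	"fmt"
	"time"
)

var errInvalidState = errors.New("index destroyed")

func searchWithBackoff(search func() error) error {
	const maxRetries = 10
	backoff := time.Millisecond
	for attempt := 0; attempt < maxRetries; attempt++ {
		err := search()
		if err == nil {
			return nil
		}
		if !errors.Is(err, errInvalidState) {
			return err // only retry the "index destroyed" case
		}
		// Yield the CPU and back off exponentially, capped at roughly 100ms.
		time.Sleep(backoff)
		if backoff < 100*time.Millisecond {
			backoff *= 2
		}
	}
	return errors.New("max retries exceeded for index search")
}

func main() {
	attempts := 0
	err := searchWithBackoff(func() error {
		attempts++
		if attempts < 3 {
			return errInvalidState // simulate the entry being destroyed twice
		}
		return nil
	})
	fmt.Println(err, "after", attempts, "attempts")
}
```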
✅ Suggestions up to commit 9d19d68
Category: Possible issue
✅ Fix condition variable deadlock issue
Suggestion Impact: The suggestion was implemented but with a different approach. Instead of directly replacing RLock with Cond.L.Lock, the code was refactored to use an anonymous function that properly locks the condition variable's mutex before waiting, then uses RLock separately for the rest of the function.
code diff:
```diff
+func() {
+    s.Cond.L.Lock()
+    defer s.Cond.L.Unlock()
+    for s.Status.Load() == 0 {
+        s.Cond.Wait()
+    }
+}()
+
 s.Mutex.RLock()
 defer s.Mutex.RUnlock()
```
The condition variable Wait() is called while holding a read lock, but sync.Cond.Wait() requires the associated mutex to be locked for writing. Wait() expects to unlock and relock the write lock, yet the mutex is only read-locked, so this fails at runtime instead of blocking correctly.
pkg/vectorindex/cache/cache.go [146-152]
```diff
 func (s *VectorIndexSearch) Search(proc *process.Process, newalgo VectorIndexSearchIf, query any, rt vectorindex.RuntimeConfig) (keys any, distances []float64, err error) {
-    s.Mutex.RLock()
-    defer s.Mutex.RUnlock()
+    s.Cond.L.Lock()
+    defer s.Cond.L.Unlock()
     for s.Status.Load() == 0 {
         s.Cond.Wait()
     }
```
[Suggestion processed]
Suggestion importance [1-10]: 10
Why: The suggestion correctly identifies a critical bug. Calling s.Cond.Wait() while holding a read lock (s.Mutex.RLock()) on the associated RWMutex would cause a panic, as Wait() attempts to call the Unlock() method (for a write lock), not RUnlock(). The proposed fix of using s.Cond.L.Lock() (which corresponds to s.Mutex.Lock()) correctly acquires a write lock, which is required before calling Wait().
Impact: High