
bug fix: index cache returns "index destroyed" error when NewAlgo runs in one thread and Remove in another thread

Open cpegeric opened this issue 7 months ago • 2 comments

User description

What type of PR is this?

  • [ ] API-change
  • [x] BUG
  • [ ] Improvement
  • [ ] Documentation
  • [ ] Feature
  • [ ] Test and CI
  • [ ] Code Refactoring

Which issue(s) this PR fixes:

issue #22022

What this PR does / why we need it:

bug fix: the index cache should retry when it sees an "index destroyed" error.


PR Type

Bug fix


Description

• Fix a race condition in the vector index cache between NewAlgo and Remove operations
• Replace the mutex with a condition variable for thread-safe synchronization (a sketch of this pattern follows below)
• Add a LoadAndSearch method to handle concurrent load and search operations
• Add a comprehensive test for concurrent search and delete scenarios

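The following is a minimal sketch of the pattern this PR moves to: waiters block on a condition variable until the entry is loaded instead of polling. It is illustrative only; the names, status values, and lost-wakeup handling are assumptions, not the actual pkg/vectorindex/cache code.

    package cache

    import (
        "errors"
        "sync"
        "sync/atomic"
    )

    // Illustrative status values; the real constants live in pkg/vectorindex/cache.
    const (
        statusNotInit   int32 = 0
        statusLoaded    int32 = 1
        statusDestroyed int32 = 2
    )

    // errIndexDestroyed is this sketch's stand-in for the "Index destroyed" error.
    var errIndexDestroyed = errors.New("index destroyed")

    // entry sketches the shape of VectorIndexSearch after this PR: an atomic
    // status plus a condition variable whose Locker is the RWMutex's RLocker(),
    // so Wait() releases and reacquires the read lock.
    type entry struct {
        Mutex  sync.RWMutex
        Cond   *sync.Cond
        Status atomic.Int32
    }

    func newEntry() *entry {
        e := &entry{}
        e.Cond = sync.NewCond(e.Mutex.RLocker())
        return e
    }

    // load publishes the loaded state and wakes every waiting searcher. Taking
    // the write lock around the store prevents a lost wakeup against a waiter
    // that checked Status under the read lock but has not called Wait() yet.
    func (e *entry) load() {
        e.Mutex.Lock()
        e.Status.Store(statusLoaded)
        e.Mutex.Unlock()
        e.Cond.Broadcast()
    }

    // search blocks until the entry leaves the "not initialized" state, then
    // reports a retryable error if the entry was destroyed in the meantime.
    func (e *entry) search() error {
        e.Cond.L.Lock() // equivalent to e.Mutex.RLock()
        for e.Status.Load() == statusNotInit {
            e.Cond.Wait()
        }
        e.Cond.L.Unlock()

        if e.Status.Load() >= statusDestroyed {
            return errIndexDestroyed // caller retries with a fresh entry
        }
        // ... run the actual vector search under e.Mutex.RLock() ...
        return nil
    }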

Changes walkthrough 📝

Relevant files
Bug fix
cache.go
Replace mutex with condition variable for thread safety   

pkg/vectorindex/cache/cache.go

• Add a sync.Cond field to the VectorIndexSearch struct for better synchronization
• Replace mutex locks with the condition variable in the Destroy() and Load() methods
• Implement a new LoadAndSearch() method combining load and search operations
• Update the Search() method to use the condition variable's Wait() instead of polling
• Modify the cache Search() to use LoadAndSearch for new entries

+42/-11 
Tests
cache_test.go
Add concurrent search and delete test                                       

pkg/vectorindex/cache/cache_test.go

• Add a TestCacheConcurrentNewSearchAndDelete test function (an illustrative sketch follows below)
• Test concurrent search operations with simultaneous cache removal
• Verify the race condition fix with 2000 search iterations and 4000 remove operations

+54/-0   
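
The sketch below shows the general shape such a concurrency test could take. searchIndex and removeIndex are hypothetical stand-ins for the real cache API, and the four searcher goroutines are an assumption; only the 2000-search and 4000-remove counts come from the walkthrough above.

    package cache

    import (
        "sync"
        "testing"
    )

    // Hypothetical stand-ins for the cache paths exercised by the real
    // TestCacheConcurrentNewSearchAndDelete; they are not MatrixOne APIs.
    var (
        searchIndex = func(key string) error { return nil }
        removeIndex = func(key string) {}
    )

    // Illustrative shape of the concurrency test: several searchers and one
    // remover hammer the same key. Run with `go test -race` so the race
    // detector does the real work.
    func TestConcurrentSearchAndDelete(t *testing.T) {
        var wg sync.WaitGroup

        for i := 0; i < 4; i++ {
            wg.Add(1)
            go func() {
                defer wg.Done()
                for j := 0; j < 2000; j++ {
                    if err := searchIndex("idx"); err != nil {
                        // After the fix, a destroyed entry is retried
                        // internally, so no error should surface here.
                        t.Error(err)
                        return
                    }
                }
            }()
        }

        wg.Add(1)
        go func() {
            defer wg.Done()
            for j := 0; j < 4000; j++ {
                removeIndex("idx")
            }
        }()

        wg.Wait()
    }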

cpegeric · Jun 16 '25 16:06

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Synchronization Bug

    The condition variable Cond is initialized with &s.Mutex but the code uses s.Mutex.RLock() in Search method while Cond.Wait() expects the associated mutex to be locked with Lock(), not RLock(). This creates a mismatch between read and write locks that could cause deadlocks or undefined behavior.

    s.Mutex.RLock()
    defer s.Mutex.RUnlock()
    
    for s.Status.Load() == 0 {
    	s.Cond.Wait()
    }
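
    For reference, sync.Cond.Wait() simply calls Unlock() and then Lock() on whatever Locker the Cond was constructed with, so the waiter must hold exactly that lock. The small standalone program below (not MatrixOne code) shows the two valid pairings and the broken mix the reviewer is pointing at; the diff context further down shows the PR ultimately pairing the Cond with s.Mutex.RLocker().

    package main

    import "sync"

    func main() {
        var (
            mu    sync.RWMutex
            ready bool
        )

        // Cond bound to the write lock: Wait() calls mu.Unlock()/mu.Lock(),
        // so the waiter must hold mu.Lock().
        condW := sync.NewCond(&mu)
        go func() {
            mu.Lock()
            ready = true
            mu.Unlock()
            condW.Broadcast()
        }()
        mu.Lock()
        for !ready {
            condW.Wait()
        }
        mu.Unlock()

        // Cond bound to the read lock: Wait() calls mu.RUnlock()/mu.RLock(),
        // so waiters may hold only the read lock.
        condR := sync.NewCond(mu.RLocker())
        mu.RLock()
        for !ready {
            condR.Wait()
        }
        mu.RUnlock()

        // Broken mix: calling condW.Wait() while holding only mu.RLock() makes
        // Wait() call mu.Unlock() on a read-locked RWMutex, which aborts the
        // program at runtime ("sync: Unlock of unlocked RWMutex").
    }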
    
    Race Condition

    In the Search method, after calling Cond.Wait(), the code continues without re-checking if the entry was destroyed by another thread. The LoadAndSearch method broadcasts to wake up waiting threads, but there's no guarantee the entry hasn't been removed from the cache between the wait and the subsequent operations.

    for s.Status.Load() == 0 {
    	s.Cond.Wait()
    }
    
    status := s.Status.Load()
    if status >= STATUS_DESTROYED {
    	if status == STATUS_DESTROYED {
    		return nil, nil, moerr.NewInternalErrorNoCtx("Index destroyed")
    	} else {
    
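    The mitigation quoted in the suggestions further down is to treat a destroyed entry as retryable at the cache level: the outer Search loops back to LoadOrStore so a fresh entry is created and loaded. Below is a rough sketch of that caller-side loop, continuing the illustrative package from the PR description above (entry, newEntry, and errIndexDestroyed come from that sketch; cacheSketch is equally hypothetical).

    // cacheSketch mirrors the flow quoted in the suggestions below: LoadOrStore
    // an entry, initialize it if this goroutine created it, then retry the
    // whole lookup if the entry was destroyed before the search could run.
    type cacheSketch struct {
        indexMap sync.Map // key -> *entry
    }

    func (c *cacheSketch) search(key string) error {
        for {
            value, loaded := c.indexMap.LoadOrStore(key, newEntry())
            e := value.(*entry)
            if !loaded {
                // This goroutine created the entry, so it marks it loaded
                // (standing in for the real model load) and wakes waiters.
                e.load()
            }
            err := e.search()
            if errors.Is(err, errIndexDestroyed) {
                // Destroyed by Remove() or housekeeping between the wait and
                // the search: loop back and build a fresh entry.
                continue
            }
            return err
        }
    }
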
    Logic Error

    The LoadAndSearch method calls Broadcast() after setting the status to LOADED, but then immediately calls UpdateConfig, which might set the status to OUTDATED. This could wake up waiting threads with an inconsistent state, and the method does not properly handle the case where UpdateConfig fails.

    // signal condition
    s.Cond.Broadcast()
    
    // if error mark as outdated
    err = s.Algo.UpdateConfig(newalgo)
    if err != nil {
    	s.Status.Store(STATUS_OUTDATED)
    }
    
    s.extend(false)
    return s.Algo.Search(proc, query, rt)
    
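    If this ordering turns out to matter, one option is to settle the final status (including the UpdateConfig outcome) before broadcasting, so waiters never observe a state that is about to change. The sketch below is a hedged illustration built on the entry type from the earlier sketch; the load, config, and search steps are passed in as functions because the real algorithm calls are not shown here.

    const statusOutdated int32 = 3 // placeholder mirroring STATUS_OUTDATED

    // loadAndSearch settles the final status before broadcasting.
    func (e *entry) loadAndSearch(
        loadModel func() error,
        updateConfig func() error,
        runSearch func() (any, error),
    ) (any, error) {
        if err := loadModel(); err != nil {
            e.setStatusAndWake(statusDestroyed)
            return nil, err
        }
        // Apply the config first, then publish the final status exactly once.
        status := statusLoaded
        if err := updateConfig(); err != nil {
            status = statusOutdated
        }
        e.setStatusAndWake(status)
        return runSearch()
    }

    // setStatusAndWake stores the status under the write lock and wakes waiters,
    // matching the load() helper in the earlier sketch.
    func (e *entry) setStatusAndWake(status int32) {
        e.Mutex.Lock()
        e.Status.Store(status)
        e.Mutex.Unlock()
        e.Cond.Broadcast()
    }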

    qodo-code-review[bot] · Jun 16 '25 16:06

    PR Code Suggestions ✨

    Latest suggestions up to 45de5f3

    Category | Suggestion | Impact
    Incremental [*]
    Add timeout to prevent deadlock

    Add a timeout mechanism to prevent indefinite blocking when waiting for status
    changes. The current implementation could cause goroutines to wait forever if
    the status never changes from 0.

    pkg/vectorindex/cache/cache.go [126-130]

     s.Cond.L.Lock()
     defer s.Cond.L.Unlock()
    +
    +timeout := time.NewTimer(30 * time.Second)
    +defer timeout.Stop()
    +
     for s.Status.Load() == 0 {
    -    s.Cond.Wait()
    +    done := make(chan struct{})
    +    go func() {
    +        s.Cond.Wait()
    +        close(done)
    +    }()
    +    
    +    select {
    +    case <-done:
    +        // Continue loop to check status again
    +    case <-timeout.C:
    +        return nil, nil, moerr.NewInternalErrorNoCtx("Timeout waiting for index initialization")
    +    }
     }
    
    Suggestion importance[1-10]: 7


    Why: The suggestion correctly identifies a potential issue where a goroutine could wait indefinitely on s.Cond.Wait(). Adding a timeout is a good practice for robustness in concurrent code to prevent stuck goroutines.

    Medium
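
    Since the standard library's sync.Cond has no timed wait, an alternative to wrapping Wait() in a goroutine (as the diff above does) is to publish readiness through a channel that is closed exactly once; that composes directly with select, a timer, and a context. This is a sketch of that alternative, not something the PR implements; readyEntry and its names are illustrative.

    package cache

    import (
        "context"
        "errors"
        "sync"
        "time"
    )

    // readyEntry publishes "loaded" by closing a channel exactly once.
    type readyEntry struct {
        ready chan struct{}
        once  sync.Once
    }

    func newReadyEntry() *readyEntry {
        return &readyEntry{ready: make(chan struct{})}
    }

    func (e *readyEntry) markReady() {
        e.once.Do(func() { close(e.ready) })
    }

    // waitReady blocks until the entry is loaded, the timeout fires, or the
    // caller's context is cancelled.
    func (e *readyEntry) waitReady(ctx context.Context, timeout time.Duration) error {
        timer := time.NewTimer(timeout)
        defer timer.Stop()
        select {
        case <-e.ready:
            return nil
        case <-timer.C:
            return errors.New("timeout waiting for index initialization")
        case <-ctx.Done():
            return ctx.Err()
        }
    }
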
    Limit retry attempts to prevent loops

    Add a retry limit to prevent infinite loops when the index is repeatedly
    destroyed. The current implementation could loop indefinitely if the index keeps
    getting destroyed by concurrent operations.

    pkg/vectorindex/cache/cache.go [253-277]

    -for {
    +const maxRetries = 10
    +for retryCount := 0; retryCount < maxRetries; retryCount++ {
         s := &VectorIndexSearch{Algo: newalgo}
         // use RLocker to let Cond.Wait() to use Rlock() and RUnlock()
         s.Cond = sync.NewCond(s.Mutex.RLocker())
         value, loaded := c.IndexMap.LoadOrStore(key, s)
    -    ...
    +    algo := value.(*VectorIndexSearch)
    +    if !loaded {
    +        // load model from database and if error during loading, remove the entry from gIndexMap
    +        err := algo.Load(proc)
    +        if err != nil {
    +            c.IndexMap.Delete(key)
    +            return nil, nil, err
    +        }
    +    }
    +    keys, distances, err = algo.Search(proc, newalgo, query, rt)
    +    if err != nil {
    +        if moerr.IsMoErrCode(err, moerr.ErrInvalidState) {
    +            // index destroyed by Remove() or HouseKeeping.  Retry!
    +            continue
    +        }
    +        return nil, nil, err
    +    }
    +    
    +    return keys, distances, nil
     }
    +return nil, nil, moerr.NewInternalErrorNoCtx("Max retries exceeded for index search")
    


    Suggestion importance[1-10]: 7


    Why: The suggestion correctly points out that the for {} loop could run indefinitely under certain high-contention scenarios. Introducing a retry limit with a backoff is a standard and effective pattern to prevent such infinite loops and improve system stability.

    Medium

    Previous suggestions

    Suggestions up to commit 45de5f3
    Category | Suggestion | Impact
    Incremental [*]
    Add timeout to condition wait

    Add a timeout mechanism to prevent indefinite blocking when waiting for status
    changes. The current implementation could cause goroutines to wait forever if
    the status never changes from STATUS_NOT_INIT.

    pkg/vectorindex/cache/cache.go [126-130]

     s.Cond.L.Lock()
     defer s.Cond.L.Unlock()
    +
    +timeout := time.NewTimer(30 * time.Second)
    +defer timeout.Stop()
    +
     for s.Status.Load() == 0 {
    -    s.Cond.Wait()
    +    done := make(chan struct{})
    +    go func() {
    +        s.Cond.Wait()
    +        close(done)
    +    }()
    +    
    +    select {
    +    case <-done:
    +        // Continue the loop to check status again
    +    case <-timeout.C:
    +        return nil, nil, moerr.NewInternalErrorNoCtx("Timeout waiting for index initialization")
    +    }
     }
    
    Suggestion importance[1-10]: 7


    Why: The suggestion correctly identifies a potential for an indefinite wait if the index loading process hangs. Adding a timeout to s.Cond.Wait() is a good defensive programming practice that improves the robustness and resilience of the system by preventing goroutines from blocking forever.

    Medium
    Limit retry attempts in loop

    Add a retry limit to prevent infinite loops when the index is repeatedly
    destroyed. The current implementation could loop indefinitely if the index keeps
    getting destroyed by concurrent operations.

    pkg/vectorindex/cache/cache.go [253-277]

    -for {
    +const maxRetries = 10
    +for retryCount := 0; retryCount < maxRetries; retryCount++ {
         s := &VectorIndexSearch{Algo: newalgo}
         // use RLocker to let Cond.Wait() to use Rlock() and RUnlock()
         s.Cond = sync.NewCond(s.Mutex.RLocker())
         value, loaded := c.IndexMap.LoadOrStore(key, s)
    -    ...
    +    algo := value.(*VectorIndexSearch)
    +    if !loaded {
    +        // load model from database and if error during loading, remove the entry from gIndexMap
    +        err := algo.Load(proc)
    +        if err != nil {
    +            c.IndexMap.Delete(key)
    +            return nil, nil, err
    +        }
    +    }
    +    keys, distances, err = algo.Search(proc, newalgo, query, rt)
    +    if err != nil {
    +        if moerr.IsMoErrCode(err, moerr.ErrInvalidState) {
    +            // index destroyed by Remove() or HouseKeeping.  Retry!
    +            continue
    +        }
    +        return nil, nil, err
    +    }
    +    
    +    return keys, distances, nil
     }
    +return nil, nil, moerr.NewInternalErrorNoCtx("Max retries exceeded for index search")
    
    Suggestion importance[1-10]: 7


    Why: The suggestion correctly points out that the for {} loop for retrying a search could potentially run indefinitely under certain high-contention scenarios. Introducing a retry limit with a backoff mechanism is a standard pattern to prevent such infinite loops and resource starvation, making the system more stable.

    Medium
    General
    Add backoff to retry loop

    The infinite retry loop without any backoff or maximum retry limit could lead to
    a busy-wait scenario consuming excessive CPU resources. Consider adding a retry
    limit or exponential backoff mechanism.

    pkg/vectorindex/cache/cache.go [269-272]

     if moerr.IsMoErrCode(err, moerr.ErrInvalidState) {
    -    // index destroyed by Remove() or HouseKeeping.  Retry!
    +    // index destroyed by Remove() or HouseKeeping.  Retry with backoff!
    +    time.Sleep(time.Millisecond)
         continue
     }
    
    Suggestion importance[1-10]: 7


    Why: The suggestion correctly identifies that the tight retry loop could lead to high CPU usage if an index is frequently destroyed and recreated. Adding a small delay or an exponential backoff is a good practice to prevent busy-waiting and yield CPU resources, improving overall system performance under contention.

    Medium
    ✅ Suggestions up to commit 9d19d68
    Category | Suggestion | Impact
    Possible issue
    Fix condition variable deadlock issue
    Suggestion Impact: The suggestion was implemented, but with a different approach. Instead of directly replacing RLock with Cond.L.Lock, the code was refactored to use an anonymous function that locks the condition variable's mutex before waiting, and then uses RLock separately for the rest of the function.

    code diff:

    +	func() {
    +		s.Cond.L.Lock()
    +		defer s.Cond.L.Unlock()
    +		for s.Status.Load() == 0 {
    +			s.Cond.Wait()
    +		}
    +	}()
    +
     	s.Mutex.RLock()
     	defer s.Mutex.RUnlock()
    

    The condition variable Wait() is called while holding a read lock, but
    sync.Cond.Wait() requires the associated mutex to be locked for writing. This
    will cause a deadlock since Wait() expects to unlock and relock the mutex, but
    it's currently read-locked.

    pkg/vectorindex/cache/cache.go [146-152]

     func (s *VectorIndexSearch) Search(proc *process.Process, newalgo VectorIndexSearchIf, query any, rt vectorindex.RuntimeConfig) (keys any, distances []float64, err error) {
    -	s.Mutex.RLock()
    -	defer s.Mutex.RUnlock()
    +	s.Cond.L.Lock()
    +	defer s.Cond.L.Unlock()
     
     	for s.Status.Load() == 0 {
     		s.Cond.Wait()
     	}
    


    Suggestion importance[1-10]: 10


    Why: The suggestion correctly identifies a critical bug. Calling s.Cond.Wait() while holding a read lock (s.Mutex.RLock()) on the associated RWMutex would cause a panic, as Wait() attempts to call the Unlock() method (for a write lock), not RUnlock(). The proposed fix of using s.Cond.L.Lock() (which corresponds to s.Mutex.Lock()) correctly acquires a write lock, which is required before calling Wait().

    High

    qodo-code-review[bot] · Jun 16 '25 16:06