
HNSW index update with CDC

Open cpegeric opened this issue 7 months ago • 2 comments

User description

What type of PR is this?

  • [ ] API-change
  • [ ] BUG
  • [ ] Improvement
  • [ ] Documentation
  • [x] Feature
  • [ ] Test and CI
  • [ ] Code Refactoring

Which issue(s) this PR fixes:

issue #21835

What this PR does / why we need it:

To update the HNSW index via CDC changes.

The design doc: https://github.com/cpegeric/mo-docs/blob/hnsw_cdc/design/mo/sql/20250501-cpegeric-hnswsync.md


PR Type

Enhancement, Tests


Description

• Implement comprehensive HNSW index CDC (Change Data Capture) synchronization functionality
• Add new HnswSync struct and hnswCdcUpdate SQL function for processing CDC updates via multi-threaded operations
• Introduce hnswSyncSinker for updating HNSW indexes with CDC changes, supporting both float32 and float64 vector types
• Refactor HNSW architecture by replacing HnswBuildIndex and HnswSearchIndex with unified HnswModel structure
• Add CDC data structures (VectorIndexCdc, VectorIndexCdcEntry) with insert, update, and delete operations
• Implement transaction-aware SQL execution with RunTxn function and enhanced error handling
• Add comprehensive test suites covering CDC sinker functionality, synchronization operations, and model operations
• Integrate CDC task creation into HNSW index creation workflow with automatic cleanup placeholders
• Enhance array casting with dimension validation and standardize error message formats across vector operations
• Add distributed test cases for HNSW CDC synchronization scenarios including bulk loads and incremental updates


Changes walkthrough 📝

Relevant files
Tests
12 files
hnsw_sinker_test.go
Add comprehensive test suite for HNSW CDC sinker                 

pkg/cdc/hnsw_sinker_test.go

• Comprehensive test suite for HNSW CDC sinker functionality with 692
lines of test code
• Mock implementations for SQL executors and error
handling scenarios
• Test cases covering sinker creation, execution,
error handling, and data processing
• Tests for snapshot and atomic
batch processing with vector data

+692/-0 
sync_test.go
Add test suite for HNSW CDC synchronization                           

pkg/vectorindex/hnsw/sync_test.go

• Test suite for HNSW CDC synchronization with various operation
scenarios
• Tests covering upsert, delete, insert operations with
single and multiple models
• Mock implementations for SQL execution
and streaming operations
• Shuffle testing for concurrent operation
handling

+370/-0 
search_test.go
Enhance HNSW search tests with multi-file support               

pkg/vectorindex/hnsw/search_test.go

• Added mock functions for catalog SQL operations and multi-file
scenarios
• New test helper functions for creating metadata and index
batches
• Enhanced test coverage for search operations with multiple
index files

+106/-0 
model_test.go
Add comprehensive test suite for HnswModel functionality 

pkg/vectorindex/hnsw/model_test.go

• Added comprehensive test suite for HnswModel functionality
• Tests cover search operations, loading/unloading, add/remove operations, and SQL generation
• Includes edge case testing for nil model scenarios
• Uses mock SQL functions for testing database interactions

+206/-0 
func_hnsw_test.go
Add test cases for HNSW CDC update function                           

pkg/sql/plan/function/func_hnsw_test.go

• Added test cases for hnswCdcUpdate function
• Tests various error conditions including null arguments and invalid JSON
• Validates function parameter validation and error handling

+129/-0 
build_test.go
Update HNSW build tests to use new HnswModel structure     

pkg/vectorindex/hnsw/build_test.go

• Updated test code to use HnswModel instead of HnswSearchIndex
• Changed function call from NewHnswBuildIndex to NewHnswModelForBuild
• Updated struct initialization to use new model type

+5/-5     
types_test.go
Add test cases for vector index CDC functionality               

pkg/vectorindex/types_test.go

• Added test cases for CDC functionality
• Tests Insert, Delete, Upsert operations and JSON serialization
• Validates CDC data structure behavior and state management

+63/-0   
sinker_test.go
Update sinker tests for new function signature                     

pkg/cdc/sinker_test.go

• Updated test calls to NewSinker to include the new cnUUID parameter

• Maintains test compatibility with updated function signature

+1/-1     
cdc_test.go
Update CDC test mocks for new sinker signature                     

pkg/frontend/cdc_test.go

• Updated mock NewSinker stub to include cnUUID parameter
• Maintains test compatibility with updated function signature

+1/-1     
function_id_test.go
Update function ID tests for HNSW CDC function                     

pkg/sql/plan/function/function_id_test.go

• Updated predefined function IDs to include HNSW_CDC_UPDATE
• Incremented FUNCTION_END_NUMBER to maintain test consistency

+3/-1     
vector_hnsw_sync.result
Add test results for HNSW CDC synchronization functionality

test/distributed/cases/vector/vector_hnsw_sync.result

• Added test results for HNSW CDC synchronization scenarios
• Covers empty data, bulk load, and incremental update test cases
• Validates vector search functionality after CDC operations

+81/-0   
vector_hnsw_sync.sql
Add comprehensive HNSW CDC synchronization test cases       

test/distributed/cases/vector/vector_hnsw_sync.sql

• Added comprehensive test cases for HNSW CDC synchronization
• Tests PITR and CDC task creation, data operations, and vector searches
• Includes scenarios for empty tables, bulk loads, and incremental updates

+113/-0 
Feature
6 files
sync.go
Implement HNSW index CDC synchronization functionality     

pkg/vectorindex/hnsw/sync.go

• New CDC synchronization functionality for HNSW index updates via SQL function hnsw_cdc_update()
• HnswSync struct for managing CDC operations with insert, update, delete operations
• Multi-threaded processing support with concurrent model loading and vector operations
• SQL generation for metadata and storage table updates

+631/-0 
hnsw_sinker.go
Add HNSW CDC sinker for vector index updates                         

pkg/cdc/hnsw_sinker.go

• New hnswSyncSinker implementation for updating HNSW indexes via CDC
changes
• Support for both float32 and float64 vector types with
JSON-based CDC updates
• Transaction-based SQL execution with error
handling and rollback support
• Processing of snapshot and tail data
with atomic batch operations

+573/-0 
func_hnsw.go
Implement HNSW CDC update function for vector index synchronization

pkg/sql/plan/function/func_hnsw.go

• Implemented hnswCdcUpdate function for processing CDC updates
• Validates input parameters (database name, table name, dimension, CDC JSON)
• Calls hnsw.CdcSync to perform the actual synchronization
• Includes comprehensive error handling and logging

+77/-0   
util.go
Add CDC task generation for HNSW index synchronization     

pkg/sql/compile/util.go

• Added genCdcHnswIndex function to generate CDC task creation SQL
• Creates PITR and CDC task SQL statements for HNSW index synchronization
• Includes placeholder logic for future CDC task registration

+38/-0   
list_builtIn.go
Register HNSW CDC update function in built-in functions   

pkg/sql/plan/function/list_builtIn.go

• Added HNSW_CDC_UPDATE function definition to built-in functions list

• Configured function signature with varchar and int32 parameters
returning uint64

+21/-0   
ddl_index_algo.go
Integrate CDC task creation into HNSW index creation         

pkg/sql/compile/ddl_index_algo.go

• Added call to genCdcHnswIndex in vector HNSW index handling
• Executes generated CDC SQL statements during index creation

+13/-0   
Enhancement
9 files
model.go
Refactor HNSW model with CDC support and enhanced operations

pkg/vectorindex/hnsw/model.go

• New HnswModel struct replacing HnswBuildIndex with enhanced CDC
support
• Added dirty tracking, atomic length counters, and view mode
support
• Enhanced file operations with checksum validation and
streaming SQL loading
• Methods for concurrent vector operations and
model lifecycle management

+524/-0 
types.go
Add CDC data structures and operations for vector index   

pkg/vectorindex/types.go

• Added CDC-related constants (CDC_INSERT, CDC_UPSERT, CDC_DELETE)
• Introduced VectorIndexCdc and VectorIndexCdcEntry structs for CDC operations
• Added HnswCdcParam struct for CDC parameters
• Implemented CDC data manipulation methods (Insert, Upsert, Delete, ToJson)

+85/-0   
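
For readers who have not opened types.go, the batch-of-changes idea described above can be pictured roughly as follows. This is only a sketch under assumptions: the type, constant, and method names come from the walkthrough, and Type, PKey, and Data appear in the review excerpts, but the constant values, the int64 key type, the Vec field, and every JSON tag here are hypothetical and may differ from the PR.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// Operation tags. The names come from the PR; the string values are assumed.
const (
	CDC_INSERT = "I"
	CDC_UPSERT = "U"
	CDC_DELETE = "D"
)

// VectorIndexCdcEntry is one change. Vec, the int64 key type, and the JSON
// tags are hypothetical.
type VectorIndexCdcEntry[T float32 | float64] struct {
	Type string `json:"t"`
	PKey int64  `json:"pk"`
	Vec  []T    `json:"v,omitempty"`
}

// VectorIndexCdc collects entries in arrival order.
type VectorIndexCdc[T float32 | float64] struct {
	Data []VectorIndexCdcEntry[T] `json:"cdc"`
}

// Insert records a new row with its vector.
func (c *VectorIndexCdc[T]) Insert(pk int64, v []T) {
	c.Data = append(c.Data, VectorIndexCdcEntry[T]{Type: CDC_INSERT, PKey: pk, Vec: v})
}

// Upsert records a row that may or may not already be indexed.
func (c *VectorIndexCdc[T]) Upsert(pk int64, v []T) {
	c.Data = append(c.Data, VectorIndexCdcEntry[T]{Type: CDC_UPSERT, PKey: pk, Vec: v})
}

// Delete records a removal; no vector is needed.
func (c *VectorIndexCdc[T]) Delete(pk int64) {
	c.Data = append(c.Data, VectorIndexCdcEntry[T]{Type: CDC_DELETE, PKey: pk})
}

// ToJson serializes the batch, e.g. to pass it as a SQL function argument.
func (c *VectorIndexCdc[T]) ToJson() (string, error) {
	b, err := json.Marshal(c)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	var c VectorIndexCdc[float32]
	c.Insert(1, []float32{1, 2})
	c.Upsert(2, []float32{3, 4})
	c.Delete(1)
	js, err := c.ToJson()
	if err != nil {
		fmt.Println("marshal failed:", err)
		return
	}
	fmt.Println(js)
}
```

The type parameter mirrors the float32/float64 support mentioned for the sinker; whether the PR uses generics at this level is not confirmed by the walkthrough.
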
sinker.go
Add HNSW sync sinker support and improve error handling   

pkg/cdc/sinker.go

• Added support for CDCSinkType_HnswSync sink type
• Updated NewSinker function signature to include cnUUID parameter
• Fixed potential nil pointer dereference in error handling

+9/-2     
ddl.go
Add placeholder logic for CDC task cleanup in DDL operations

pkg/sql/compile/ddl.go

• Added TODO comments for CDC task cleanup in DropIndex and DropTable
methods
• Placeholder logic for cleaning up CDC tasks when dropping
vector/fulltext indexes

+9/-0     
sqlexec.go
Add transaction-aware SQL execution function                         

pkg/vectorindex/sqlexec/sqlexec.go

• Added RunTxn function for executing SQL operations within
transactions
• Provides transaction-aware SQL execution with proper
context and options setup

+27/-0   
func_cast.go
Enhance array casting with dimension validation                   

pkg/sql/plan/function/func_cast.go

• Enhanced array casting with dimension validation
• Added bypass for max dimension check when width equals MaxArrayDimension
• Improved error handling for dimension mismatches

+11/-2   
hnsw.go
Relax table scan validation in HNSW query building             

pkg/sql/plan/hnsw.go

• Commented out table scan validation in buildHnswCreate
• Relaxed constraints on child node type checking

+6/-4     
cdc_options.go
Add HNSW sync sink type support in CDC options                     

pkg/frontend/cdc_options.go

• Added support for CDCSinkType_HnswSync in CDC options validation
• Extended sink type validation to include HNSW sync type

+1/-1     
cdc_exector.go
Update CDC executor to pass CN UUID to sinker                       

pkg/frontend/cdc_exector.go

• Updated NewSinker call to include cnUUID parameter
• Passes executor's CN UUID to sinker creation

+1/-0     
Code refactoring
2 files
build.go
Refactor HNSW build to use unified model structure             

pkg/vectorindex/hnsw/build.go

• Refactored to use HnswModel instead of HnswBuildIndex for
consistency
• Removed duplicate HnswBuildIndex struct and related
methods
• Updated build operations to work with the new unified model
structure

+11/-182
search.go
Refactor HNSW search to use HnswModel instead of HnswSearchIndex

pkg/vectorindex/hnsw/search.go

• Removed HnswSearchIndex struct and related methods (loadChunk, LoadIndex, Search)
• Replaced HnswSearchIndex with HnswModel in the HnswSearch struct
• Refactored LoadMetadata to be a standalone function and updated field mappings
• Updated LoadIndex method to use new HnswModel.LoadIndex signature

+8/-141 
Configuration changes
2 files
function_id.go
Add function ID for HNSW CDC update function                         

pkg/sql/plan/function/function_id.go

• Added HNSW_CDC_UPDATE function ID constant
• Updated FUNCTION_END_NUMBER and function registry mapping

+7/-1     
types.go
Add HNSW sync sink type constant                                                 

pkg/cdc/types.go

• Added CDCSinkType_HnswSync constant for HNSW synchronization sink
type

+4/-3     
Miscellaneous
3 files
vector_hnsw.result
Update vector dimension error message format                         

test/distributed/cases/vector/vector_hnsw.result

• Updated error message format for dimension mismatch from "vector ops
between different dimensions" to "expected vector dimension X !=
actual dimension Y"

+1/-1     
vector_index.result
Update vector index error message format                                 

test/distributed/cases/vector/vector_index.result

• Updated error message format for dimension mismatch to use new
standardized format

+1/-1     
array.result
Update array dimension error message format                           

test/distributed/cases/array/array.result

• Updated error messages for array dimension mismatches to use new
standardized format

+2/-2     

  • cpegeric avatar May 27 '25 10:05 cpegeric

    PR Reviewer Guide 🔍

    Here are some key observations to aid the review process:

    ⏱️ Estimated effort to review: 4 🔵🔵🔵🔵⚪
    🧪 PR contains tests
    🔒 No security concerns identified
    ⚡ Recommended focus areas for review

    Error Handling

    The error handling in the Run method is complex with nested error checking for multi-error scenarios. The logic for detecting rollback errors through error unwrapping may be fragile and could miss edge cases.

    if err != nil {
    	moe, ok := err.(*moerr.Error)
    	if ok {
    		if moe.ErrorCode() == moerr.ErrQueryInterrupted {
    			// skip rollback error
    			//os.Stderr.WriteString("error QueryInterrupted....rollback\n")
    			logutil.Errorf("cdc hnswSyncSinker(%v) parent rollback", s.dbTblInfo)
    		} else {
    			s.SetError(err)
    		}
    	} else if uw, ok := err.(interface{ Unwrap() []error }); ok {
    		rollbackfound := false
    		for _, e := range uw.Unwrap() {
    			//os.Stderr.WriteString(fmt.Sprintf("errors... %v\n", e))
    			moe, ok := e.(*moerr.Error)
    			if ok && moe.ErrorCode() == moerr.ErrQueryInterrupted {
    				rollbackfound = true
    			}
    		}
    
    		//os.Stderr.WriteString(fmt.Sprintf("rollback found %v\n", rollbackfound))
    		if !rollbackfound {
    			s.SetError(err)
    		}
    	} else {
    		s.SetError(err)
    	}
    }
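
One way to make the interrupted-query check less sensitive to how the error was wrapped is to let `errors.Is` walk the error tree, which since Go 1.20 also descends into joined errors. The sketch below is not the PR's code: `codedError` is a hypothetical stand-in for `*moerr.Error`, and `errQueryInterrupted` is a made-up code value. It only illustrates the technique of giving the error type an `Is` method that compares codes.

```go
package main

import (
	"errors"
	"fmt"
)

// codedError is a hypothetical stand-in for *moerr.Error: an error with a numeric code.
type codedError struct {
	code uint16
	msg  string
}

func (e *codedError) Error() string { return fmt.Sprintf("code %d: %s", e.code, e.msg) }

// Is makes errors.Is match two codedErrors when their codes are equal.
func (e *codedError) Is(target error) bool {
	t, ok := target.(*codedError)
	return ok && t.code == e.code
}

// errQueryInterrupted is an assumed code value, used only in this sketch.
const errQueryInterrupted uint16 = 1

// isInterrupted reports whether any error in the tree carries the interrupted code.
// errors.Is follows both Unwrap() error and Unwrap() []error.
func isInterrupted(err error) bool {
	return errors.Is(err, &codedError{code: errQueryInterrupted})
}

// shouldRecord mirrors the decision in the excerpt: record every failure
// except one caused by a parent rollback.
func shouldRecord(err error) bool {
	return err != nil && !isInterrupted(err)
}

func main() {
	interrupted := &codedError{code: errQueryInterrupted, msg: "query interrupted"}
	other := &codedError{code: 2, msg: "disk full"}

	fmt.Println(shouldRecord(interrupted))                        // direct
	fmt.Println(shouldRecord(fmt.Errorf("txn: %w", interrupted))) // wrapped with %w
	fmt.Println(shouldRecord(errors.Join(other, interrupted)))    // joined
	fmt.Println(shouldRecord(other))                              // unrelated failure
}
```

With this shape the three branches in the excerpt collapse into one call, and a `%w`-wrapped interrupted error, which the direct type assertion would miss, is treated the same as a bare one. Whether `moerr.Error` can be extended this way is an open question for the author.
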
    
    Concurrency Safety

    The checkContains method uses multi-threaded operations to check if CDC entries exist in indexes, but the error handling through channels and the shared midx slice modification could lead to race conditions or missed errors.

    func (s *HnswSync) checkContains(proc *process.Process) (maxcap uint, midx []int, err error) {
    	err_chan := make(chan error, s.tblcfg.ThreadsBuild)
    
    	maxcap = uint(s.tblcfg.IndexCapacity)
    
    	// try to find index cap
    	cdclen := len(s.cdc.Data)
    
    	midx = make([]int, cdclen)
    	// reset idx to -1
    	for i := range midx {
    		midx[i] = -1
    	}
    
    	// find corresponding indexes
    	for i, m := range s.indexes {
    		err = m.LoadIndex(proc, s.idxcfg, s.tblcfg, s.tblcfg.ThreadsBuild, false)
    		if err != nil {
    			return 0, nil, err
    		}
    
    		if maxcap < m.MaxCapacity {
    			maxcap = m.MaxCapacity
    		}
    
    		var wg sync.WaitGroup
    
    		nthread := int(s.tblcfg.ThreadsBuild)
    		for k := 0; k < nthread; k++ {
    			wg.Add(1)
    			go func() {
    				defer wg.Done()
    				for j, row := range s.cdc.Data {
    
    					if j%nthread != k {
    						continue
    					}
    
    					switch row.Type {
    					case vectorindex.CDC_UPSERT, vectorindex.CDC_DELETE:
    						if midx[j] == -1 {
    							found, err := m.Contains(row.PKey)
    							if err != nil {
    								err_chan <- err
    								return
    							}
    							if found {
    								//os.Stderr.WriteString(fmt.Sprintf("searching... found model %d row %d\n", i, j))
    								midx[j] = i
    
    								if row.Type == vectorindex.CDC_UPSERT {
    									s.nupdate.Add(1)
    								} else {
    									s.ndelete.Add(1)
    								}
    							}
    						}
    					}
    
    				}
    			}()
    		}
    
    		wg.Wait()
    		if len(err_chan) > 0 {
    			return 0, nil, <-err_chan
    		}
    
    		m.Unload()
    	}
    
    	return maxcap, midx, nil
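
The striping scheme in this excerpt (worker k handles only rows with j%nthread == k) can be isolated into a small, testable shape. The following is a sketch, not the PR's implementation: `stripedScan`, its `contains` callback, and `errScan` are hypothetical names. It passes k into the goroutine as an argument, which matters on toolchains before Go 1.22 where a loop variable captured by a closure is shared across iterations, and it sizes the error channel so that a failing worker never blocks.

```go
package main

import (
	"errors"
	"fmt"
	"sync"
)

// errScan is a hypothetical failure used by the demo below.
var errScan = errors.New("scan failed")

// stripedScan checks every key with nthread workers. Worker k owns exactly the
// indices k, k+nthread, k+2*nthread, ... so no two goroutines write the same slot.
func stripedScan(keys []int64, nthread int, contains func(int64) (bool, error)) ([]bool, error) {
	if nthread < 1 {
		nthread = 1
	}
	found := make([]bool, len(keys))
	errs := make(chan error, nthread) // one slot per worker: a send never blocks

	var wg sync.WaitGroup
	for k := 0; k < nthread; k++ {
		wg.Add(1)
		go func(k int) { // k is passed by value, independent of the Go version
			defer wg.Done()
			for j := k; j < len(keys); j += nthread {
				ok, err := contains(keys[j])
				if err != nil {
					errs <- err // each worker sends at most once, then stops
					return
				}
				found[j] = ok
			}
		}(k)
	}
	wg.Wait()
	close(errs)

	// A receive on a closed, empty channel yields ok == false.
	if err, ok := <-errs; ok {
		return nil, err
	}
	return found, nil
}

func main() {
	keys := []int64{10, 11, 12, 13, 14}
	even := func(k int64) (bool, error) { return k%2 == 0, nil }
	found, err := stripedScan(keys, 3, even)
	fmt.Println(found, err)

	failing := func(k int64) (bool, error) {
		if k == 13 {
			return false, errScan
		}
		return true, nil
	}
	_, err = stripedScan(keys, 3, failing)
	fmt.Println(err)
}
```

Running a variant like this under `go test -race` is a cheap way to confirm or dismiss the race concern for the slice writes.
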
    
    Resource Management

    The LoadIndex method creates temporary files and uses file allocation, but the cleanup logic in defer functions may not handle all error scenarios properly, potentially leaving temporary files or resources uncleaned.

    func (idx *HnswModel) LoadIndex(proc *process.Process, idxcfg vectorindex.IndexConfig, tblcfg vectorindex.IndexTableConfig, nthread int64, view bool) error {
    
    	if idx.Index != nil {
    		// index already loaded. ignore
    		return nil
    
    	}
    
    	stream_chan := make(chan executor.Result, 2)
    	error_chan := make(chan error)
    
    	if len(idx.Path) == 0 {
    		// create tempfile for writing
    		fp, err := os.CreateTemp("", "hnsw")
    		if err != nil {
    			return err
    		}
    
    		// load index to memory
    		defer func() {
    			if view {
    				// if view == true, remove the file.  right now view equals to read-only model when search.
    				// since model loads into memory anyway, we can safely remove the file after load.
    				// NOTE: when choose to load with usearch.View() mmap(), we cannot remove this file.
    				// for update, we need this file for Load() and unload().
    				os.Remove(fp.Name())
    			}
    		}()
    
    		err = fallocate.Fallocate(fp, 0, idx.FileSize)
    		if err != nil {
    			fp.Close()
    			return err
    		}
    
    		// run streaming sql
    		sql := fmt.Sprintf("SELECT chunk_id, data from `%s`.`%s` WHERE index_id = '%s'", tblcfg.DbName, tblcfg.IndexTable, idx.Id)
    		go func() {
    			_, err := runSql_streaming(proc, sql, stream_chan, error_chan)
    			if err != nil {
    				error_chan <- err
    				return
    			}
    		}()
    
    		// incremental load from database
    		sql_closed := false
    		for !sql_closed {
    			sql_closed, err = idx.loadChunk(proc, stream_chan, error_chan, fp)
    			if err != nil {
    				fp.Close()
    				return err
    			}
    		}
    
    		idx.Path = fp.Name()
    		fp.Close()
    	}
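
A common way to cover every exit path with one piece of cleanup is a named error result plus a single deferred function that closes the file and removes it when the function fails. The sketch below assumes a simplified signature: `loadToTemp`, its `fill` callback, `countFiles`, and `errFill` are hypothetical and stand in for the fallocate and chunk-loading steps of the excerpt.

```go
package main

import (
	"errors"
	"fmt"
	"os"
)

// errFill is a hypothetical failure used by the demo below.
var errFill = errors.New("fill failed")

// loadToTemp creates a temp file in dir, lets fill write into it, and returns
// its path. The file is closed on every path. It is removed when any step
// fails, or when view is true (the caller only needed the bytes in memory),
// in which case the returned path is empty.
func loadToTemp(dir string, view bool, fill func(write func([]byte) error) error) (path string, err error) {
	fp, err := os.CreateTemp(dir, "hnsw")
	if err != nil {
		return "", err
	}
	defer func() {
		// Always close; surface the close error only if nothing failed earlier.
		if cerr := fp.Close(); err == nil {
			err = cerr
		}
		if err != nil || view {
			os.Remove(fp.Name())
			path = ""
		}
	}()

	write := func(b []byte) error {
		_, werr := fp.Write(b)
		return werr
	}
	if err = fill(write); err != nil {
		return "", err
	}
	return fp.Name(), nil
}

// countFiles returns the number of entries in dir, or -1 on error.
func countFiles(dir string) int {
	entries, err := os.ReadDir(dir)
	if err != nil {
		return -1
	}
	return len(entries)
}

func main() {
	dir, err := os.MkdirTemp("", "demo")
	if err != nil {
		fmt.Println(err)
		return
	}
	defer os.RemoveAll(dir)

	ok := func(write func([]byte) error) error { return write([]byte("chunk")) }
	bad := func(write func([]byte) error) error { return errFill }

	p, err := loadToTemp(dir, false, ok)
	fmt.Println(p != "", err, countFiles(dir))

	_, err = loadToTemp(dir, false, bad)
	fmt.Println(err, countFiles(dir))
}
```

In the excerpt, a failure in Fallocate or loadChunk with view == false returns after closing the file but without removing it; the deferred check on the named result is what closes that gap in this sketch.
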
    

    qodo-code-review[bot] avatar Jun 11 '25 13:06 qodo-code-review[bot]

    PR Code Suggestions ✨

    Explore these optional code suggestions:

    Category    Suggestion    Impact
    Possible issue
    Add bounds checking for slice

    The code accesses sqlBuf[sqlBufReserved:] without checking if the slice has
    sufficient length, which could cause a panic. Add bounds checking before slicing
    the buffer.

    pkg/cdc/hnsw_sinker.go [209-255]

     func (s *hnswSyncSinker[T]) Run(ctx context.Context, ar *ActiveRoutine) {
         logutil.Infof("cdc hnswSyncSinker(%v).Run: start", s.dbTblInfo)
         defer func() {
             logutil.Infof("cdc hnswSyncSinker(%v).Run: end", s.dbTblInfo)
         }()
     
         closed := false
         for !closed {
     
             txnbegin := false
             // make sure there is a BEGIN before start transaction
             for !txnbegin {
     
                 select {
                 case <-ctx.Done():
                     return
                 case sqlBuf, ok := <-s.sqlBufSendCh:
                     if !ok {
                         closed = true
                         return
                     }
                     if bytes.Equal(sqlBuf, begin) {
                         txnbegin = true
                     } else if bytes.Equal(sqlBuf, commit) {
                         // pass
                     } else if bytes.Equal(sqlBuf, rollback) {
                         // pass
                     } else if bytes.Equal(sqlBuf, dummy) {
                         // pass
                     } else {
                         func() {
                             newctx, cancel := context.WithTimeout(context.Background(), 12*time.Hour)
                             defer cancel()
                             //os.Stderr.WriteString("Wait for BEGIN but sql. execute anyway\n")
                             opts := executor.Options{}
                             res, err := s.exec.Exec(newctx, string(sqlBuf), opts)
                             if err != nil {
    -                            logutil.Errorf("cdc hnswSyncSinker(%v) send sql failed, err: %v, sql: %s", s.dbTblInfo, err, sqlBuf[sqlBufReserved:])
    +                            sqlForLog := string(sqlBuf)
    +                            if len(sqlBuf) > sqlBufReserved {
    +                                sqlForLog = string(sqlBuf[sqlBufReserved:])
    +                            }
    +                            logutil.Errorf("cdc hnswSyncSinker(%v) send sql failed, err: %v, sql: %s", s.dbTblInfo, err, sqlForLog)
                                 os.Stderr.WriteString(fmt.Sprintf("sql  executor run failed. %s\n", string(sqlBuf)))
                                 os.Stderr.WriteString(fmt.Sprintf("err :%v\n", err))
                                 s.SetError(err)
                             }
                             res.Close()
                         }()
                     }
                 }
             }
    
    Suggestion importance[1-10]: 7

    Why: The suggestion correctly identifies a potential panic from an out-of-bounds slice access on sqlBuf[sqlBufReserved:]. Although sqlBufReserved is currently 0, making the panic unlikely, this could change. Adding a bounds check is a good defensive programming practice that prevents a potential crash.

    Medium
    Add nil check validation

    The function increments s.last.Len but doesn't handle the case where the
    increment fails or the model becomes invalid. Add validation to ensure the model
    is still valid after incrementing the counter.

    pkg/vectorindex/hnsw/sync.go [545-569]

     func (s *HnswSync) getLastModelAndIncrForSync(proc *process.Process, maxcap uint, mu *sync.Mutex) (*HnswModel, bool, error) {
     
         mu.Lock()
         defer mu.Unlock()
    +
    +    if s.last == nil {
    +        return nil, false, moerr.NewInternalError(proc.Ctx, "last model is nil")
    +    }
     
         full := (s.last.Len.Load() >= int64(s.last.MaxCapacity))
         if full {
             id := s.getModelId()
             // model is already full, create a new model for insert
             newmodel, err := NewHnswModelForBuild(id, s.idxcfg, int(s.tblcfg.ThreadsBuild), maxcap)
             if err != nil {
                 return nil, false, err
             }
             s.indexes = append(s.indexes, newmodel)
             s.last = newmodel
     
         }
         //os.Stderr.WriteString(fmt.Sprintf("getlast model full %v id = %s\n", full, last.Id))
     
         // pre-occupy this model by increment a Len counter and do Add() outside the mutex
         // make sure only one call can get full = true
         idxlen := s.last.Len.Add(1)
         full = (idxlen >= int64(s.last.MaxCapacity))
         return s.last, full, nil
     }
    
    Suggestion importance[1-10]: 2

    Why: The suggestion adds a defensive nil check for s.last. While the calling code in run() ensures s.last is initialized, this check adds robustness against potential future changes that might violate this assumption. The impact is minor as it addresses a currently non-existent issue.
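
The pattern the function relies on, reserving a slot under the mutex and doing the expensive Add() outside it, can be exercised in isolation. This is a sketch with hypothetical names (`model`, `pool`, `reserve`, `reserveConcurrently`), not the PR's types. It assumes a positive capacity and shows that each model reports full to exactly one caller, even with many concurrent reservations.

```go
package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// model is a hypothetical stand-in for HnswModel: a capacity and a reserved-slot counter.
type model struct {
	id       int
	capacity int64
	length   atomic.Int64
}

// pool is a hypothetical stand-in for the part of HnswSync that tracks models.
type pool struct {
	mu     sync.Mutex
	models []*model
	last   *model
}

func newPool(capacity int64) *pool {
	m := &model{capacity: capacity}
	return &pool{models: []*model{m}, last: m}
}

// reserve claims one slot in the last model, rolling over to a new model when
// the last one is already full. The bool is true only for the caller whose
// reservation fills the model, so exactly one caller per model sees it.
func (p *pool) reserve() (*model, bool) {
	p.mu.Lock()
	defer p.mu.Unlock()

	if p.last.length.Load() >= p.last.capacity {
		m := &model{id: len(p.models), capacity: p.last.capacity}
		p.models = append(p.models, m)
		p.last = m
	}
	n := p.last.length.Add(1)
	return p.last, n >= p.last.capacity
}

// reserveConcurrently makes n reservations from n goroutines and returns how
// many of them were told that their model became full.
func reserveConcurrently(p *pool, n int) int64 {
	var full atomic.Int64
	var wg sync.WaitGroup
	for i := 0; i < n; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, isFull := p.reserve(); isFull {
				full.Add(1)
			}
		}()
	}
	wg.Wait()
	return full.Load()
}

func main() {
	p := newPool(10)
	full := reserveConcurrently(p, 95)
	fmt.Println(len(p.models), full)
}
```

Because the constructor in this sketch always sets `last`, the nil case discussed in the suggestion cannot arise here; in the PR that guarantee reportedly lives in the caller, which is why the check was rated low impact.
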

    Low
    General
    Remove duplicate Load call

    The usearchidx.Load(idx.Path) call is duplicated in both branches. Extract it
    before the conditional to eliminate code duplication and improve
    maintainability.

    pkg/vectorindex/hnsw/model.go [452-458]

    +err = usearchidx.Load(idx.Path)
    +if err != nil {
    +    return err
    +}
    +
     if view {
    -    err = usearchidx.Load(idx.Path)
         idx.View = true
     } else {
    -    err = usearchidx.Load(idx.Path)
         usearchidx.Reserve(uint(tblcfg.IndexCapacity))
     }
    
    Suggestion importance[1-10]: 6

    Why: The suggestion correctly identifies that usearchidx.Load(idx.Path) is called in both branches of the if/else statement. Refactoring it to be called once before the conditional block improves code readability and maintainability by reducing duplication.

    Low
    Fix misleading error message

    The error message "missing lock service" is misleading since the code is
    checking for InternalSQLExecutor, not a lock service. This could confuse
    debugging efforts when the SQL executor is not available.

    pkg/vectorindex/sqlexec/sqlexec.go [81-83]

     if !ok {
    -    panic("missing lock service")
    +    panic("missing SQL executor service")
     }
    
    Suggestion importance[1-10]: 6

    Why: The suggestion correctly points out that the panic message "missing lock service" is misleading, as the code is actually checking for an InternalSQLExecutor. Changing it to "missing SQL executor service" improves clarity and aids future debugging.

    Low

    qodo-code-review[bot] avatar Jun 11 '25 13:06 qodo-code-review[bot]