tugraph-analytics icon indicating copy to clipboard operation
tugraph-analytics copied to clipboard

feat: support vector store.

Open tanghaodong25 opened this issue 2 months ago • 2 comments

What changes were proposed in this pull request?

How was this PR tested?

  • [ ] Tests have been added for the changes
  • [ ] Production environment verified

tanghaodong25 avatar Oct 20 '25 11:10 tanghaodong25

@tanghaodong25 In general, there are many areas in the current code that can be improved. I wrote a sample for reference only. Additions and deletions in Lucene need to be applied atomically, so I maintain an `update` flag here that marks pending changes for the background commit thread.

import java.io.IOException;
import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.ThreadFactory;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.function.Function;

import org.apache.lucene.analysis.Analyzer;
import org.apache.lucene.analysis.standard.StandardAnalyzer;
import org.apache.lucene.document.Document;
import org.apache.lucene.index.DirectoryReader;
import org.apache.lucene.index.IndexWriter;
import org.apache.lucene.index.IndexWriterConfig;
import org.apache.lucene.index.Term;
import org.apache.lucene.search.IndexSearcher;
import org.apache.lucene.search.Query;
import org.apache.lucene.search.ScoreDoc;
import org.apache.lucene.search.Sort;
import org.apache.lucene.search.TermQuery;
import org.apache.lucene.search.TopDocs;
import org.apache.lucene.search.WildcardQuery;
import org.apache.lucene.store.ByteBuffersDirectory;
import org.apache.lucene.util.QueryBuilder;

/**
 * In-memory Lucene search engine wrapping phrase, wildcard and sorted search together with add,
 * update and delete operations over a {@link ByteBuffersDirectory}.
 *
 * <p>Mutations are buffered in the {@link IndexWriter}; searches do not observe them until a
 * background thread commits them (it runs with a fixed delay of 20 seconds between runs) and then
 * reopens the searcher. Each mutating call holds a shared lock while it applies its batch and the
 * commit holds the exclusive lock, so one call's batch is never split across two commits. If a
 * batch fails part-way with an {@link IOException}, the part already applied is still committed.
 *
 * <p>Readers are reference-counted: a search keeps the reader it started with open even if a
 * refresh publishes a newer one meanwhile, and a replaced reader is closed when its last user
 * releases it.
 *
 * @param <T> the entity type that {@code buildDoc} converts into a {@link Document}
 */
public class LuceneSearchEngine<T> {

	/** Delay, in seconds, between the end of one background commit and the start of the next. */
	private static final int COMMIT_DELAY = 20;

	/** Upper bound, in seconds, that {@link #close()} waits for an in-flight background commit. */
	private static final long SHUTDOWN_TIMEOUT_SECONDS = 10;

	private final Analyzer analyzer;
	private final QueryBuilder parser;
	private final ByteBuffersDirectory directory;
	private final IndexWriter iwriter;
	private final Function<T, Document> buildDoc;

	/**
	 * Searcher over the latest refreshed commit. The engine owns one reference on its reader;
	 * searches take extra references through {@link #acquireSearcher()}.
	 */
	private final AtomicReference<IndexSearcher> searcher;

	private final ScheduledThreadPoolExecutor executor;

	/** Set when uncommitted changes may exist; cleared by the background commit. */
	private final AtomicBoolean update = new AtomicBoolean(false);

	/** Shared by mutations, exclusive for commit and close, so a batch is never split across commits. */
	private final ReentrantReadWriteLock commitLock = new ReentrantReadWriteLock();

	private final AtomicBoolean closed = new AtomicBoolean(false);

	/**
	 * Builds the index from {@code elements}, commits it and starts the background committer.
	 *
	 * <p>Requires providing a conversion method from the entity class to a Document.
	 * For fuzzy matching, indexed fields using {@link org.apache.lucene.document.TextField}
	 * need to be added to the Document.
	 * {@link org.apache.lucene.document.StringField} only supports single-term exact matching;
	 * it is highly recommended to add a unique, stored ID value of this type for delete and
	 * update operations.
	 *
	 * @param elements the initial elements to index
	 * @param buildDoc converts an element into the Document indexed for it
	 * @throws IOException if creating the writer, the initial commit or opening the first reader fails
	 */
	public LuceneSearchEngine(Collection<T> elements, Function<T, Document> buildDoc) throws IOException {
		this.buildDoc = buildDoc;
		this.directory = new ByteBuffersDirectory();
		this.analyzer = new StandardAnalyzer();
		this.parser = new QueryBuilder(analyzer);
		this.iwriter = new IndexWriter(directory, new IndexWriterConfig(analyzer));
		addElement(elements);
		iwriter.commit();
		// The commit above already covers the seed elements; nothing is pending yet.
		update.set(false);
		this.searcher = new AtomicReference<>(new IndexSearcher(DirectoryReader.open(directory)));
		this.executor = new ScheduledThreadPoolExecutor(1, new PoolThreadFactory("LuceneSearchEngine", true, 1));
		executor.scheduleWithFixedDelay(this::commitPendingChanges, 0, COMMIT_DELAY, TimeUnit.SECONDS);
	}

	/**
	 * Adds one document per element. The documents become searchable together after the next
	 * background commit.
	 *
	 * <p>All documents are built before any is added, so an exception thrown by {@code buildDoc}
	 * leaves the index untouched. An {@link IOException} from the writer is printed and swallowed;
	 * documents added before it are kept and committed.
	 *
	 * @param elements the elements to index
	 */
	public void addElement(Collection<T> elements) {
		List<Document> documents = new ArrayList<>(elements.size());
		for (T element : elements) {
			documents.add(buildDoc.apply(element));
		}
		commitLock.readLock().lock();
		try {
			for (Document document : documents) {
				iwriter.addDocument(document);
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			// Mark dirty even on failure: documents added before the exception still need a commit.
			update.set(true);
			commitLock.readLock().unlock();
		}
	}

	/**
	 * Returns one page of documents whose {@code field} matches {@code queryText} as a phrase.
	 *
	 * <p>The query is built with {@link QueryBuilder#createPhraseQuery}, so the matching pattern
	 * depends on the analyzer's parsing result. If it contains Chinese characters, it is essentially
	 * term-by-term matching; do not use it to query fields of type StringField.
	 *
	 * @param field the name of the field to match
	 * @param queryText the content to match
	 * @param page the page number, starting from 1; values below 1 return the first page
	 * @param pageSize the number of documents per page; must be positive
	 * @param sort the sort order, or {@code null} to order by score
	 * @return the matched documents of the requested page; empty (never {@code null}) if the
	 *     analyzer yields no query, the page lies past the last hit, or the search fails with an
	 *     I/O error
	 * @throws IllegalArgumentException if {@code pageSize} is not positive
	 * @throws IllegalStateException if the engine has been closed
	 */
	public List<Document> search(String field, String queryText, int page, int pageSize, Sort sort) {
		Query query = parser.createPhraseQuery(field, queryText);
		if (query == null) {
			return Collections.emptyList();
		}
		return searchPage(query, page, pageSize, sort);
	}

	/**
	 * Returns one page of documents whose {@code field} contains a term matching the wildcard
	 * pattern {@code queryText} ({@code *} matches any character sequence, {@code ?} one character).
	 *
	 * <p>The pattern is not analyzed; it is compared against indexed terms as-is. NOTE(review):
	 * StandardAnalyzer presumably lower-cases TextField terms, so patterns with upper-case letters
	 * may not match such fields; confirm for the fields in use.
	 *
	 * @param field the name of the field to match
	 * @param queryText the wildcard pattern
	 * @param page the page number, starting from 1; values below 1 return the first page
	 * @param pageSize the number of documents per page; must be positive
	 * @param sort the sort order, or {@code null} to order by score
	 * @return the matched documents of the requested page; empty (never {@code null}) if the page
	 *     lies past the last hit or the search fails with an I/O error
	 * @throws IllegalArgumentException if {@code pageSize} is not positive
	 * @throws IllegalStateException if the engine has been closed
	 */
	public List<Document> wildCardSearch(String field, String queryText, int page, int pageSize, Sort sort) {
		return searchPage(new WildcardQuery(new Term(field, queryText)), page, pageSize, sort);
	}

	/**
	 * Deletes every document whose {@code field} contains exactly the term {@code queryText}.
	 * The text is not analyzed, which prevents accidental deletion of wrong values. The deletion
	 * becomes searchable after the next background commit.
	 *
	 * @param field the ID field to match, ideally a StringField
	 * @param queryText the exact term identifying the documents to delete
	 */
	public void deleteElement(String field, String queryText) {
		commitLock.readLock().lock();
		try {
			iwriter.deleteDocuments(new TermQuery(new Term(field, queryText)));
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			update.set(true);
			commitLock.readLock().unlock();
		}
	}

	/**
	 * Misspelled alias kept for existing callers.
	 *
	 * @deprecated use {@link #deleteElement(String, String)}
	 */
	@Deprecated
	public void deleleElement(String field, String queryText) {
		deleteElement(field, queryText);
	}

	/**
	 * For each element, replaces the documents whose {@code field} exactly equals the value stored
	 * in {@code field} of the element's built document. Matching is not analyzed, which prevents
	 * accidental update of wrong values. The changes become searchable together after the next
	 * background commit.
	 *
	 * <p>All documents are built and validated before any is written, so a bad element leaves the
	 * index untouched.
	 *
	 * @param elements the replacement elements
	 * @param field the name of the field that holds the unique ID; it must be stored
	 * @throws IllegalArgumentException if a built document has no stored value for {@code field}
	 */
	public void updateElement(Collection<T> elements, String field) {
		List<Document> documents = new ArrayList<>(elements.size());
		List<Term> ids = new ArrayList<>(elements.size());
		for (T element : elements) {
			Document document = buildDoc.apply(element);
			String id = document.get(field);
			if (id == null) {
				throw new IllegalArgumentException("Built document has no stored value for ID field '" + field + "'");
			}
			documents.add(document);
			ids.add(new Term(field, id));
		}
		commitLock.readLock().lock();
		try {
			for (int i = 0; i < documents.size(); i++) {
				iwriter.updateDocument(ids.get(i), documents.get(i));
			}
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			// Mark dirty even on failure: updates applied before the exception still need a commit.
			update.set(true);
			commitLock.readLock().unlock();
		}
	}

	/**
	 * Stops the background committer, then releases the current reader, the writer and the
	 * directory. Calling it again has no effect. Searches already in flight keep their reader open
	 * until they finish; new searches throw {@link IllegalStateException}.
	 */
	public void close() {
		if (!closed.compareAndSet(false, true)) {
			return;
		}
		// Stop the committer first so it never touches a closed writer or reader. shutdown() cancels
		// future periodic runs but lets an in-flight commit finish.
		executor.shutdown();
		try {
			if (!executor.awaitTermination(SHUTDOWN_TIMEOUT_SECONDS, TimeUnit.SECONDS)) {
				executor.shutdownNow();
			}
		} catch (InterruptedException e) {
			executor.shutdownNow();
			Thread.currentThread().interrupt();
		}
		// Drop the engine's own reference on the current reader.
		releaseSearcher(searcher.get());
		// Exclusive lock: wait for in-flight mutation batches before closing the writer.
		commitLock.writeLock().lock();
		try {
			iwriter.close();
		} catch (IOException e) {
			e.printStackTrace();
		} finally {
			commitLock.writeLock().unlock();
		}
		try {
			directory.close();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/** Shared paging logic for all search entry points; see {@link #search} for the contract. */
	private List<Document> searchPage(Query query, int page, int pageSize, Sort sort) {
		if (pageSize <= 0) {
			throw new IllegalArgumentException("pageSize must be > 0, got " + pageSize);
		}
		IndexSearcher isearcher = acquireSearcher();
		try {
			ScoreDoc after = null;
			if (page > 1) {
				// Widen before multiplying so very deep pages cannot overflow int.
				int skip = (int) Math.min(Integer.MAX_VALUE, (long) (page - 1) * pageSize);
				TopDocs skipped = (sort != null) ? isearcher.search(query, skip, sort) : isearcher.search(query, skip);
				if (skipped.scoreDocs.length < skip) {
					// Fewer hits than the preceding pages hold: the requested page is empty.
					return Collections.emptyList();
				}
				after = skipped.scoreDocs[skipped.scoreDocs.length - 1];
			}
			TopDocs hits = (sort != null)
					? isearcher.searchAfter(after, query, pageSize, sort)
					: isearcher.searchAfter(after, query, pageSize);
			List<Document> result = new ArrayList<>(hits.scoreDocs.length);
			for (ScoreDoc hit : hits.scoreDocs) {
				// NOTE(review): IndexSearcher#doc is deprecated in newer Lucene releases in favour of
				// storedFields(); revisit when upgrading.
				result.add(isearcher.doc(hit.doc));
			}
			return result;
		} catch (IOException e) {
			e.printStackTrace();
			return Collections.emptyList();
		} finally {
			releaseSearcher(isearcher);
		}
	}

	/**
	 * Returns the current searcher with an extra reference on its reader; every call must be paired
	 * with {@link #releaseSearcher(IndexSearcher)}.
	 *
	 * @throws IllegalStateException if the engine has been closed
	 */
	private IndexSearcher acquireSearcher() {
		while (true) {
			if (closed.get()) {
				throw new IllegalStateException("LuceneSearchEngine is closed");
			}
			IndexSearcher current = searcher.get();
			if (current.getIndexReader().tryIncRef()) {
				return current;
			}
			// The reader was fully released between get() and tryIncRef(): either a refresh already
			// published its replacement, or close() ran. Loop to re-check both.
		}
	}

	/** Drops one reference taken on {@code acquired}'s reader; the last release closes it. */
	private void releaseSearcher(IndexSearcher acquired) {
		try {
			acquired.getIndexReader().decRef();
		} catch (IOException e) {
			e.printStackTrace();
		}
	}

	/**
	 * Background task: commits buffered changes if any were recorded, then reopens the searcher so
	 * they become visible. Exceptions are caught because one escaping a periodic task would silently
	 * cancel all later runs.
	 */
	private void commitPendingChanges() {
		commitLock.writeLock().lock();
		try {
			if (!update.compareAndSet(true, false)) {
				return;
			}
			iwriter.commit();
		} catch (IOException | RuntimeException e) {
			// Re-arm so the next run retries the commit.
			update.set(true);
			e.printStackTrace();
			return;
		} finally {
			commitLock.writeLock().unlock();
		}
		refreshSearcher();
	}

	/**
	 * Publishes a searcher over the latest commit if it differs from the current one. Only the
	 * single background thread calls this, so no other thread swaps {@link #searcher} concurrently.
	 */
	private void refreshSearcher() {
		// The constructor and this method only ever publish searchers over a DirectoryReader.
		DirectoryReader current = (DirectoryReader) searcher.get().getIndexReader();
		try {
			DirectoryReader latest = DirectoryReader.openIfChanged(current);
			if (latest != null) {
				searcher.set(new IndexSearcher(latest));
				// Publish first, then drop the engine's reference: a search that loses the race in
				// acquireSearcher() finds the new searcher on retry, and in-flight searches keep the
				// old reader open until they release it.
				current.decRef();
			}
		} catch (IOException | RuntimeException e) {
			// Re-arm so the next run retries the refresh.
			update.set(true);
			e.printStackTrace();
		}
	}

	/** Creates threads with a fixed name, the configured daemon flag and normal priority. */
	private static class PoolThreadFactory implements ThreadFactory{
		private final ThreadGroup group;
		private final String namePrefix;
		private final Boolean daemon;


		public PoolThreadFactory(String namePrefix, Boolean daemon, int index) {
			SecurityManager s = System.getSecurityManager();
			this.group = (s != null) ? s.getThreadGroup() : Thread.currentThread().getThreadGroup();
			this.namePrefix = String.format("%s-%d", namePrefix, index);
			this.daemon = daemon;
		}
		@Override
		public Thread newThread(Runnable r) {
			Thread t = new Thread(group, r, namePrefix, 0);
			// The newly created thread is initially marked as being a daemon thread if and only if
			// the thread creating it is currently marked as a daemon thread.
			if (this.daemon != null) {
				t.setDaemon(this.daemon.booleanValue());
			}

			if (t.getPriority() != Thread.NORM_PRIORITY) {
				t.setPriority(Thread.NORM_PRIORITY);
			}
			return t;
		}

	}

}

kitalkuyo-gita avatar Oct 24 '25 06:10 kitalkuyo-gita

kitalkuyo-gita

Thanks for the thoughtful feedback! I really appreciate the sample and your guidance on keeping additions and deletions in Lucene atomic with the `update` variable. I’ll revise the code accordingly and push an updated version soon.

tanghaodong25 avatar Nov 06 '25 02:11 tanghaodong25