Question: Support for sparse embeddings?
Hi, I was wondering whether it would make sense to support models which, in addition to dense vectors, also produce sparse and ColBERT embeddings. For example, BGE-M3 works well under infinity for dense vector retrieval. However, it would require some changes to the inference process to additionally obtain sparse vectors, as shown here: https://github.com/FlagOpen/FlagEmbedding/blob/master/FlagEmbedding/BGE_M3/modeling.py#L352-L355
I wonder whether, for such a case, it's feasible to add extra config parameters in the CLI, or whether that would require too many changes to the core logic of the model during startup?
The most straightforward way to do this at the moment would be to:
- fork BGE/m3
- add `trust_remote_code=True` and ship the code above (a rough sketch of that sparse head follows after this list)
- see if the postprocessing (e.g. normalization) influences the new embedding model.
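For orientation, here is a minimal sketch of what the linked modeling.py lines compute, assuming a BGE-M3-style checkpoint that ships a `sparse_linear` head; the function name and shapes are illustrative, not part of infinity's API:

```python
# Hedged sketch (not infinity's API): the per-token lexical weights that the linked
# BGE-M3 modeling code adds on top of dense pooling. `sparse_linear` is assumed to be
# the nn.Linear(hidden_size, 1) head shipped with the forked checkpoint.
import torch

def sparse_token_weights(hidden_states: torch.Tensor,  # (batch, seq, hidden)
                         input_ids: torch.Tensor,      # (batch, seq)
                         sparse_linear: torch.nn.Module,
                         vocab_size: int) -> torch.Tensor:
    # ReLU over a learned linear projection -> one non-negative weight per token
    token_weights = torch.relu(sparse_linear(hidden_states)).squeeze(-1)  # (batch, seq)
    # keep the max weight per vocabulary id, giving a |V|-dimensional sparse vector
    sparse = torch.zeros(input_ids.size(0), vocab_size,
                         dtype=token_weights.dtype, device=token_weights.device)
    sparse.scatter_reduce_(1, input_ids, token_weights, reduce="amax")
    return sparse
```

If I recall correctly, the upstream FlagEmbedding code additionally masks out special tokens before scattering; whether infinity's normalization step should touch this vector at all is exactly the third point above.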
If you end up getting it done - I would love to feature it here! Also if you have further questions, let me know!
I personally think the results from the BGE-M3 paper are a bit too hasty - the performance is not good enough for a paradigm change; it's more of an experiment. Perhaps it's time for a BGE-M3-V2.
same question now
Hi @michaelfeil, sorry for the late reply. I actually ended up implementing a very basic and manual version of sparse embeddings for BGE-M3, but it is so slow and occupies so much GPU VRAM that I just switched to using simple BM25 in Elasticsearch for lexical search instead, haha.
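For completeness, that lexical fallback needs no embedding model at all; a minimal sketch, assuming the 8.x Python Elasticsearch client and an illustrative index named `docs` with a `text` field:

```python
# Minimal sketch of lexical retrieval via Elasticsearch's built-in BM25 scoring.
# The index name "docs" and field "text" are placeholders, not from this thread.
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")
resp = es.search(index="docs", query={"match": {"text": "sparse embeddings for retrieval"}})
for hit in resp["hits"]["hits"]:
    print(hit["_score"], hit["_source"]["text"])
```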
Hi, can you add support for colbertv2.0?
Hi, could you tell me what speedup factor you've achieved for BGE-M3 with infinity? I'm only seeing a 20% speedup from infinity when generating dense vectors with BGE-M3, which is far below my expectations.
here is my code
```python
import threading
import time
from typing import List, Tuple

from transformers import AutoTokenizer
from embed import BatchedInference
from concurrent.futures import Future

# BaseModel, TokenNumExceedException, calc_token and check_token_num come from my own codebase
class BgeM3InfinityModel(BaseModel):
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained("model/BAAI/bge-m3/main", use_fast=False)
        self.register = BatchedInference(
            model_id=[
                "model/BAAI/bge-m3/main"
            ],
            # engine: torch or optimum
            engine="torch",
            # device: cuda (Nvidia/AMD) or cpu
            device="cuda",
        )
        self.lock = threading.Lock()

    def encode(self, texts: List[str], tickets: int = -1) -> Tuple[List[List[float]], List[int], bool, int]:
        start_time = time.time()
        encoded_input = self.tokenizer(texts, padding=True, truncation='longest_first',
                                       return_tensors="pt", max_length=512)
        tokens, total_token_num, truncation = self.calc_token(encoded_input['attention_mask'])
        if not self.check_token_num(total_token_num, tickets):
            raise TokenNumExceedException
        token_time = time.time()
        with self.lock:
            future: Future = self.register.embed(
                sentences=texts, model_id="model/BAAI/bge-m3/main"
            )
            emb_result = future.result()
        result = [item.tolist() for item in emb_result[0]]
        return result, tokens, truncation, int((token_time - start_time) * 1000)
```
@sunzx8 Why are you using threading.Lock? This is harmful for performance and the opposite of how it's meant to be used.
Please call multiple of these from multiple threads:
```python
future: Future = self.register.embed(
    sentences=texts, model_id="model/BAAI/bge-m3/main"
)
```
Defer this until you actually need the embedding:
```python
emb_result = future.result()
```
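A rough sketch of that "submit first, resolve later" pattern, assuming the same `BatchedInference` API and return shape as in the snippets above (the model path is a placeholder):

```python
# Hedged sketch: enqueue all batches without any lock, block on the futures only
# when the embeddings are actually needed. Model path is a placeholder.
from embed import BatchedInference

register = BatchedInference(model_id=["BAAI/bge-m3"], engine="torch", device="cuda")

batches = [
    ["first document", "second document"],
    ["third document", "fourth document"],
]

# enqueue all work up front; no threading.Lock around the calls
futures = [
    register.embed(sentences=batch, model_id="BAAI/bge-m3")
    for batch in batches
]

# block only here, so the engine can micro-batch across all pending requests
embeddings = [vec.tolist() for future in futures for vec in future.result()[0]]

register.stop()  # shut the engine down once, after all requests have completed
```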
Thanks for the quick reply. Do you have a full example of best practices for the multi-batch case? I wrote one myself, but found that it gets stuck when batch > 1.
here's my code
```python
from typing import List, Tuple
from concurrent.futures import Future, ThreadPoolExecutor
import time

from tqdm import tqdm
from transformers import AutoTokenizer
from models.models import TokenNumExceedException, MODEL_DIM
from embed import BatchedInference


class BgeM3InfinityModel():
    def __init__(self):
        self.tokenizer = AutoTokenizer.from_pretrained(
            "/data/upload_files/embedding_worker/model/bge-m3", use_fast=False
        )
        self.register = BatchedInference(
            model_id=["/data/upload_files/embedding_worker/model/bge-m3"],
            engine="torch",
            device="cuda",
        )

    def encode(self, texts: List[str], tickets: int = -1) -> Tuple[List[List[float]], int]:
        start_time = time.time()
        future: Future = self.register.embed(
            sentences=texts,
            model_id="/data/upload_files/embedding_worker/model/bge-m3"
        )
        emb_result = future.result()
        self.register.stop()
        end_time = time.time()
        result = [item.tolist() for item in emb_result[0]]
        return result, int((end_time - start_time) * 1000)

    def get_name(self):
        return 'BAAI/bge-m3/infinity'

    def get_dim(self) -> int:
        return MODEL_DIM.LARGE.value


class ThreadedInference:
    def __init__(self, model: BgeM3InfinityModel, num_threads: int = 4, batch_size: int = 32):
        self.model = model
        self.num_threads = num_threads
        self.batch_size = batch_size

    def _process_batch(self, texts: List[str]) -> Tuple[List[List[float]], int]:
        """Process a single batch of texts."""
        return self.model.encode(texts)

    def process_texts(self, all_texts: List[str]) -> List[List[float]]:
        batches = [
            all_texts[i:i + self.batch_size]
            for i in range(0, len(all_texts), self.batch_size)
        ]
        all_embeddings = []
        with ThreadPoolExecutor(max_workers=self.num_threads) as executor:
            futures = [
                executor.submit(self._process_batch, batch)
                for batch in batches
            ]
            for future in tqdm(futures, desc="Processing batches"):
                embeddings, time_cost = future.result()
                all_embeddings.extend(embeddings)
                print("time:", time_cost)
        return all_embeddings


def main():
    model = BgeM3InfinityModel()
    threaded_inference = ThreadedInference(
        model=model,
        num_threads=4,
        batch_size=32
    )
    texts = ["this page aims to give you an overview of milvus by answering several questions. after reading this page"]
    texts = texts * 100
    # print(texts)
    embeddings = threaded_inference.process_texts(texts)
    print(f"Done, got {len(embeddings)} embedding vectors")


if __name__ == "__main__":
    main()
```
@S1LV3RJ1NX Currently this has no priority: sparse models are interesting from a research perspective, but their performance is not much better.
- No good models, e.g. the performance of https://arxiv.org/abs/2402.03216 (M3) barely improves.
- No "standards" / reference implementation yet that would allow consolidating the features in infinity.
- No reference inference API - nobody offers this as a commercial product.
- Feels like it's a push for differentiation between vector DBs: "use sparse embeddings with vector DB A - vector DB B/C/other does not support it".
Ohh, thanks @michaelfeil! I did hear a lot about BM25, hence I asked.
BM25, as replied in the other thread, is a dataset/database operation plus a tokenization operation, which has nothing to do with embeddings.