Implementing Hybrid Search (RRF) in LangChain Using OpenSearch Vector Store
Feature request
I propose adding hybrid search to LangChain's OpenSearch vector store integration, specifically using the Reciprocal Rank Fusion (RRF) method, so that keyword and vector similarity search can be combined. LangChain doesn't appear to support this yet, even though OpenSearch has supported hybrid search (the `hybrid` query, post-processed by a search pipeline) since its 2.10 release. The goal is to let LangChain's OpenSearch vector store call search pipelines, so that OpenSearch itself handles the complexities of hybrid search.
Relevant Links: https://opensearch.org/docs/latest/query-dsl/compound/hybrid
Motivation
This request stems from LangChain's current lack of hybrid search support for OpenSearch. I am working on a search project right now, and I find it frustrating that LangChain has not integrated this feature even though OpenSearch has supported hybrid search since version 2.10.
Your contribution
I would gladly help, given some guidance.
🤖
Hi @younes-io,
Great to see you again! I hope your project is coming along well. I'm currently reviewing your new issue about implementing hybrid search in LangChain with OpenSearch's vector store. I'll get back to you with a more comprehensive response soon. Thanks for your patience!
🤖
Hello again @younes-io! It's good to see you back and thanks for bringing up another interesting feature request.
Thank you for your feature request. I understand that you're looking to implement the Reciprocal Rank Fusion (RRF) method for hybrid search in LangChain using OpenSearch's vector store. This is indeed a valuable feature that could enhance the capabilities of LangChain.
To implement this, you would need to create a new retriever that uses OpenSearch's vector store for retrieving documents. This new retriever would then be used as one of the retrievers in the EnsembleRetriever. Here's a simplified example of how you might create a new retriever for OpenSearch:
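```python
from typing import List

from langchain_community.vectorstores import OpenSearchVectorSearch
from langchain_core.callbacks import CallbackManagerForRetrieverRun
from langchain_core.documents import Document
from langchain_core.retrievers import BaseRetriever


class OpenSearchRetriever(BaseRetriever):
    """Retriever that uses OpenSearch's vector store for retrieving documents."""

    vectorstore: OpenSearchVectorSearch  # an already-configured OpenSearch vector store
    k: int = 4  # number of documents to return

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun,
    ) -> List[Document]:
        """
        Get the relevant documents for a given query using OpenSearch's vector store.
        Args:
            query: The query to search for.
            run_manager: Callback manager for this retriever run.
        Returns:
            A list of relevant documents.
        """
        # Delegate to the vector store's k-NN similarity search.
        return self.vectorstore.similarity_search(query, k=self.k)
```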
Once you have the new retriever, you can use it in the EnsembleRetriever like this:
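```python
from langchain.retrievers import EnsembleRetriever

# `opensearch_retriever` is an OpenSearchRetriever instance; `other_retriever`
# could be, for example, a keyword retriever such as BM25Retriever.
ensemble_retriever = EnsembleRetriever(
    retrievers=[opensearch_retriever, other_retriever],
    weights=[0.5, 0.5],
)
```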
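The EnsembleRetriever combines the results from the OpenSearch retriever and the other retriever using weighted Reciprocal Rank Fusion. Note that this fusion happens client-side in LangChain, not inside an OpenSearch search pipeline.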
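To make the fusion step concrete, here is a minimal pure-Python sketch of weighted RRF over ranked lists of document IDs: each document scores the sum of `weight / (c + rank)` over the lists it appears in. The function name `weighted_rrf` is hypothetical, and `c = 60` is the constant commonly used for RRF (and, as far as I know, EnsembleRetriever's default); this is an illustration of the technique, not LangChain's code.

```python
from collections import defaultdict
from typing import Dict, List, Sequence


def weighted_rrf(
    ranked_lists: Sequence[Sequence[str]],
    weights: Sequence[float],
    c: int = 60,
) -> List[str]:
    """Fuse ranked lists of document IDs with weighted Reciprocal Rank Fusion.

    A document's fused score is sum_i weights[i] / (c + rank_i), where rank_i
    is its 1-based position in list i; lists that lack it contribute nothing.
    """
    if len(ranked_lists) != len(weights):
        raise ValueError("Need exactly one weight per ranked list")
    scores: Dict[str, float] = defaultdict(float)
    for docs, weight in zip(ranked_lists, weights):
        for rank, doc_id in enumerate(docs, start=1):
            scores[doc_id] += weight / (c + rank)
    # Highest fused score first; sorted() is stable, so ties keep first-seen order.
    return sorted(scores, key=lambda d: scores[d], reverse=True)
```

For example, fusing a keyword ranking `["a", "b", "c"]` with a vector ranking `["b", "c", "d"]` at equal weights yields `["b", "c", "a", "d"]`: documents found by both retrievers rise to the top even if neither ranked them first.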
Please note that this is a simplified example and the actual implementation may require additional steps, such as handling asynchronous operations and integrating with the rest of the LangChain framework.
As for your question about the opensearch-py function for searching in a pipeline, I'm afraid I don't have the information at the moment. However, you might want to check the OpenSearch documentation or their community for more details.
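One way to run a search through a named pipeline is to pass the pipeline name as the `search_pipeline` URL parameter of OpenSearch's `_search` endpoint. opensearch-py forwards a `params` dict as URL query parameters, so a minimal sketch looks like this. `search_with_pipeline` and the pipeline name are hypothetical, and `client` is assumed to be an `opensearchpy.OpenSearch` instance.

```python
from typing import Any, Dict


def search_with_pipeline(
    client: Any, index: str, body: Dict[str, Any], pipeline: str
) -> Dict[str, Any]:
    """Run a search on `index` through the search pipeline named `pipeline`.

    `client` is assumed to be an opensearchpy.OpenSearch instance.
    """
    # `params` is sent as URL query parameters: GET /<index>/_search?search_pipeline=...
    return client.search(index=index, body=body, params={"search_pipeline": pipeline})
```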
I hope this helps! If you have any further questions or need more guidance, feel free to ask.
Sources
- Add support for OpenSearch Vector database
- libs/langchain/langchain/retrievers/ensemble.py
- libs/langchain/langchain/retrievers/__init__.py
Hi @younes-io, may I know whether hybrid search is available in LangChain now?
+1 from me. It's a shame this exists for the Elasticsearch store but not for OpenSearch!
Closing, as LangChain doesn't seem to care x)
So, ironically, I've since discovered that this actually works if you just reuse the Elasticsearch implementation with OpenSearch; see the example below. @younes-io
```python
from langchain_community.vectorstores import ElasticsearchStore

opensearch_hybrid_retriever = vectorstore.as_retriever(
    k=5,
    strategy=ElasticsearchStore.ApproxRetrievalStrategy(hybrid=True),
)
```
Why are you closing this if it isn't fixed? Could you please keep it open and let the contributors pick it up?
Do people have thoughts on whether the Elasticsearch import above works, and if so, should we add it to the docs? Happy to write the PR, but aware that it's slightly messy to import an Elasticsearch method into the OpenSearch vector store.
@AndreasThinks: Thanks for the tip! I ended up using similarity search only. By the way, I'm not sure hybrid search would work even if LangChain implemented it: from what I read in the OpenSearch docs a few months ago, you need to create a search pipeline and configure it specifically for hybrid search; it isn't enabled by default.
@ksarang90: I don't think opening issues is useful when there is no feedback from contributors for months. I have seen many LangChain issues like this: people keep asking questions and the issue just sits there.
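For reference on that pipeline setup: in OpenSearch 2.10+ (neural-search plugin), hybrid search is a `hybrid` query whose sub-query scores are normalized and combined by a `normalization-processor` in a search pipeline. The sketch below builds both request bodies as plain dicts. The function names, the example weights, and the `text`/`vector_field` field names (LangChain's OpenSearchVectorSearch defaults) are assumptions to adapt; `create_pipeline` expects an `opensearchpy.OpenSearch` client. The hybrid query then has to be sent to `_search` with the `search_pipeline` URL parameter set to the pipeline name.

```python
from typing import Any, Dict, List, Sequence


def normalization_pipeline_body(weights: Sequence[float] = (0.3, 0.7)) -> Dict[str, Any]:
    """Search-pipeline body that normalizes and combines hybrid sub-query scores."""
    return {
        "description": "Post processor for hybrid search",
        "phase_results_processors": [
            {
                "normalization-processor": {
                    # Rescale each sub-query's scores to [0, 1] before combining.
                    "normalization": {"technique": "min_max"},
                    # Weighted arithmetic mean; one weight per sub-query, in order.
                    "combination": {
                        "technique": "arithmetic_mean",
                        "parameters": {"weights": list(weights)},
                    },
                }
            }
        ],
    }


def hybrid_query_body(
    query: str,
    embedding: List[float],
    k: int = 5,
    text_field: str = "text",
    vector_field: str = "vector_field",
) -> Dict[str, Any]:
    """`hybrid` query with a BM25 `match` sub-query followed by a `knn` sub-query."""
    return {
        "size": k,
        "query": {
            "hybrid": {
                "queries": [
                    {"match": {text_field: {"query": query}}},
                    {"knn": {vector_field: {"vector": embedding, "k": k}}},
                ]
            }
        },
    }


def create_pipeline(client: Any, name: str, body: Dict[str, Any]) -> Any:
    """PUT the search pipeline; `client` is assumed to be an opensearchpy.OpenSearch."""
    return client.transport.perform_request(
        "PUT", f"/_search/pipeline/{name}", body=body
    )
```

The weights pair with the sub-queries in order, so `(0.3, 0.7)` here favors the k-NN side.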
The OpenSearch vector store does not have a `strategy` field (https://github.com/langchain-ai/langchain/blob/master/libs/community/langchain_community/vectorstores/opensearch_vector_search.py#L309), unlike the Elasticsearch vector store (https://github.com/langchain-ai/langchain/blob/7d216ad1e1fd26ef66e0a037b011ea52f3aadc78/libs/community/langchain_community/vectorstores/elasticsearch.py#L529). So this will not work, even though the code does not raise an error.
```python
# Written against the langchain-core / langchain-community layout of early 2024
# (pydantic v1 validators). The helpers imported from opensearch_vector_search
# are private, so check them against your installed version.
import warnings
from typing import Any, ClassVar, Collection, Dict, List, Optional, Tuple

from langchain_community.vectorstores.opensearch_vector_search import (
    MATCH_ALL_QUERY,
    PAINLESS_SCRIPTING_SEARCH,
    SCRIPT_SCORING_SEARCH,
    OpenSearchVectorSearch,
    _approximate_search_query_with_boolean_filter,
    _approximate_search_query_with_efficient_filter,
    _default_approximate_search_query,
    _default_painless_scripting_query,
    _default_script_query,
)
from langchain_core.callbacks import (
    AsyncCallbackManagerForRetrieverRun,
    CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStoreRetriever

# search_type values served by a customized (lexical/hybrid) query.
_CUSTOMIZED_BY_SEARCH_TYPE = {"hybrid": "hybrid", "bm25": "lexical"}


class CustomVectorStoreRetriever(VectorStoreRetriever):
    # Optional override: "lexical" or "hybrid", forwarded to the vector store.
    customized_search_type: Optional[str] = None

    allowed_search_types: ClassVar[Collection[str]] = (
        "similarity",
        "similarity_score_threshold",
        "mmr",
        # new additions in customizations
        "hybrid",
        "bm25",
    )

    @root_validator()
    def validate_search_type(cls, values: Dict) -> Dict:
        """Validate search type."""
        search_type = values["search_type"]
        if search_type not in cls.allowed_search_types:
            raise ValueError(
                f"search_type of {search_type} not allowed. Valid values are: "
                f"{cls.allowed_search_types}"
            )
        if search_type == "similarity_score_threshold":
            score_threshold = values["search_kwargs"].get("score_threshold")
            if (score_threshold is None) or (not isinstance(score_threshold, float)):
                raise ValueError(
                    "`score_threshold` is not specified with a float value(0~1) "
                    "in `search_kwargs`."
                )
        return values

    def _customized_kwargs(self) -> Dict[str, Any]:
        """Copy search_kwargs (never mutate the field) and add the customization."""
        kwargs = dict(self.search_kwargs)
        customized = self.customized_search_type or _CUSTOMIZED_BY_SEARCH_TYPE.get(
            self.search_type
        )
        if customized:
            kwargs["customized_search_type"] = customized
        return kwargs

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        if self.search_type in ("similarity", "hybrid", "bm25"):
            return self.vectorstore.similarity_search(query, **self._customized_kwargs())
        if self.search_type == "similarity_score_threshold":
            docs_and_similarities = (
                self.vectorstore.similarity_search_with_relevance_scores(
                    query, **self._customized_kwargs()
                )
            )
            return [doc for doc, _ in docs_and_similarities]
        if self.search_type == "mmr":
            # MMR re-ranks k-NN candidates only, so no lexical/hybrid customization.
            return self.vectorstore.max_marginal_relevance_search(
                query, **self.search_kwargs
            )
        raise ValueError(f"search_type of {self.search_type} not allowed.")

    async def _aget_relevant_documents(
        self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
    ) -> List[Document]:
        if self.search_type in ("similarity", "hybrid", "bm25"):
            return await self.vectorstore.asimilarity_search(
                query, **self._customized_kwargs()
            )
        if self.search_type == "similarity_score_threshold":
            docs_and_similarities = (
                await self.vectorstore.asimilarity_search_with_relevance_scores(
                    query, **self._customized_kwargs()
                )
            )
            return [doc for doc, _ in docs_and_similarities]
        if self.search_type == "mmr":
            return await self.vectorstore.amax_marginal_relevance_search(
                query, **self.search_kwargs
            )
        raise ValueError(f"search_type of {self.search_type} not allowed.")

    # add_documents / aadd_documents are inherited from VectorStoreRetriever.


class CustomOpenSearchVectorSearch(OpenSearchVectorSearch):
    # Source: https://github.com/langchain-ai/langchain/blob/d9396bdec1e27eb253d007a91ce36a78be9aff28/libs/community/langchain_community/vectorstores/opensearch_vector_search.py#L309
    def as_retriever(self, **kwargs: Any) -> CustomVectorStoreRetriever:
        tags = kwargs.pop("tags", None) or []
        tags.extend(self._get_retriever_tags())
        return CustomVectorStoreRetriever(vectorstore=self, **kwargs, tags=tags)

    def similarity_search(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Document]:
        docs_with_scores = self.similarity_search_with_score(query, k, **kwargs)
        return [doc for doc, _ in docs_with_scores]

    def similarity_search_with_score(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        embedding = self.embedding_function.embed_query(query)
        kwargs["query"] = query  # the raw text is needed for the `match` clause
        return self.similarity_search_with_score_by_vector(embedding, k, **kwargs)

    def similarity_search_with_score_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        text_field = kwargs.get("text_field", "text")
        metadata_field = kwargs.get("metadata_field", "metadata")
        hits = self._raw_similarity_search_with_score_by_vector(
            embedding=embedding, k=k, **kwargs
        )
        return [
            (
                Document(
                    page_content=hit["_source"][text_field],
                    metadata=(
                        hit["_source"]
                        if metadata_field == "*" or metadata_field not in hit["_source"]
                        else hit["_source"][metadata_field]
                    ),
                ),
                hit["_score"],
            )
            for hit in hits
        ]

    def _raw_similarity_search_with_score_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[dict]:
        search_type = kwargs.get("search_type", "approximate_search")
        vector_field = kwargs.get("vector_field", "vector_field")
        text_field = kwargs.get("text_field", "text")
        query = kwargs.get("query", None)
        # Pop it so it never leaks into the query-building helpers.
        customized_search_type = kwargs.pop("customized_search_type", None)
        if customized_search_type and query is None:
            raise ValueError("Lexical/hybrid search needs the query text in `query`")
        index_name = kwargs.get("index_name", self.index_name)
        filter = kwargs.get("filter", {})
        if (
            self.is_aoss
            and search_type != "approximate_search"
            and search_type != SCRIPT_SCORING_SEARCH
        ):
            raise ValueError(
                "Amazon OpenSearch Service Serverless only "
                "supports `approximate_search` and `script_scoring`"
            )
        if search_type == "approximate_search":
            boolean_filter = kwargs.get("boolean_filter", {})
            subquery_clause = kwargs.get("subquery_clause", "must")
            efficient_filter = kwargs.get("efficient_filter", {})
            # `lucene_filter` is deprecated, added for Backwards Compatibility
            lucene_filter = kwargs.get("lucene_filter", {})
            if boolean_filter != {} and efficient_filter != {}:
                raise ValueError(
                    "Both `boolean_filter` and `efficient_filter` are provided which "
                    "is invalid"
                )
            if lucene_filter != {} and efficient_filter != {}:
                raise ValueError(
                    "Both `lucene_filter` and `efficient_filter` are provided which "
                    "is invalid. `lucene_filter` is deprecated"
                )
            if lucene_filter != {} and boolean_filter != {}:
                raise ValueError(
                    "Both `lucene_filter` and `boolean_filter` are provided which "
                    "is invalid. `lucene_filter` is deprecated"
                )
            if (
                efficient_filter == {}
                and boolean_filter == {}
                and lucene_filter == {}
                and filter != {}
            ):
                if self.engine in ["faiss", "lucene"]:
                    efficient_filter = filter
                else:
                    boolean_filter = filter
            if boolean_filter != {}:
                search_query = _approximate_search_query_with_boolean_filter(
                    embedding,
                    boolean_filter,
                    k=k,
                    vector_field=vector_field,
                    subquery_clause=subquery_clause,
                )
            elif efficient_filter != {}:
                search_query = _approximate_search_query_with_efficient_filter(
                    embedding, efficient_filter, k=k, vector_field=vector_field
                )
            elif lucene_filter != {}:
                warnings.warn(
                    "`lucene_filter` is deprecated. Please use the keyword argument"
                    " `efficient_filter`"
                )
                search_query = _approximate_search_query_with_efficient_filter(
                    embedding, lucene_filter, k=k, vector_field=vector_field
                )
            else:
                search_query = _default_approximate_search_query(
                    embedding, k=k, vector_field=vector_field
                )
        elif search_type == SCRIPT_SCORING_SEARCH:
            space_type = kwargs.get("space_type", "l2")
            pre_filter = kwargs.get("pre_filter", MATCH_ALL_QUERY)
            search_query = _default_script_query(
                embedding, k, space_type, pre_filter, vector_field
            )
        elif search_type == PAINLESS_SCRIPTING_SEARCH:
            space_type = kwargs.get("space_type", "l2Squared")
            pre_filter = kwargs.get("pre_filter", MATCH_ALL_QUERY)
            search_query = _default_painless_scripting_query(
                embedding, k, space_type, pre_filter, vector_field
            )
        else:
            raise ValueError("Invalid `search_type` provided as an argument")
        if customized_search_type == "lexical":
            search_query = self._make_search_query_lexical(query, text_field, search_query)
        elif customized_search_type == "hybrid":
            search_query = self._make_search_query_hybrid(query, text_field, search_query)
        response = self.client.search(index=index_name, body=search_query)
        return [hit for hit in response["hits"]["hits"]]

    @staticmethod
    def _match_clause(query: str, text_field: str) -> Dict:
        return {"match": {text_field: {"query": query}}}

    def _make_search_query_lexical(
        self, query: str, text_field: str, search_query: Dict
    ) -> Dict:
        """Replace the vector clause with a BM25 `match`, keeping any bool filter.

        Filters embedded inside a `knn` clause (efficient filters) are not kept.
        """
        bool_filter = search_query["query"].get("bool", {}).get("filter")
        lexical: Dict = {"bool": {"must": [self._match_clause(query, text_field)]}}
        if bool_filter:
            lexical["bool"]["filter"] = bool_filter
        search_query["query"] = lexical
        return search_query

    def _make_search_query_hybrid(
        self, query: str, text_field: str, search_query: Dict
    ) -> Dict:
        """Add a BM25 `match` next to the vector clause in a bool `must` list."""
        inner = search_query["query"]
        if "bool" not in inner:
            # The default approximate query is a bare `knn` clause: wrap it first.
            search_query["query"] = {"bool": {"must": [inner]}}
        search_query["query"]["bool"].setdefault("must", []).append(
            self._match_clause(query, text_field)
        )
        return search_query
```
I managed to get lexical and hybrid queries working with this hack; someone could build a proper OpenSearch vector store retriever from it.
Ooh, great job, there's a good PR in there for someone.
I've put together an implementation that should work and submitted it for review as PR #20092.
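A caveat on what "hybrid" means in this hack: it appends a BM25 `match` clause to the `bool.must` list next to the k-NN clause, so OpenSearch simply adds the two raw scores. BM25 scores are unbounded, while k-NN scores for the L2 and cosine spaces are at most 1, so the lexical side can dominate. OpenSearch's `hybrid` query instead normalizes each sub-query's scores in a search pipeline before combining them. Below is a client-side sketch of that idea using min-max normalization and a weighted arithmetic mean; it illustrates the technique under stated assumptions (a flat score list maps to 1.0, a document missing from a sub-query scores 0 there) and is not OpenSearch's implementation.

```python
from typing import Dict, Sequence


def min_max(scores: Dict[str, float]) -> Dict[str, float]:
    """Rescale one sub-query's scores to [0, 1]."""
    if not scores:
        return {}
    lo, hi = min(scores.values()), max(scores.values())
    if hi == lo:
        # Sketch assumption: a flat score list maps to 1.0.
        return {doc: 1.0 for doc in scores}
    return {doc: (s - lo) / (hi - lo) for doc, s in scores.items()}


def combine(
    per_query: Sequence[Dict[str, float]], weights: Sequence[float]
) -> Dict[str, float]:
    """Weighted arithmetic mean of min-max-normalized sub-query scores.

    Sketch assumption: a document missing from a sub-query counts as 0 there.
    """
    normalized = [min_max(s) for s in per_query]
    docs = {d for s in normalized for d in s}
    total = sum(weights)
    return {
        d: sum(w * s.get(d, 0.0) for w, s in zip(weights, normalized)) / total
        for d in docs
    }
```

With BM25 scores `{"a": 12.0, "b": 4.0}` and k-NN scores `{"a": 0.80, "c": 0.90}`, raw addition gives `a` 12.8 and `c` only 0.9, whereas `combine(..., [0.5, 0.5])` gives `a` and `c` 0.5 each and `b` 0.0, so both retrieval signals carry comparable weight.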