
Implementing Hybrid Search (RRF) in LangChain Using OpenSearch Vector Store

Open younes-io opened this issue 1 year ago • 4 comments

Feature request

This feature request proposes implementing hybrid search, specifically the Reciprocal Rank Fusion (RRF) method, in LangChain through its OpenSearch vector store integration. This would enable combining keyword and similarity search. Currently, LangChain doesn't appear to support this functionality, even though OpenSearch has had the capability since its 2.10 release. The goal is to let LangChain call search pipelines through OpenSearch's vector implementation, so that OpenSearch handles the complexities of hybrid search.

Relevant Links: https://opensearch.org/docs/latest/query-dsl/compound/hybrid
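
To illustrate, this is roughly the kind of request body a hybrid query uses according to the docs above (field names, the vector values, and the pipeline name are just placeholders):

# Rough sketch of an OpenSearch hybrid query body (illustrative only).
hybrid_query = {
    "query": {
        "hybrid": {
            "queries": [
                # lexical sub-query
                {"match": {"text": {"query": "what is hybrid search"}}},
                # vector sub-query
                {"knn": {"vector_field": {"vector": [0.1, 0.2, 0.3], "k": 5}}},
            ]
        }
    }
}
# Executed against an index with a search pipeline attached, e.g.
#   GET /my-index/_search?search_pipeline=my-hybrid-pipeline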

Motivation

The motivation behind this request stems from LangChain's current limitation around hybrid search. As someone currently working on a search project, I find it frustrating that, despite OpenSearch supporting hybrid search since version 2.10, LangChain has not yet integrated this feature.

Your contribution

I would gladly help as long as I get some guidance.

younes-io avatar Nov 19 '23 13:11 younes-io

🤖

Hi @younes-io,

Great to see you again! I hope your project is coming along well. I'm currently reviewing your new issue about implementing hybrid search in LangChain with OpenSearch's vector store. I'll get back to you with a more comprehensive response soon. Thanks for your patience!

dosubot[bot] avatar Nov 19 '23 13:11 dosubot[bot]

🤖

Hello again @younes-io! It's good to see you back and thanks for bringing up another interesting feature request.

Thank you for your feature request. I understand that you're looking to implement the Reciprocal Rank Fusion (RRF) method for hybrid search in LangChain using OpenSearch's vector store. This is indeed a valuable feature that could enhance the capabilities of LangChain.

To implement this, you would need to create a new retriever that uses OpenSearch's vector store for retrieving documents. This new retriever would then be used as one of the retrievers in the EnsembleRetriever. Here's a simplified example of how you might create a new retriever for OpenSearch:

from typing import List

from langchain.callbacks.manager import CallbackManagerForRetrieverRun
from langchain.schema import BaseRetriever, Document

class OpenSearchRetriever(BaseRetriever):
    """Retriever that uses OpenSearch's vector store for retrieving documents."""

    def _get_relevant_documents(
        self,
        query: str,
        *,
        run_manager: CallbackManagerForRetrieverRun,
    ) -> List[Document]:
        """
        Get the relevant documents for a given query using OpenSearch's vector store.

        Args:
            query: The query to search for.

        Returns:
            A list of relevant documents.
        """
        # Use OpenSearch's vector store to get relevant documents
        # This is a placeholder and should be replaced with actual code
        documents = opensearch_vector_store_search(query)

        return documents

Once you have the new retriever, you can use it in the EnsembleRetriever like this:

from langchain.retrievers import EnsembleRetriever

ensemble_retriever = EnsembleRetriever(
    retrievers=[opensearch_retriever, other_retriever],
    weights=[0.5, 0.5],
)

This will use the RRF method to combine the results from the OpenSearch retriever and the other retriever.
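
For intuition, here is a minimal sketch of weighted Reciprocal Rank Fusion, roughly as the EnsembleRetriever applies it (the constant c = 60 is the common RRF default; this is an illustration, not the library's exact code):

from collections import defaultdict
from typing import Dict, List

def weighted_rrf(ranked_lists: List[List[str]], weights: List[float], c: int = 60) -> List[str]:
    """Fuse several rankings: each document scores weight / (c + rank) per ranking."""
    scores: Dict[str, float] = defaultdict(float)
    for ranking, weight in zip(ranked_lists, weights):
        for rank, doc_id in enumerate(ranking, start=1):
            scores[doc_id] += weight / (c + rank)
    return sorted(scores, key=scores.get, reverse=True)

# Example: fuse a keyword ranking with a vector-similarity ranking.
print(weighted_rrf([["a", "b", "c"], ["c", "a", "d"]], weights=[0.5, 0.5]))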

Please note that this is a simplified example and the actual implementation may require additional steps, such as handling asynchronous operations and integrating with the rest of the LangChain framework.

As for your question about the opensearch-py function for searching in a pipeline, I'm afraid I don't have the information at the moment. However, you might want to check the OpenSearch documentation or their community for more details.

I hope this helps! If you have any further questions or need more guidance, feel free to ask.


This response is meant to be useful and save you time. It is not meant to be a precise solution, but rather a starting point for your own research.

Help me be more useful! Please leave a 👍 if this is helpful and 👎 if it is irrelevant.

If you want to continue the conversation, start your reply with @dosu-bot.

dosubot[bot] avatar Nov 19 '23 14:11 dosubot[bot]

Hi @younes-io, may I know if this feature is now available for hybrid search in LangChain?

ramda1234786 avatar Jan 24 '24 03:01 ramda1234786

+1 from me, it's a shame this exists on the elasticsearch store but not opensearch!

AndreasThinks avatar Jan 25 '24 14:01 AndreasThinks

closing as langchain doesn't seem to care x)

younes-io avatar Mar 09 '24 18:03 younes-io

So ironically, I've since discovered this actually works if you just copy the Elasticsearch implementation into OpenSearch... see the example below. @younes-io

    opensearch_hybrid_retriever = vectorstore.as_retriever(
        k='5',
        strategy=ElasticsearchStore.ApproxRetrievalStrategy(hybrid=True),
    )

AndreasThinks avatar Mar 11 '24 12:03 AndreasThinks

closing as langchain doesn't seem to care x)

Why are you closing this if this is not fixed? Could you please keep it open and let the contributors pick it up!

ksarang90 avatar Mar 12 '24 11:03 ksarang90

Do people have thoughts on whether the Elasticsearch import above works, and if so, should we add it to the docs? Happy to write the PR, but aware that it's slightly messy to import an Elasticsearch method into the OpenSearch vectorstore.

AndreasThinks avatar Mar 12 '24 13:03 AndreasThinks

@AndreasThinks : Thanks for the tip! I ended up doing similarity only. BTW, I'm not sure hybrid search will work even if LangChain implements it. From what I read in the OpenSearch docs a few months ago, you need to create a search pipeline and configure it specifically for hybrid search; it doesn't happen automatically. @ksarang90 : I don't think it's useful to keep issues open when there's been no feedback from contributors for months. I have seen many issues like this on LangChain: people keep asking questions and they just sit there.
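
For reference, the pipeline setup looks roughly like this (a sketch with opensearch-py, assuming OpenSearch >= 2.10; the pipeline name and weights are placeholders, and using the generic transport call is my assumption rather than a dedicated client helper):

from opensearchpy import OpenSearch

client = OpenSearch(hosts=[{"host": "localhost", "port": 9200}])

# A search pipeline that normalizes and combines lexical and kNN sub-query scores.
pipeline_body = {
    "description": "Post-processor for hybrid search",
    "phase_results_processors": [
        {
            "normalization-processor": {
                "normalization": {"technique": "min_max"},
                "combination": {
                    "technique": "arithmetic_mean",
                    "parameters": {"weights": [0.3, 0.7]},
                },
            }
        }
    ],
}

# Going through the generic transport layer, since I'm not aware of a dedicated
# search-pipeline helper in opensearch-py.
client.transport.perform_request(
    "PUT", "/_search/pipeline/hybrid-search-pipeline", body=pipeline_body
)

# Queries then reference it, e.g. GET /my-index/_search?search_pipeline=hybrid-search-pipeline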

younes-io avatar Mar 12 '24 14:03 younes-io

So ironically, I've since discovered this actually works if you just copy the Elasticsearch implementation into OpenSearch... see the example below. @younes-io

    opensearch_hybrid_retriever = vectorstore.as_retriever(
        k='5',
        strategy=ElasticsearchStore.ApproxRetrievalStrategy(hybrid=True),
    )

The OpenSearch vectorstore does not have a strategy field (https://github.com/langchain-ai/langchain/blob/master/libs/community/langchain_community/vectorstores/opensearch_vector_search.py#L309) the way the Elasticsearch vectorstore does (https://github.com/langchain-ai/langchain/blob/7d216ad1e1fd26ef66e0a037b011ea52f3aadc78/libs/community/langchain_community/vectorstores/elasticsearch.py#L529), so this will not work, even though the code does not break.

jaavedd9 avatar Mar 20 '24 15:03 jaavedd9

# Imports assumed for this snippet; module paths may vary by LangChain version.
import warnings
from typing import Any, ClassVar, Collection, Dict, List, Optional, Tuple

from langchain_core.callbacks import (
    AsyncCallbackManagerForRetrieverRun,
    CallbackManagerForRetrieverRun,
)
from langchain_core.documents import Document
from langchain_core.pydantic_v1 import root_validator
from langchain_core.vectorstores import VectorStoreRetriever
from langchain_community.vectorstores.opensearch_vector_search import (
    MATCH_ALL_QUERY,
    PAINLESS_SCRIPTING_SEARCH,
    SCRIPT_SCORING_SEARCH,
    OpenSearchVectorSearch,
    _approximate_search_query_with_boolean_filter,
    _approximate_search_query_with_efficient_filter,
    _default_approximate_search_query,
    _default_painless_scripting_query,
    _default_script_query,
)


class CustomVectorStoreRetriever(VectorStoreRetriever):
    customized_search_type: Optional[str]

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)
        self.customized_search_type = kwargs.get("customized_search_type", None)

    allowed_search_types: ClassVar[Collection[str]] = (
        "similarity",
        "similarity_score_threshold",
        "mmr",
        # new additions in customizations
        "hybrid",
        "bm25",
    )

    @root_validator()
    def validate_search_type(cls, values: Dict) -> Dict:
        """Validate search type."""
        search_type = values["search_type"]
        if search_type not in cls.allowed_search_types:
            raise ValueError(
                f"search_type of {search_type} not allowed. Valid values are: "
                f"{cls.allowed_search_types}"
            )
        if search_type == "similarity_score_threshold":
            score_threshold = values["search_kwargs"].get("score_threshold")
            if (score_threshold is None) or (not isinstance(score_threshold, float)):
                raise ValueError(
                    "`score_threshold` is not specified with a float value(0~1) "
                    "in `search_kwargs`."
                )
        return values

    def _get_relevant_documents(
        self, query: str, *, run_manager: CallbackManagerForRetrieverRun
    ) -> List[Document]:
        if self.search_type == "similarity":
            docs = self.vectorstore.similarity_search(query, **self.search_kwargs)
        elif self.search_type == "similarity_score_threshold":
            docs_and_similarities = (
                self.vectorstore.similarity_search_with_relevance_scores(
                    query, **self.search_kwargs
                )
            )
            docs = [doc for doc, _ in docs_and_similarities]
        elif self.search_type == "mmr":
            docs = self.vectorstore.max_marginal_relevance_search(
                query, **self.search_kwargs
            )
        elif self.search_type == "hybrid":
            docs = []
        elif self.search_type == "bm25":
            docs = []
        else:
            raise ValueError(f"search_type of {self.search_type} not allowed.")
        return docs

    async def _aget_relevant_documents(
        self, query: str, *, run_manager: AsyncCallbackManagerForRetrieverRun
    ) -> List[Document]:
        print("search kwargs")
        print(self.search_kwargs)
        if self.customized_search_type:
            # customization: forward the custom search type to the vector store
            self.search_kwargs["customized_search_type"] = self.customized_search_type
        if self.search_type == "similarity":
            docs = await self.vectorstore.asimilarity_search(
                query,
                **self.search_kwargs,
            )
        elif self.search_type == "similarity_score_threshold":
            docs_and_similarities = (
                await self.vectorstore.asimilarity_search_with_relevance_scores(
                    query, **self.search_kwargs
                )
            )
            docs = [doc for doc, _ in docs_and_similarities]
        elif self.search_type == "mmr":
            docs = await self.vectorstore.amax_marginal_relevance_search(
                query, **self.search_kwargs
            )
        elif self.search_type == "hybrid":
            docs = []
            pass
        elif self.search_type == "bm25":
            docs = []
        else:
            raise ValueError(f"search_type of {self.search_type} not allowed.")
        return docs

    def add_documents(self, documents: List[Document], **kwargs: Any) -> List[str]:
        """Add documents to vectorstore."""
        return self.vectorstore.add_documents(documents, **kwargs)

    async def aadd_documents(
        self, documents: List[Document], **kwargs: Any
    ) -> List[str]:
        """Add documents to vectorstore."""
        return await self.vectorstore.aadd_documents(documents, **kwargs)



class CustomOpenSearchVectorSearch(OpenSearchVectorSearch):
    # Source: https://github.com/langchain-ai/langchain/blob/d9396bdec1e27eb253d007a91ce36a78be9aff28/libs/community/langchain_community/vectorstores/opensearch_vector_search.py#L309

    def __init__(self, *args, **kwargs):
        super().__init__(*args, **kwargs)

    def as_retriever(self, **kwargs: Any) -> CustomVectorStoreRetriever:
        tags = kwargs.pop("tags", None) or []
        tags.extend(self._get_retriever_tags())
        self.customized_search_type = kwargs.get("customized_search_type", None)
        return CustomVectorStoreRetriever(vectorstore=self, **kwargs, tags=tags)

    def similarity_search(
        self,
        query: str,
        k: int = 4,
        **kwargs: Any,
    ) -> List[Document]:
        docs_with_scores = self.similarity_search_with_score(query, k, **kwargs)
        return [doc[0] for doc in docs_with_scores]

    def similarity_search_with_score(
        self, query: str, k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        embedding = self.embedding_function.embed_query(query)
        kwargs["query"] = query
        return self.similarity_search_with_score_by_vector(embedding, k, **kwargs)

    def similarity_search_with_score_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[Tuple[Document, float]]:
        text_field = kwargs.get("text_field", "text")
        metadata_field = kwargs.get("metadata_field", "metadata")

        hits = self._raw_similarity_search_with_score_by_vector(
            embedding=embedding, k=k, **kwargs
        )

        documents_with_scores = [
            (
                Document(
                    page_content=hit["_source"][text_field],
                    metadata=(
                        hit["_source"]
                        if metadata_field == "*" or metadata_field not in hit["_source"]
                        else hit["_source"][metadata_field]
                    ),
                ),
                hit["_score"],
            )
            for hit in hits
        ]
        return documents_with_scores

    def _raw_similarity_search_with_score_by_vector(
        self, embedding: List[float], k: int = 4, **kwargs: Any
    ) -> List[dict]:
        search_type = kwargs.get("search_type", "approximate_search")
        vector_field = kwargs.get("vector_field", "vector_field")
        text_field = kwargs.get("text_field", "text")
        query = kwargs.get("query", None)
        # pop the custom search type so it is not forwarded to the standard query builders
        customized_search_type = kwargs.pop("customized_search_type", None)
        print("text_field")
        print(text_field)
        index_name = kwargs.get("index_name", self.index_name)
        filter = kwargs.get("filter", {})

        if (
            self.is_aoss
            and search_type != "approximate_search"
            and search_type != SCRIPT_SCORING_SEARCH
        ):
            raise ValueError(
                "Amazon OpenSearch Service Serverless only "
                "supports `approximate_search` and `script_scoring`"
            )

        if search_type == "approximate_search":
            boolean_filter = kwargs.get("boolean_filter", {})
            subquery_clause = kwargs.get("subquery_clause", "must")
            efficient_filter = kwargs.get("efficient_filter", {})
            # `lucene_filter` is deprecated, added for Backwards Compatibility
            lucene_filter = kwargs.get("lucene_filter", {})

            if boolean_filter != {} and efficient_filter != {}:
                raise ValueError(
                    "Both `boolean_filter` and `efficient_filter` are provided which "
                    "is invalid"
                )

            if lucene_filter != {} and efficient_filter != {}:
                raise ValueError(
                    "Both `lucene_filter` and `efficient_filter` are provided which "
                    "is invalid. `lucene_filter` is deprecated"
                )

            if lucene_filter != {} and boolean_filter != {}:
                raise ValueError(
                    "Both `lucene_filter` and `boolean_filter` are provided which "
                    "is invalid. `lucene_filter` is deprecated"
                )

            if (
                efficient_filter == {}
                and boolean_filter == {}
                and lucene_filter == {}
                and filter != {}
            ):
                if self.engine in ["faiss", "lucene"]:
                    efficient_filter = filter
                else:
                    boolean_filter = filter

            if boolean_filter != {}:
                search_query = _approximate_search_query_with_boolean_filter(
                    embedding,
                    boolean_filter,
                    k=k,
                    vector_field=vector_field,
                    subquery_clause=subquery_clause,
                )
            elif efficient_filter != {}:
                search_query = _approximate_search_query_with_efficient_filter(
                    embedding, efficient_filter, k=k, vector_field=vector_field
                )
            elif lucene_filter != {}:
                warnings.warn(
                    "`lucene_filter` is deprecated. Please use the keyword argument"
                    " `efficient_filter`"
                )
                search_query = _approximate_search_query_with_efficient_filter(
                    embedding, lucene_filter, k=k, vector_field=vector_field
                )
            else:
                search_query = _default_approximate_search_query(
                    embedding, k=k, vector_field=vector_field
                )
        elif search_type == SCRIPT_SCORING_SEARCH:
            space_type = kwargs.get("space_type", "l2")
            pre_filter = kwargs.get("pre_filter", MATCH_ALL_QUERY)
            search_query = _default_script_query(
                embedding, k, space_type, pre_filter, vector_field
            )
        elif search_type == PAINLESS_SCRIPTING_SEARCH:
            space_type = kwargs.get("space_type", "l2Squared")
            pre_filter = kwargs.get("pre_filter", MATCH_ALL_QUERY)
            search_query = _default_painless_scripting_query(
                embedding, k, space_type, pre_filter, vector_field
            )
        else:
            raise ValueError("Invalid `search_type` provided as an argument")

        if customized_search_type == "lexical":
            search_query = self._make_search_query_lexical(query, text_field, search_query)
        elif customized_search_type == "hybrid":
            search_query = self._make_search_query_hybrid(query, text_field, search_query)
        print("final search_query")
        print(search_query)
        response = self.client.search(index=index_name, body=search_query)
        return [hit for hit in response["hits"]["hits"]]

    def _make_search_query_lexical(
        self, query: str, text_field: str, search_query: Dict
    ) -> Dict:
        # Replace the kNN clause with a plain lexical match on the text field.
        # Assumes the query was built with a boolean filter, so that
        # search_query["query"]["bool"] exists.
        result = search_query
        result["query"]["bool"]["must"] = [
            {
                "match": {
                    text_field: {
                        "query": query,
                    }
                }
            }
        ]
        return result

    def _make_search_query_hybrid(
        self, query: str, text_field: str, search_query: Dict
    ) -> Dict:
        # Keep the kNN clause and append a lexical match clause alongside it.
        # Assumes the query was built with a boolean filter, so that
        # search_query["query"]["bool"]["must"] exists.
        result = search_query
        new_must = search_query["query"]["bool"]["must"]
        new_must.append(
            {
                "match": {
                    text_field: {
                        "query": query,
                    }
                }
            }
        )
        result["query"]["bool"]["must"] = new_must
        return result

I managed to get lexical and hybrid queries working with this hack; someone could build a proper OpenSearch vector retriever from it.
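
A rough usage sketch of the hack (untested; the URL, index name, and embeddings below are placeholders). Note that in this snippet customized_search_type is only honoured on the async path, and the hybrid rewrite assumes the kNN query was built with a boolean filter, so one is passed explicitly:

import asyncio

from langchain_community.embeddings import FakeEmbeddings

vectorstore = CustomOpenSearchVectorSearch(
    opensearch_url="http://localhost:9200",
    index_name="my-index",
    embedding_function=FakeEmbeddings(size=768),  # placeholder embeddings
)

retriever = vectorstore.as_retriever(
    # boolean_filter ensures the query has a bool/must clause for the hybrid rewrite
    search_kwargs={"k": 5, "boolean_filter": {"match_all": {}}},
    customized_search_type="hybrid",  # or "lexical"
)

docs = asyncio.run(retriever.aget_relevant_documents("how do I enable hybrid search?"))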

jaavedd9 avatar Mar 20 '24 19:03 jaavedd9

Ooh, great job, there's a good PR in there for someone.

AndreasThinks avatar Mar 21 '24 12:03 AndreasThinks

I've put together an implementation for this that should work, and have submitted a PR for review: #20092

AndreasThinks avatar Apr 05 '24 23:04 AndreasThinks