[Question] Enabling Independent LLM Responses When Knowledge Base is Irrelevant

Open ozbekburak opened this issue 1 year ago • 3 comments

Hello,

Thank you for maintaining this great project. I’m developing an Agentic RAG application using the DocumentKnowledgeBase with PGVector.

In cases where the knowledge base contains no relevant information for a given prompt, I’d like the LLM to generate answers independently without relying on retrieved documents. While this can be partially addressed through prompting, is there a programmatic way to control this behavior—for example, by specifying a similarity threshold or distance value for retrieved documents?

Specifically:

Could we introduce a threshold parameter (e.g., min_similarity or max_distance) to determine whether to use the knowledge base or bypass it entirely?

If such a parameter already exists, could you clarify how to configure it?

This would allow dynamic switching between knowledge-grounded and independent LLM responses based on retrieval relevance.
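
To make the request concrete, here is a minimal sketch of the switching logic I have in mind. It is not an existing Agno API: `ScoredChunk`, `select_context`, `build_prompt`, and the `min_similarity` default are all hypothetical names and values, and it assumes each retrieved chunk comes with a similarity score where higher means more relevant.

```python
from dataclasses import dataclass
from typing import List, Optional


@dataclass
class ScoredChunk:
    """Hypothetical stand-in for a retrieved document plus its score."""
    content: str
    similarity: float  # assumed: higher is more relevant, roughly in [0, 1]


def select_context(chunks: List[ScoredChunk], min_similarity: float) -> Optional[str]:
    """Join chunks at or above the threshold; return None to bypass the knowledge base."""
    relevant = [c for c in chunks if c.similarity >= min_similarity]
    if not relevant:
        return None
    # Most relevant chunks first
    relevant.sort(key=lambda c: c.similarity, reverse=True)
    return "\n\n".join(c.content for c in relevant)


def build_prompt(question: str, chunks: List[ScoredChunk], min_similarity: float = 0.75) -> str:
    """Ground the prompt in retrieved context only when something relevant was found."""
    context = select_context(chunks, min_similarity)
    if context is None:
        # Nothing relevant: let the model answer on its own
        return question
    return f"Use the following context to answer.\n\n{context}\n\nQuestion: {question}"
```

With a max_distance parameter instead, the comparison would flip to `distance <= max_distance`. The right threshold value would probably depend on the embedder and the distance metric.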

ozbekburak • Feb 15 '25 07:02

Hey @ozbekburak! We do have a reranker, but I totally understand that it doesn’t offer the level of granularity you're looking for. I really love your suggestion, though! I’ve added it as a feature request, and I’m excited to get it shipped. We’ll keep you in the loop and let you know as we make progress on this. Thanks so much for your valuable input!

manthanguptaa • Feb 17 '25 10:02

I needed something similar, and I ended up writing a custom PgVector implementation that gave me this information. My code is super ugly, but I'll share it here in case it helps provide some inspiration:

"""
Custom PgVector implementation that preserves hybrid scores in document metadata.
"""

from typing import List, Optional, Dict, Any, Union
import logging
from sqlalchemy import select, func, desc, text
from agno.vectordb.pgvector import PgVector, SearchType
from agno.document import Document

logger = logging.getLogger(__name__)


class ScoredPgVector(PgVector):
    """
    Extension of PgVector that preserves hybrid scores in document metadata.
    """

    async def hybrid_search(
        self,
        query: str,
        limit: int = 3,
        filter_metadata: Optional[Dict[str, Any]] = None,
        **kwargs,
    ) -> List[Document]:
        """
        Perform a hybrid search and preserve scores in document metadata.

        Args:
            query: The search query
            limit: Maximum number of results to return
            filter_metadata: Optional metadata filters
            **kwargs: Additional arguments

        Returns:
            List of Document objects with hybrid scores in metadata
        """
        # No query, return empty results
        if not query or query.strip() == "":
            return []

        # Ensure the table exists
        if not self.exists():
            logger.warning(f"Table {self.table_name} does not exist")
            return []

        try:
            # Generate embeddings
            embeddings = self.embedder.get_embedding(query)

            # Create a semantic search query with scores
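            # Note: despite its name, semantic_score is a cosine distance
            # (lower means closer); it is converted to a similarity below
            semantic_subquery = (
                select(
                    self.table.c.id,
                    self.table.c.embedding.cosine_distance(embeddings).label(
                        "semantic_score"
                    ),
                )
                # Order by distance so the limit keeps the nearest rows
                # rather than an arbitrary subset
                .order_by("semantic_score")
                .limit(limit * 3)  # Retrieve more for hybrid search
                .alias("semantic")
            )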

            # Create a text search query with to_tsvector
            language = getattr(self, "content_language", "english")
            ts_vector = func.to_tsvector(language, self.table.c.content)
            ts_query = func.plainto_tsquery(language, query)

            keyword_subquery = (
                select(
                    self.table.c.id,
                    func.ts_rank_cd(ts_vector, ts_query).label("keyword_score"),
                )
                .where(ts_vector.op("@@")(ts_query))
                # Order by rank so the limit keeps the best keyword matches
                .order_by(desc("keyword_score"))
                .limit(limit * 3)  # Retrieve more for hybrid search
                .alias("keyword")
            )

            # Combine for hybrid search
            stmt = (
                select(
                    self.table,
                    semantic_subquery.c.semantic_score,
                    keyword_subquery.c.keyword_score,
                    (
                        0.7 * (1 - semantic_subquery.c.semantic_score)
                        + 0.3 * func.coalesce(keyword_subquery.c.keyword_score, 0)
                    ).label("hybrid_score"),
                )
                .join(
                    semantic_subquery,
                    self.table.c.id == semantic_subquery.c.id,
                )
                .join(
                    keyword_subquery,
                    self.table.c.id == keyword_subquery.c.id,
                    isouter=True,
                )
                .order_by(desc("hybrid_score"))
                .limit(limit)
            )

            # Add metadata filters if provided
            if filter_metadata and isinstance(filter_metadata, dict):
                for key, value in filter_metadata.items():
                    # Handle multiple values for the same key
                    if isinstance(value, list):
                        # Match any of the listed values (SQL IN);
                        # func.or_ would emit a call to a nonexistent
                        # SQL function named "or_"
                        stmt = stmt.where(
                            self.table.c.meta_data[key].as_string().in_(
                                [str(v) for v in value]
                            )
                        )
                    else:
                        stmt = stmt.where(
                            self.table.c.meta_data[key].as_string() == str(value)
                        )

            # Execute the query
            try:
                # Get a session
                with self.Session() as sess, sess.begin():
                    # Apply index settings if needed
                    if hasattr(self, "vector_index") and self.vector_index is not None:
                        if hasattr(self.vector_index, "probes"):  # For Ivfflat
                            sess.execute(
                                text(
                                    f"SET LOCAL ivfflat.probes = {self.vector_index.probes}"
                                )
                            )
                        elif hasattr(self.vector_index, "ef_search"):  # For HNSW
                            sess.execute(
                                text(
                                    f"SET LOCAL hnsw.ef_search = {self.vector_index.ef_search}"
                                )
                            )

                    # Execute the query
                    results = sess.execute(stmt).fetchall()
            except Exception as e:
                logger.error(f"Error performing hybrid search: {e}")
                return []

            # Process the results and convert to Document objects
            search_results: List[Document] = []
            for result in results:
                # Create document with base fields
                doc = Document(
                    id=result.id,
                    name=result.name,
                    meta_data=result.meta_data,
                    content=result.content,
                    embedder=self.embedder,
                    embedding=result.embedding,
                    usage=result.usage,
                )

                # Add hybrid score to the document's meta_data dict
                # (the same field populated in the constructor above)
                if doc.meta_data is None:
                    doc.meta_data = {}
                doc.meta_data["hybrid_score"] = result.hybrid_score

                search_results.append(doc)

            return search_results
        except Exception as e:
            logger.error(f"Error during hybrid search: {e}", exc_info=True)
            return []

    async def search(
        self,
        query: str,
        limit: int = 3,
        filter_metadata: Optional[Dict[str, Any]] = None,
        search_type: Optional[Union[SearchType, str]] = None,
        **kwargs,
    ) -> List[Document]:
        """
        Search for documents using the specified search type and preserve scores.

        Args:
            query: The search query
            limit: Maximum number of results to return
            filter_metadata: Optional metadata filters
            search_type: The search type to use (semantic, keyword, or hybrid)
            **kwargs: Additional arguments

        Returns:
            List of Document objects with scores in metadata
        """
        # Convert string to SearchType if needed
        if isinstance(search_type, str):
            try:
                # Member names are lowercase (e.g. SearchType.hybrid below),
                # so look up the lowercased string
                search_type = SearchType[search_type.lower()]
            except KeyError:
                logger.warning(
                    f"Unknown search type: {search_type}. Using default (hybrid)."
                )
                search_type = SearchType.hybrid

        # Default to hybrid search if not specified
        if search_type is None:
            search_type = SearchType.hybrid

        # Use the appropriate search method based on search_type
        if search_type == SearchType.hybrid:
            return await self.hybrid_search(query, limit, filter_metadata, **kwargs)
        else:
            # For other search types, use the parent class method
            return await super().search(
                query, limit, filter_metadata, search_type, **kwargs
            )

jesalg • Apr 01 '25 04:04

I'll give it a shot! Thanks.

ozbekburak • Apr 02 '25 07:04

Hey @ozbekburak, thank you so much for using Agno and for reaching out to us.

We've added your suggestion to our community wishlist. Since Agno is open-source, you're more than welcome to explore or even take a shot at building it yourself. If you decide to dive in and need any guidance along the way, we’d be more than happy to help.

monali7-d • Apr 23 '25 11:04