
Error loading documents into PDFKnowledgeBase, closed context.

coljac opened this issue 2 months ago

While attempting to import a large PDF into a knowledge base, I hit the error below. I would expect a more graceful failure, or at least more diagnostic information.

Phidata version: 2.6.5
SQLAlchemy version: 2.0.36
Backend: PostgreSQL (Docker)

The KB:

history_knowledge = PDFKnowledgeBase(
    path="data",
    vector_db=PgVector(
        table_name="history_documents",
        db_url="postgresql+psycopg://phi_user:phi_password@localhost:5432/phi_db",
    ),
    reader=PDFReader(chunk=True),
)

The error:

INFO     Inserted batch of 100 documents.                                                                                                                                     

ERROR    Error with batch 100: Can't operate on closed transaction inside context manager.  Please complete the context manager before emitting further commands.             
ERROR    Error inserting documents: Can't operate on closed transaction inside context manager.  Please complete the context manager before emitting further commands.        
Traceback (most recent call last):
  File "/home/coljac/src/scuttlebutt/codeteam.py", line 7, in <module>
    from knowledge_base import godot_knowledge, doc_knowledge, history_knowledge
  File "/home/coljac/src/scuttlebutt/knowledge_base.py", line 44, in <module>
    history_knowledge.load(recreate=True)
  File "/home/coljac/src/scuttlebutt/.venv/lib/python3.12/site-packages/phi/knowledge/agent.py", line 97, in load
    self.vector_db.insert(documents=documents_to_load, filters=filters)
  File "/home/coljac/src/scuttlebutt/.venv/lib/python3.12/site-packages/phi/vectordb/pgvector/pgvector.py", line 314, in insert
    sess.execute(insert_stmt, batch_records)
  File "/home/coljac/src/scuttlebutt/.venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2362, in execute
    return self._execute_internal(
           ^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/coljac/src/scuttlebutt/.venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2237, in _execute_internal
    conn = self._connection_for_bind(bind)
           ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/home/coljac/src/scuttlebutt/.venv/lib/python3.12/site-packages/sqlalchemy/orm/session.py", line 2101, in _connection_for_bind
    TransactionalContext._trans_ctx_check(self)
  File "/home/coljac/src/scuttlebutt/.venv/lib/python3.12/site-packages/sqlalchemy/engine/util.py", line 111, in _trans_ctx_check
    raise exc.InvalidRequestError(
sqlalchemy.exc.InvalidRequestError: Can't operate on closed transaction inside context manager.  Please complete the context manager before emitting further commands.
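For context, this failure mode is reproducible with SQLAlchemy alone. The traceback suggests (though I haven't confirmed the exact phidata source) that `insert` wraps the whole batch loop in a single context-managed transaction; the first per-batch `sess.commit()` then closes the transaction owned by the context manager, and the next batch's `execute()` trips SQLAlchemy's `_trans_ctx_check`. A minimal sketch against in-memory SQLite (table name is illustrative):

```python
from sqlalchemy import create_engine, text
from sqlalchemy.exc import InvalidRequestError
from sqlalchemy.orm import Session

engine = create_engine("sqlite://")
caught = None

# The transaction is owned by the begin() context manager.
with Session(engine) as sess, sess.begin():
    sess.execute(text("CREATE TABLE t (x INTEGER)"))
    sess.commit()  # "batch 1" commits, closing the managed transaction
    try:
        # "batch 2": the session is still inside the begin() context
        # manager, whose transaction is now closed, so this raises.
        sess.execute(text("INSERT INTO t VALUES (1)"))
    except InvalidRequestError as err:
        caught = err

print(caught)
```

This prints the same "Can't operate on closed transaction inside context manager" message as the traceback above.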

I experimented with the following change to the `insert` method of the vectordb class, moving the session context manager inside the batch loop so that each batch gets a fresh session. This resolved the error:

def insert(self, documents: List[Document], filters: Optional[Dict[str, Any]] = None, batch_size: int = 100) -> None:
    try:
        for i in range(0, len(documents), batch_size):
            batch_docs = documents[i : i + batch_size]
            logger.debug(f"Processing batch starting at index {i}, size: {len(batch_docs)}")
            try:
                # Create new session for each batch
                with self.Session() as sess:
                    batch_records = []
                    for doc in batch_docs:
                        try:
                            doc.embed(embedder=self.embedder)
                            cleaned_content = self._clean_content(doc.content)
                            content_hash = md5(cleaned_content.encode()).hexdigest()
                            _id = doc.id or content_hash
                            record = {
                                "id": _id,
                                "name": doc.name,
                                "meta_data": doc.meta_data,
                                "filters": filters,
                                "content": cleaned_content,
                                "embedding": doc.embedding,
                                "usage": doc.usage,
                                "content_hash": content_hash,
                            }
                            batch_records.append(record)
                        except Exception as e:
                            logger.error(f"Error processing document '{doc.name}': {e}")

                    if not batch_records:
                        # Every document in this batch failed to process;
                        # skip the insert rather than executing with no rows
                        logger.warning(f"No documents to insert for batch starting at index {i}")
                        continue

                    # Insert the batch of records
                    insert_stmt = postgresql.insert(self.table)
                    sess.execute(insert_stmt, batch_records)
                    sess.commit()
                    logger.info(f"Inserted batch of {len(batch_records)} documents.")
            except Exception as e:
                logger.error(f"Error with batch starting at index {i}: {e}")
                raise
    except Exception as e:
        logger.error(f"Error inserting documents: {e}")
        raise
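If creating a new session per batch seems heavy, a lighter variant of the same fix might be to keep one session and open a fresh `sess.begin()` transaction per batch, so no explicit `commit()` ever invalidates an enclosing context manager. A sketch against in-memory SQLite with a toy table (the table and column names are illustrative, not phidata's schema):

```python
from sqlalchemy import create_engine, text
from sqlalchemy.orm import Session

engine = create_engine("sqlite://")

with Session(engine) as sess:
    sess.execute(text("CREATE TABLE docs (id INTEGER)"))
    sess.commit()

    rows = [{"id": n} for n in range(4)]
    batch_size = 2
    for i in range(0, len(rows), batch_size):
        # A fresh context-managed transaction per batch; it commits
        # automatically when the block exits, so there is no manual
        # commit() fighting with the context manager.
        with sess.begin():
            sess.execute(
                text("INSERT INTO docs (id) VALUES (:id)"),
                rows[i : i + batch_size],
            )

    count = sess.execute(text("SELECT COUNT(*) FROM docs")).scalar_one()

print(count)  # 4
```

This keeps one connection checkout per session while still committing batch by batch, which may matter when loading large PDFs.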

I can make a PR if this seems right.

coljac · Dec 10 '24 05:12