graphiti icon indicating copy to clipboard operation
graphiti copied to clipboard

Error using non default db

Open javi-horizon opened this issue 9 months ago • 1 comments

Using DEFAULT_DATABASE="example", the following code fails:

import json
import os
from datetime import datetime, timezone

from dotenv import load_dotenv
from graphiti_core import Graphiti
from graphiti_core.cross_encoder.openai_reranker_client import OpenAIRerankerClient
from graphiti_core.embedder.openai import OpenAIEmbedder, OpenAIEmbedderConfig
from graphiti_core.llm_client import OpenAIClient
from graphiti_core.nodes import EpisodeType
from openai import AsyncAzureOpenAI

# Azure OpenAI configuration, read from the process environment.
# os.getenv returns None for any variable that is not set.
# NOTE(review): load_dotenv is imported above but never called in this snippet, so a
# .env file is not loaded here — presumably that happens in an earlier cell; confirm.
api_key = os.getenv("AZURE_KEY")
api_version = os.getenv("AZURE_VERSION")
azure_endpoint = os.getenv("AZURE_URL")

# Create a single async Azure OpenAI client; it is shared by the LLM client,
# the embedder and the cross encoder below.
azure_openai_client = AsyncAzureOpenAI(api_key=api_key, api_version=api_version, azure_endpoint=azure_endpoint)

# Initialize Graphiti with the Neo4j connection settings and the Azure OpenAI clients.
# No database name is passed here; in the traceback further down, graphiti_core's
# search query is issued with database_=DEFAULT_DATABASE.
graphiti = Graphiti(
    os.getenv("NEO4J_URL"),
    os.getenv("NEO4J_USER"),
    os.getenv("NEO4J_PASSWORD"),
    llm_client=OpenAIClient(client=azure_openai_client),
    embedder=OpenAIEmbedder(
        config=OpenAIEmbedderConfig(
            embedding_model="text-embedding-3-small"  # Use your Azure deployed embedding model name
        ),
        client=azure_openai_client,
    ),
    # Optional: Configure the OpenAI cross encoder with Azure OpenAI
    cross_encoder=OpenAIRerankerClient(client=azure_openai_client),
)

# Build the indices, add the sample episodes, and close the connection even on failure
try:
    # Create graphiti's indices and constraints in the database. This only needs to be done once.
    await graphiti.build_indices_and_constraints()

    # Sample data: two plain-text episodes and two JSON episodes.
    # JSON episode content is kept as a dict here and serialized when it is added below.
    episodes = [
        {
            "content": "Kamala Harris is the Attorney General of California. She was previously "
            "the district attorney for San Francisco.",
            "type": EpisodeType.text,
            "description": "podcast transcript",
        },
        {
            "content": "As AG, Harris was in office from January 3, 2011 – January 3, 2017",
            "type": EpisodeType.text,
            "description": "podcast transcript",
        },
        {
            "content": {
                "name": "Gavin Newsom",
                "position": "Governor",
                "state": "California",
                "previous_role": "Lieutenant Governor",
                "previous_location": "San Francisco",
            },
            "type": EpisodeType.json,
            "description": "podcast metadata",
        },
        {
            "content": {
                "name": "Gavin Newsom",
                "position": "Governor",
                "term_start": "January 7, 2019",
                "term_end": "Present",
            },
            "type": EpisodeType.json,
            "description": "podcast metadata",
        },
    ]

    # Add each episode to the graph; str content is passed as-is, dict content via json.dumps
    for i, episode in enumerate(episodes):
        await graphiti.add_episode(
            name=f"Freakonomics Radio {i}",
            episode_body=episode["content"] if isinstance(episode["content"], str) else json.dumps(episode["content"]),
            source=episode["type"],
            source_description=episode["description"],
            reference_time=datetime.now(timezone.utc),
        )
        print(f"Added episode: Freakonomics Radio {i} ({episode['type'].value})")

finally:
    # Always close the connection, whether or not the steps above succeeded
    await graphiti.close()
    print("\nConnection closed")

Error raised by the call to graphiti.add_episode:

---------------------------------------------------------------------------
ClientError                               Traceback (most recent call last)
Cell In[4], line 45
     43     # Add episodes to the graph
     44     for i, episode in enumerate(episodes):
---> 45         await graphiti.add_episode(
     46             name=f"Freakonomics Radio {i}",
     47             episode_body=episode["content"] if isinstance(episode["content"], str) else json.dumps(episode["content"]),
     48             source=episode["type"],
     49             source_description=episode["description"],
     50             reference_time=datetime.now(timezone.utc),
     51         )
     52         print(f"Added episode: Freakonomics Radio {i} ({episode['type'].value})")
     54 finally:
     55     # Close the connection

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/graphiti.py:499, in Graphiti.add_episode(self, name, episode_body, source_description, reference_time, source, group_id, uuid, update_communities, entity_types, previous_episode_uuids)
    496     return AddEpisodeResults(episode=episode, nodes=nodes, edges=entity_edges)
    498 except Exception as e:
--> 499     raise e

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/graphiti.py:361, in Graphiti.add_episode(self, name, episode_body, source_description, reference_time, source, group_id, uuid, update_communities, entity_types, previous_episode_uuids)
    355 await semaphore_gather(
    356     *[node.generate_name_embedding(self.embedder) for node in extracted_nodes]
    357 )
    359 # Find relevant nodes already in the graph
    360 existing_nodes_lists: list[list[EntityNode]] = list(
--> 361     await semaphore_gather(
    362         *[
    363             get_relevant_nodes(self.driver, SearchFilters(), [node])
    364             for node in extracted_nodes
    365         ]
    366     )
    367 )
    369 # Resolve extracted nodes with nodes already in the graph and extract facts
    370 logger.debug(f'Extracted nodes: {[(n.name, n.uuid) for n in extracted_nodes]}')

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/helpers.py:96, in semaphore_gather(max_coroutines, *coroutines)
     93     async with semaphore:
     94         return await coroutine
---> 96 return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/helpers.py:94, in semaphore_gather.<locals>._wrap_coroutine(coroutine)
     92 async def _wrap_coroutine(coroutine):
     93     async with semaphore:
---> 94         return await coroutine

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/search/search_utils.py:668, in get_relevant_nodes(driver, search_filter, nodes)
    638 async def get_relevant_nodes(
    639     driver: AsyncDriver,
    640     search_filter: SearchFilters,
    641     nodes: list[EntityNode],
    642 ) -> list[EntityNode]:
    643     """
    644     Retrieve relevant nodes based on the provided list of EntityNodes.
    645 
   (...)    666     to use as search criteria.
    667     """
--> 668     relevant_nodes = await hybrid_node_search(
    669         [node.name for node in nodes],
    670         [node.name_embedding for node in nodes if node.name_embedding is not None],
    671         driver,
    672         search_filter,
    673         [node.group_id for node in nodes],
    674     )
    676     return relevant_nodes

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/search/search_utils.py:612, in hybrid_node_search(queries, embeddings, driver, search_filter, group_ids, limit)
    572 """
    573 Perform a hybrid search for nodes using both text queries and embeddings.
    574 
   (...)    607 limit (defined in the individual search functions) will be used.
    608 """
    610 start = time()
    611 results: list[list[EntityNode]] = list(
--> 612     await semaphore_gather(
    613         *[
    614             node_fulltext_search(driver, q, search_filter, group_ids, 2 * limit)
    615             for q in queries
    616         ],
    617         *[
    618             node_similarity_search(driver, e, search_filter, group_ids, 2 * limit)
    619             for e in embeddings
    620         ],
    621     )
    622 )
    624 node_uuid_map: dict[str, EntityNode] = {
    625     node.uuid: node for result in results for node in result
    626 }
    627 result_uuids = [[node.uuid for node in result] for result in results]

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/helpers.py:96, in semaphore_gather(max_coroutines, *coroutines)
     93     async with semaphore:
     94         return await coroutine
---> 96 return await asyncio.gather(*(_wrap_coroutine(coroutine) for coroutine in coroutines))

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/helpers.py:94, in semaphore_gather.<locals>._wrap_coroutine(coroutine)
     92 async def _wrap_coroutine(coroutine):
     93     async with semaphore:
---> 94         return await coroutine

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/graphiti_core/search/search_utils.py:343, in node_fulltext_search(driver, query, search_filter, group_ids, limit)
    339     return []
    341 filter_query, filter_params = node_search_filter_query_constructor(search_filter)
--> 343 records, _, _ = await driver.execute_query(
    344     """
    345     CALL db.index.fulltext.queryNodes("node_name_and_summary", $query, {limit: $limit}) 
    346     YIELD node AS node, score
    347     MATCH (n:Entity)
    348     WHERE n.uuid = node.uuid
    349     """
    350     + filter_query
    351     + """
    352     RETURN
    353         n.uuid AS uuid,
    354         n.group_id AS group_id, 
    355         n.name AS name, 
    356         n.name_embedding AS name_embedding,
    357         n.created_at AS created_at, 
    358         n.summary AS summary,
    359         labels(n) AS labels,
    360         properties(n) AS attributes
    361     ORDER BY score DESC
    362     LIMIT $limit
    363     """,
    364     filter_params,
    365     query=fuzzy_query,
    366     group_ids=group_ids,
    367     limit=limit,
    368     database_=DEFAULT_DATABASE,
    369     routing_='r',
    370 )
    371 nodes = [get_entity_node_from_record(record) for record in records]
    373 return nodes

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/driver.py:971, in AsyncDriver.execute_query(self, query_, parameters_, routing_, database_, impersonated_user_, bookmark_manager_, auth_, result_transformer_, **kwargs)
    967     raise ValueError(
    968         f"Invalid routing control value: {routing_!r}"
    969     )
    970 with session._pipelined_begin:
--> 971     return await session._run_transaction(
    972         access_mode,
    973         TelemetryAPI.DRIVER,
    974         work,
    975         (query_str, parameters, result_transformer_),
    976         {},
    977     )

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/session.py:583, in AsyncSession._run_transaction(self, access_mode, api, transaction_function, args, kwargs)
    581 tx = self._transaction
    582 try:
--> 583     result = await transaction_function(tx, *args, **kwargs)
    584 except asyncio.CancelledError:
    585     # if cancellation callback has not been called yet:
    586     if self._transaction is not None:

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/driver.py:1308, in _work(tx, query, parameters, transformer)
   1301 async def _work(
   1302     tx: AsyncManagedTransaction,
   1303     query: te.LiteralString,
   1304     parameters: dict[str, t.Any],
   1305     transformer: t.Callable[[AsyncResult], t.Awaitable[_T]],
   1306 ) -> _T:
   1307     res = await tx.run(query, parameters)
-> 1308     return await transformer(res)

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/result.py:802, in AsyncResult.to_eager_result(self)
    784 @AsyncNonConcurrentMethodChecker._non_concurrent_method
    785 async def to_eager_result(self) -> EagerResult:
    786     """
    787     Convert this result to an :class:`.EagerResult`.
    788 
   (...)    800     .. versionchanged:: 5.8 Stabilized from experimental.
    801     """
--> 802     await self._buffer_all()
    803     return EagerResult(
    804         keys=list(self.keys()),
    805         records=await AsyncUtil.list(self),
    806         summary=await self.consume(),
    807     )

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/result.py:459, in AsyncResult._buffer_all(self)
    458 async def _buffer_all(self):
--> 459     await self._buffer()

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/result.py:448, in AsyncResult._buffer(self, n)
    446     return
    447 record_buffer = deque()
--> 448 async for record in self:
    449     record_buffer.append(record)
    450     if n is not None and len(record_buffer) >= n:

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/work/result.py:398, in AsyncResult.__aiter__(self)
    396     yield self._record_buffer.popleft()
    397 elif self._streaming:
--> 398     await self._connection.fetch_message()
    399 elif self._discarding:
    400     self._discard()

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/io/_common.py:195, in ConnectionErrorHandler.__getattr__.<locals>.outer_async.<locals>.inner(*args, **kwargs)
    193 async def inner(*args, **kwargs):
    194     try:
--> 195         await coroutine_func(*args, **kwargs)
    196     except (
    197         Neo4jError,
    198         ServiceUnavailable,
    199         SessionExpired,
    200         asyncio.CancelledError,
    201     ) as exc:
    202         await AsyncUtil.callback(self.__on_error, exc)

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/io/_bolt.py:864, in AsyncBolt.fetch_message(self)
    860 # Receive exactly one message
    861 tag, fields = await self.inbox.pop(
    862     hydration_hooks=self.responses[0].hydration_hooks
    863 )
--> 864 res = await self._process_message(tag, fields)
    865 self.idle_since = monotonic()
    866 return res

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/io/_bolt5.py:500, in AsyncBolt5x0._process_message(self, tag, fields)
    498 self._server_state_manager.state = self.bolt_states.FAILED
    499 try:
--> 500     await response.on_failure(summary_metadata or {})
    501 except (ServiceUnavailable, DatabaseUnavailable):
    502     if self.pool:

File /opt/anaconda3/envs/grafiti/lib/python3.11/site-packages/neo4j/_async/io/_common.py:254, in Response.on_failure(self, metadata)
    252 handler = self.handlers.get("on_summary")
    253 await AsyncUtil.callback(handler)
--> 254 raise self._hydrate_error(metadata)

ClientError: {code: Neo.ClientError.Procedure.ProcedureCallFailed} {message: Failed to invoke procedure `db.index.fulltext.queryNodes`: Caused by: java.lang.IllegalArgumentException: There is no such fulltext schema index: node_name_and_summary}

lib version graphiti-core 0.9.6 Python 3.11.11

javi-horizon avatar Apr 15 '25 14:04 javi-horizon

Have you run graphiti.build_indices_and_constraints() in your DB?

prasmussen15 avatar Apr 18 '25 15:04 prasmussen15

@javi-horizon Is this still an issue? Please confirm within 14 days or this issue will be closed.

claude[bot] avatar Oct 03 '25 00:10 claude[bot]

@javi-horizon Is this still an issue? Please confirm within 14 days or this issue will be closed.

claude[bot] avatar Oct 20 '25 00:10 claude[bot]

@javi-horizon Is this still an issue? Please confirm within 14 days or this issue will be closed.

claude[bot] avatar Oct 22 '25 00:10 claude[bot]

@javi-horizon Is this still an issue? Please confirm within 14 days or this issue will be closed.

claude[bot] avatar Oct 29 '25 00:10 claude[bot]

@javi-horizon Is this still an issue? Please confirm within 14 days or this issue will be closed.

claude[bot] avatar Nov 17 '25 00:11 claude[bot]