[Feature Request]: Neo4j Batching With UNWIND on get_node, get_edge, node_degree, edge_degree, get_node_edges
Do you need to file a feature request?
- [x] I have searched the existing feature request and this feature request is not already filed.
- [x] I believe this is a legitimate feature request, not just a question or bug.
Feature Request Description
Overview
This feature request proposes a performance enhancement for LightRAG when using Neo4j as the GraphDB storage. Currently, individual queries are executed for functions like get_edge and edge_degree, resulting in thousands of Neo4j queries for a single user query. By batching these calls using the UNWIND clause, we can drastically reduce the number of queries sent to Neo4j, thereby improving performance and reducing connection overhead.
Motivation
- Performance Issues:
Profiling with Logfire and cProfile revealed that, for a single question, Neo4j was queried thousands of times due to non-batched function calls.
Old cProfile results: (screenshot omitted)
New cProfile results (with all Neo4j batched): (screenshot omitted)
- Connection Pool Limitations:
The current implementation relies on a connection pool (default size set via the environment variable NEO4J_MAX_CONNECTION_POOL_SIZE, falling back to 50). Increasing the pool size was previously used as a workaround for Neo4j's 30s timeouts. With batching, the number of queries, and hence the load on the connection pool, should be significantly reduced, mitigating this issue.
- Consistency Across Storage Systems:
Similar batching improvements have already been applied for Redis (using MGET), which greatly reduced the number of calls. Applying the same strategy to Neo4j (and eventually to all graph databases) will ensure consistent performance improvements across the system.
Proposed Changes
- Batching Functions:
  - Create batched versions of the following functions: get_node, get_edge, node_degree, edge_degree, get_node_edges.
  - For example, instead of calling:

        # Old way:
        edge_datas, edge_degrees = await asyncio.gather(
            asyncio.gather(
                *[knowledge_graph_inst.get_edge(r["src_id"], r["tgt_id"]) for r in results]
            ),
            asyncio.gather(
                *[knowledge_graph_inst.edge_degree(r["src_id"], r["tgt_id"]) for r in results]
            ),
        )

    we propose:

        # New batched way:
        edge_pairs_dicts = [{"src": r["src_id"], "tgt": r["tgt_id"]} for r in results]
        edge_pairs_tuples = [(r["src_id"], r["tgt_id"]) for r in results]
        edge_data_dict, edge_degrees_dict = await asyncio.gather(
            knowledge_graph_inst.get_edges_batch(edge_pairs_dicts),
            knowledge_graph_inst.get_edges_degree_batch(edge_pairs_tuples),
        )

  - The batched functions should use Neo4j's UNWIND clause to process the entire list in a single query.
- APOC Dependency:
  - Ensure that the APOC plugin file is downloaded and copied over to the Neo4j container's data folder so that the necessary procedures are available (a quick availability check is sketched after this list).
- Connection Pool Settings:
  - Note that while NEO4J_MAX_CONNECTION_POOL_SIZE is configurable (default is 50), with this new batching mechanism the connection pool is far less likely to be overwhelmed, reducing the need to set this value higher (see the driver-configuration sketch after this list).
- Extend to Other Graph DBs:
  - Although this feature primarily targets Neo4j, the same batching approach could be applied to any graph database to improve performance. (I would assume that most graph databases support some form of batching.)
- Basic Neo4j Improvements:
  - If you check my Neo4j code thoroughly, you can see that even the basic functions have been changed to better handle cases like "super-nodes" in a knowledge graph. An example is the node_degree query:
# query = """
# MATCH (n:base {entity_id: $entity_id})
# OPTIONAL MATCH (n)-[r]-()
# RETURN COUNT(r) AS degree
# """
# The count {} subquery below should be an improvement when a node has
# very many edges (super-nodes), since it avoids expanding every relationship row.
query = """
MATCH (n:base {entity_id: $entity_id})
RETURN count { (n)--() } AS degree;
"""
Implementation Details
As shown in the title, five functions are complemented by batch variants:
- get_node > get_nodes_batch
- get_edge > get_edges_batch
- node_degree > get_node_degrees_batch
- edge_degree > get_edges_degree_batch
- get_node_edges > get_nodes_edges_batch
In neo4j_impl.py:
async def get_node(self, node_id: str) -> dict[str, str] | None:
"""Get node by its label identifier.
Args:
node_id: The node label to look up
Returns:
dict: Node properties if found
None: If node not found
Raises:
ValueError: If node_id is invalid
Exception: If there is an error executing the query
"""
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
try:
query = "MATCH (n:base {entity_id: $entity_id}) RETURN n"
result = await session.run(query, entity_id=node_id)
try:
records = await result.fetch(2) # Get 2 records for duplication check
if len(records) > 1:
logger.warning(
f"Multiple nodes found with label '{node_id}'. Using first node."
)
if records:
node = records[0]["n"]
node_dict = dict(node)
# Remove base label from labels list if it exists
if "labels" in node_dict:
node_dict["labels"] = [
label
for label in node_dict["labels"]
if label != "base"
]
logger.debug(f"Neo4j query node {query} return: {node_dict}")
return node_dict
return None
finally:
await result.consume() # Ensure result is fully consumed
except Exception as e:
logger.error(f"Error getting node for {node_id}: {str(e)}")
raise
async def get_nodes_batch(self, node_ids: list[str]) -> dict[str, dict]:
"""
Retrieve multiple nodes in one query using UNWIND.
Args:
node_ids: List of node entity IDs to fetch.
Returns:
        A dictionary mapping each found node_id to its node data; IDs with no matching node are omitted.
"""
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
UNWIND $node_ids AS id
MATCH (n:base {entity_id: id})
RETURN n.entity_id AS entity_id, n
"""
result = await session.run(query, node_ids=node_ids)
nodes = {}
async for record in result:
entity_id = record["entity_id"]
node = record["n"]
node_dict = dict(node)
# Remove the 'base' label if present in a 'labels' property
if "labels" in node_dict:
node_dict["labels"] = [label for label in node_dict["labels"] if label != "base"]
nodes[entity_id] = node_dict
await result.consume() # Make sure to consume the result fully
return nodes
#@logfire.instrument
async def node_degree(self, node_id: str) -> int:
"""Get the degree (number of relationships) of a node with the given label.
If multiple nodes have the same label, returns the degree of the first node.
If no node is found, returns 0.
Args:
node_id: The label of the node
Returns:
int: The number of relationships the node has, or 0 if no node found
Raises:
ValueError: If node_id is invalid
Exception: If there is an error executing the query
"""
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
try:
# query = """
# MATCH (n:base {entity_id: $entity_id})
# OPTIONAL MATCH (n)-[r]-()
# RETURN COUNT(r) AS degree
# """
            # The count {} subquery below should be an improvement when a node has
            # very many edges (super-nodes), since it avoids expanding every relationship row.
query = """
MATCH (n:base {entity_id: $entity_id})
RETURN count { (n)--() } AS degree;
"""
result = await session.run(query, entity_id=node_id)
try:
record = await result.single()
if not record:
logger.warning(f"No node found with label '{node_id}'")
return 0
degree = record["degree"]
logger.debug(
"Neo4j query node degree for {node_id} return: {degree}"
)
return degree
finally:
await result.consume() # Ensure result is fully consumed
except Exception as e:
logger.error(f"Error getting node degree for {node_id}: {str(e)}")
raise
async def get_node_degrees_batch(self, node_ids: list[str]) -> dict[str, int]:
"""
Retrieve the degree for multiple nodes in a single query using UNWIND.
Args:
node_ids: List of node labels (entity_id values) to look up.
Returns:
A dictionary mapping each node_id to its degree (number of relationships).
If a node is not found, its degree will be set to 0.
"""
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
UNWIND $node_ids AS id
MATCH (n:base {entity_id: id})
RETURN n.entity_id AS entity_id, count { (n)--() } AS degree;
"""
result = await session.run(query, node_ids=node_ids)
degrees = {}
async for record in result:
entity_id = record["entity_id"]
degrees[entity_id] = record["degree"]
await result.consume() # Ensure result is fully consumed
# For any node_id that did not return a record, set degree to 0.
for nid in node_ids:
if nid not in degrees:
logger.warning(f"No node found with label '{nid}'")
degrees[nid] = 0
logger.debug(f"Neo4j batch node degree query returned: {degrees}")
return degrees
#@logfire.instrument
async def edge_degree(self, src_id: str, tgt_id: str) -> int:
"""Get the total degree (sum of relationships) of two nodes.
Args:
src_id: Label of the source node
tgt_id: Label of the target node
Returns:
int: Sum of the degrees of both nodes
"""
src_degree = await self.node_degree(src_id)
trg_degree = await self.node_degree(tgt_id)
# Convert None to 0 for addition
src_degree = 0 if src_degree is None else src_degree
trg_degree = 0 if trg_degree is None else trg_degree
degrees = int(src_degree) + int(trg_degree)
return degrees
async def get_edges_degree_batch(self, edge_pairs: list[tuple[str, str]]) -> dict[tuple[str, str], int]:
"""
Calculate the combined degree for each edge (sum of the source and target node degrees)
in batch using the already implemented get_node_degrees_batch.
Args:
edge_pairs: List of (src, tgt) tuples.
Returns:
A dictionary mapping each (src, tgt) tuple to the sum of their degrees.
"""
# Collect unique node IDs from all edge pairs.
unique_node_ids = {src for src, _ in edge_pairs}
unique_node_ids.update({tgt for _, tgt in edge_pairs})
# Get degrees for all nodes in one go.
degrees = await self.get_node_degrees_batch(list(unique_node_ids))
# Sum up degrees for each edge pair.
edge_degrees = {}
for src, tgt in edge_pairs:
edge_degrees[(src, tgt)] = degrees.get(src, 0) + degrees.get(tgt, 0)
return edge_degrees
#@logfire.instrument
async def get_edge(
self, source_node_id: str, target_node_id: str
) -> dict[str, str] | None:
"""Get edge properties between two nodes.
Args:
source_node_id: Label of the source node
target_node_id: Label of the target node
Returns:
dict: Edge properties if found, default properties if not found or on error
Raises:
ValueError: If either node_id is invalid
Exception: If there is an error executing the query
"""
try:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
# r is changed to r:DIRECTED for specifying the edge type, but for now all edge types are DIRECTED.
query = """
MATCH (start:base {entity_id: $source_entity_id})-[r:DIRECTED]-(end:base {entity_id: $target_entity_id})
RETURN properties(r) as edge_properties
"""
result = await session.run(
query,
source_entity_id=source_node_id,
target_entity_id=target_node_id,
)
try:
records = await result.fetch(2)
if len(records) > 1:
logger.warning(
f"Multiple edges found between '{source_node_id}' and '{target_node_id}'. Using first edge."
)
if records:
try:
edge_result = dict(records[0]["edge_properties"])
logger.debug(f"Result: {edge_result}")
# Ensure required keys exist with defaults
required_keys = {
"weight": 0.0,
"source_id": None,
"description": None,
"keywords": None,
}
for key, default_value in required_keys.items():
if key not in edge_result:
edge_result[key] = default_value
logger.warning(
f"Edge between {source_node_id} and {target_node_id} "
f"missing {key}, using default: {default_value}"
)
logger.debug(
f"{inspect.currentframe().f_code.co_name}:query:{query}:result:{edge_result}"
)
return edge_result
except (KeyError, TypeError, ValueError) as e:
logger.error(
f"Error processing edge properties between {source_node_id} "
f"and {target_node_id}: {str(e)}"
)
# Return default edge properties on error
return {
"weight": 0.0,
"source_id": None,
"description": None,
"keywords": None,
}
logger.debug(
f"{inspect.currentframe().f_code.co_name}: No edge found between {source_node_id} and {target_node_id}"
)
# Return default edge properties when no edge found
return {
"weight": 0.0,
"source_id": None,
"description": None,
"keywords": None,
}
finally:
await result.consume() # Ensure result is fully consumed
except Exception as e:
logger.error(
f"Error in get_edge between {source_node_id} and {target_node_id}: {str(e)}"
)
raise
async def get_edges_batch(self, pairs: list[dict[str, str]]) -> dict[tuple[str, str], dict]:
"""
Retrieve edge properties for multiple (src, tgt) pairs in one query.
Args:
pairs: List of dictionaries, e.g. [{"src": "node1", "tgt": "node2"}, ...]
Returns:
A dictionary mapping (src, tgt) tuples to their edge properties.
"""
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
query = """
UNWIND $pairs AS pair
MATCH (start:base {entity_id: pair.src})-[r:DIRECTED]-(end:base {entity_id: pair.tgt})
RETURN pair.src AS src_id, pair.tgt AS tgt_id, collect(properties(r)) AS edges
"""
result = await session.run(query, pairs=pairs)
edges_dict = {}
async for record in result:
src = record["src_id"]
tgt = record["tgt_id"]
edges = record["edges"]
if edges and len(edges) > 0:
edge_props = edges[0] # choose the first if multiple exist
# Ensure required keys exist with defaults
for key, default in {"weight": 0.0, "source_id": None, "description": None, "keywords": None}.items():
if key not in edge_props:
edge_props[key] = default
edges_dict[(src, tgt)] = edge_props
else:
# No edge found – set default edge properties
edges_dict[(src, tgt)] = {"weight": 0.0, "source_id": None, "description": None, "keywords": None}
await result.consume()
return edges_dict
# Hopefully improved query function.
#@logfire.instrument
async def get_node_edges(self, source_node_id: str) -> list[tuple[str, str]]:
"""Retrieves all edges (relationships) for a particular node identified by its label.
Args:
source_node_id: Label of the node to get edges for
Returns:
list[tuple[str, str]]: List of (source_label, target_label) tuples representing edges.
Returns an empty list if no edges are found.
Raises:
ValueError: If source_node_id is invalid.
Exception: If there is an error executing the query.
"""
try:
async with self._driver.session(
database=self._DATABASE, default_access_mode="READ"
) as session:
try:
query = """
MATCH (n:base {entity_id: $entity_id})
OPTIONAL MATCH (n)-[r]-(connected:base)
RETURN n.entity_id AS source_entity_id, connected.entity_id AS target_entity_id
"""
results = await session.run(query, entity_id=source_node_id)
edges = []
async for record in results:
source_label = record["source_entity_id"]
target_label = record["target_entity_id"]
# Only add the edge if both labels are present
if source_label and target_label:
edges.append((source_label, target_label))
await results.consume() # Ensure all results are consumed
return edges
            except Exception as e:
                logger.error(f"Error getting edges for node {source_node_id}: {str(e)}")
                # Note: no results.consume() here; if session.run() itself failed,
                # 'results' was never assigned, and closing the session discards
                # any unconsumed results anyway.
                raise
except Exception as e:
logger.error(f"Error in get_node_edges for {source_node_id}: {str(e)}")
raise
# New batch function using UNWIND
async def get_nodes_edges_batch(self, node_ids: list[str]) -> dict[str, list[tuple[str, str]]]:
"""
Batch retrieve edges for multiple nodes in one query using UNWIND.
Args:
node_ids: List of node IDs (entity_id) for which to retrieve edges.
Returns:
A dictionary mapping each node ID to its list of edge tuples (source, target).
"""
async with self._driver.session(database=self._DATABASE, default_access_mode="READ") as session:
query = """
UNWIND $node_ids AS id
MATCH (n:base {entity_id: id})
OPTIONAL MATCH (n)-[r]-(connected:base)
RETURN id AS queried_id, n.entity_id AS source_entity_id, connected.entity_id AS target_entity_id
"""
result = await session.run(query, node_ids=node_ids)
# Initialize the dictionary with empty lists for each node ID
edges_dict = {node_id: [] for node_id in node_ids}
async for record in result:
queried_id = record["queried_id"]
source_label = record["source_entity_id"]
target_label = record["target_entity_id"]
if source_label and target_label:
edges_dict[queried_id].append((source_label, target_label))
await result.consume() # Ensure results are fully consumed
return edges_dict
In operate.py:
# In _get_node_data()
# get entity information (CURRENT LIGHTRAG IMPLEMENTATION)
# node_datas, node_degrees = await asyncio.gather(
# asyncio.gather(
# *[knowledge_graph_inst.get_node(r["entity_name"]) for r in results]
# ),
# asyncio.gather(
# *[knowledge_graph_inst.node_degree(r["entity_name"]) for r in results]
# ),
# )
# My implementation using UNWIND to batch ids into a list and send one request to Neo4j
# Extract all entity IDs from your results list
node_ids = [r["entity_name"] for r in results]
# Call the batch node retrieval and degree functions concurrently.
nodes_dict, degrees_dict = await asyncio.gather(
knowledge_graph_inst.get_nodes_batch(node_ids), # Your previously defined batch node retrieval
knowledge_graph_inst.get_node_degrees_batch(node_ids)
)
# In _find_most_related_entities_from_relationships()
# Original approach using individual calls:
# node_datas, node_degrees = await asyncio.gather(
# asyncio.gather(
# *[knowledge_graph_inst.get_node(entity_name) for entity_name in entity_names]
# ),
# asyncio.gather(
# *[knowledge_graph_inst.node_degree(entity_name) for entity_name in entity_names]
# ),
# )
# node_datas = [
# {**n, "entity_name": k, "rank": d}
# for k, n, d in zip(entity_names, node_datas, node_degrees)
# ]
# Batch approach: Retrieve nodes and their degrees concurrently with one query each.
nodes_dict, degrees_dict = await asyncio.gather(
knowledge_graph_inst.get_nodes_batch(entity_names),
knowledge_graph_inst.get_node_degrees_batch(entity_names)
)
This is one example of converting the code from get_node to get_nodes_batch.
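Since the batch calls return dicts rather than ordered lists, the downstream node_datas list has to be rebuilt from those dicts. A minimal sketch, reusing the variable names from the snippet above (the surrounding operate.py logic may differ):

node_datas = [
    {**nodes_dict[name], "entity_name": name, "rank": degrees_dict.get(name, 0)}
    for name in entity_names
    if nodes_dict.get(name) is not None  # skip entities missing from the graph
]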
Additional Context
- Redis Improvement Example:
  A similar optimization was applied to Redis by converting the loop into a single bulk call using MGET (multiple get):

      async def get_by_ids_batch(self, ids: list[str]) -> list[dict[str, Any]]:
          keys = [f"{self.namespace}:{id}" for id in ids]
          data_list = await self._redis.mget(*keys)
          return [json.loads(data) if data else None for data in data_list]

- Benefits:
- Performance: Drastically reduce the number of queries to Neo4j.
- Connection Efficiency: Reduce the stress on the Neo4j connection pool, minimizing timeouts.
- Generalization: Sets a precedent for batching across all graph DB integrations in LightRAG.
Conclusion
By implementing batched queries using UNWIND in Neo4j and updating the connection pool handling, we expect a significant performance improvement in LightRAG when processing queries. This feature will also simplify the overall architecture and extend the benefits to other graph database integrations.
I look forward to the team's feedback and collaboration on integrating this enhancement.
PS: My full stack currently includes Milvus, Redis, MongoDB, Neo4j, and FastAPI with Gunicorn. I’m a student, and I’ll be hosting this on a Linux server and running it in production in a minimal environment. If I spot even more improvements, I’ll post them.
Additional Context
I couldn't post the full code, so I'm hoping this works. Note that I am mocking things like the calls to OpenAI, so you cannot just copy-paste everything. Also, Base.py should of course be updated to include the new batch functions; a sketch follows.
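For illustration, a minimal sketch of what the added abstract methods could look like (the signatures mirror the Neo4j implementations above; LightRAG's actual BaseGraphStorage may declare them differently):

from abc import ABC, abstractmethod

class BaseGraphStorage(ABC):
    @abstractmethod
    async def get_nodes_batch(self, node_ids: list[str]) -> dict[str, dict]:
        """Fetch many nodes in one query; missing IDs are omitted."""

    @abstractmethod
    async def get_node_degrees_batch(self, node_ids: list[str]) -> dict[str, int]:
        """Fetch degrees for many nodes; missing IDs map to 0."""

    @abstractmethod
    async def get_edges_batch(self, pairs: list[dict[str, str]]) -> dict[tuple[str, str], dict]:
        """Fetch edge properties for many (src, tgt) pairs."""

    @abstractmethod
    async def get_edges_degree_batch(self, edge_pairs: list[tuple[str, str]]) -> dict[tuple[str, str], int]:
        """Sum source and target degrees for many edges."""

    @abstractmethod
    async def get_nodes_edges_batch(self, node_ids: list[str]) -> dict[str, list[tuple[str, str]]]:
        """Fetch all edges for many nodes in one query."""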
Possible related issues: https://github.com/HKUDS/LightRAG/issues/1190 https://github.com/HKUDS/LightRAG/issues/1180 https://github.com/HKUDS/LightRAG/issues/1179
PR #1258 introduces a batch operation to prevent making too many parallel queries to the database. Does that address this problem?
> PR #1258 introduces a batch operation to prevent making too many parallel queries to the database. Does that address this problem?
Well, yes, kind of. The issue you linked talks about RedisKV and adjusting pooling; in my design I only used MGET. I think #1258 is far more extensive than what I did, and I don't know whether it ultimately improves performance.
What I'm proposing in this feature request is that instead of sending one str to the database, you send a list[str] and retrieve a list of data back. That brings it down to just 1 call instead of thousands.
I also had some ideas for improving LightRAG. For example, when inserting chunks into the KG, run a hybrid search with "query_param.only_context" turned on and add the result to the "entity_extraction_prompt", so the AI can better understand the information currently stored; this way you get better relationships within the knowledge graph. A rough sketch follows.
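A rough sketch of that idea (rag.aquery and QueryParam follow LightRAG's public examples; the exact flag is assumed to be only_need_context, and the prompt wiring is illustrative):

# Before entity extraction, retrieve what the KG already knows about this chunk:
context = await rag.aquery(
    chunk_text,
    param=QueryParam(mode="hybrid", only_need_context=True),  # assumed flag name
)
# Feed that context into the extraction prompt so new entities/relations
# can link up with the existing graph:
extraction_prompt = f"{entity_extraction_prompt}\n\nExisting KG context:\n{context}"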
One more addition: being able to run "Hybrid", "Global", "Local", "Naive", and "Mix" modes with a metadata filter turned on. This would also mean that when uploading a document, e.g. in the LightRAG Server, you would need to be able to add your own metadata tags.
Your suggestion is excellent. Could you submit a PR to help us complete the program modifications?
Hi @danielaskdd ,
Just to clarify: when you mentioned “your suggestion,” are you specifically referring to combining the calls into one larger database call? I ask because when I implement this for Neo4j, all GraphDBStorage object classes will have the added "batch" functions (as mentioned in the feature request). Without a consistent implementation across all storage applications, this might lead to conflicts.
I'm happy to help with this and plan to work on it tomorrow. So, to confirm, the goal is to implement the Neo4j UNWIND feature and submit a PR for that specific implementation, right?
Thanks!
To ensure consistency and improve performance, batch processing functionality should be implemented across all graph storage systems. We will prioritize Neo4j first, followed by NetworkX and PostgreSQL AGE. A dedicated development branch will be created for this initiative. The branch will only be merged once all three graph storage updates are fully completed.
- dev branch for this issue: graph-storage-batch-query
Hi @danielaskdd,
Can we use a similar kind of method to reduce indexing time, for example a _merge_nodes_then_upsert_batched like this:
async def _merge_nodes_then_upsert_batched(
entity_name: str,
nodes_data: list[dict],
existing_nodes: dict,
knowledge_graph_inst: BaseGraphStorage, # Not used for upsert here anymore
global_config: dict,
) -> dict:
"""
Merge node data with existing node data (if any), then return the merged node_data
(no DB write in this function).
"""
timings = {}
start_total = time.perf_counter()
# Step 1: Get already existing node
start = time.perf_counter()
already_node = existing_nodes.get(entity_name)
timings["fetch_existing_node"] = time.perf_counter() - start
already_entity_types = []
already_source_ids = []
already_description = []
already_file_paths = []
# Step 2: Merge with existing node
start = time.perf_counter()
if already_node is not None:
already_entity_types.append(already_node.get("entity_type", ""))
already_source_ids.extend(
split_string_by_multi_markers(already_node.get("source_id", ""), [GRAPH_FIELD_SEP])
)
already_file_paths.extend(
split_string_by_multi_markers(already_node.get("file_path", ""), [GRAPH_FIELD_SEP])
)
already_description.append(already_node.get("description", ""))
timings["process_existing_node"] = time.perf_counter() - start
# Step 3: Determine the most common entity_type
start = time.perf_counter()
entity_type = sorted(
Counter(
[dp["entity_type"] for dp in nodes_data] + already_entity_types
).items(),
key=lambda x: x[1],
reverse=True,
)[0][0]
timings["compute_entity_type"] = time.perf_counter() - start
# Step 4: Merge fields (description, source_id, file_path)
start = time.perf_counter()
description = GRAPH_FIELD_SEP.join(
sorted(set([dp["description"] for dp in nodes_data] + already_description))
)
source_id = GRAPH_FIELD_SEP.join(
set([dp["source_id"] for dp in nodes_data] + already_source_ids)
)
file_path = GRAPH_FIELD_SEP.join(
set([dp["file_path"] for dp in nodes_data] + already_file_paths)
)
timings["merge_fields"] = time.perf_counter() - start
logger.debug(f"file_path: {file_path}")
# Step 5: Generate summarized description
start = time.perf_counter()
description = await _handle_entity_relation_summary(
entity_name, description, global_config
)
timings["handle_summary"] = time.perf_counter() - start
# Step 6: Build node data dict (no DB write here)
start = time.perf_counter()
node_data = dict(
entity_id=entity_name,
entity_type=entity_type,
description=description,
source_id=source_id,
file_path=file_path,
)
timings["build_node_data"] = time.perf_counter() - start
node_data["entity_name"] = entity_name
total_time = time.perf_counter() - start_total
timings["total"] = total_time
logger.info(f"[TIMING] _merge_nodes_then_upsert_batched for '{entity_name}': {timings}")
return node_data
and then use:
all_entities_data = await asyncio.gather(
*[
_merge_nodes_then_upsert_batched(k, v, existing_nodes, knowledge_graph_inst, global_config)
for k, v in maybe_nodes.items()
]
)
await knowledge_graph_inst.upsert_nodes_batch(all_entities_data)
and the implementation of upsert_nodes_batch
async def upsert_nodes_batch(self, nodes: list[dict[str, str]]) -> None:
"""
Batch upsert nodes into Neo4j using UNWIND for high performance.
Each node must include 'entity_id' and 'entity_type'.
"""
query = """
UNWIND $nodes AS node
MERGE (n:base {entity_id: node.entity_id})
SET n += node
SET n:`${entity_type}`
"""
# Prepare data and entity type labels
for node in nodes:
if "entity_id" not in node:
raise ValueError("Each node must have 'entity_id'")
if "entity_type" not in node:
node["entity_type"] = "base"
try:
async with self._driver.session(database=self._DATABASE) as session:
await session.execute_write(
lambda tx: tx.run(query, nodes=nodes)
)
except Exception as e:
logger.error(f"Batch upsert failed: {e}")
raise
We could do similar things with the edge data as well; a sketch follows below.
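For example, a hedged sketch of a matching upsert_edges_batch (the 'src_id', 'tgt_id', and 'properties' field names are illustrative, not LightRAG's actual schema; note that MERGE on an undirected pattern matches either direction and creates a left-to-right relationship if none exists):

async def upsert_edges_batch(self, edges: list[dict]) -> None:
    """
    Batch upsert edges into Neo4j using UNWIND (sketch).
    Each edge dict is assumed to carry 'src_id', 'tgt_id', and its
    relationship properties under 'properties'.
    """
    query = """
    UNWIND $edges AS edge
    MATCH (a:base {entity_id: edge.src_id})
    MATCH (b:base {entity_id: edge.tgt_id})
    MERGE (a)-[r:DIRECTED]-(b)
    SET r += edge.properties
    """
    try:
        async with self._driver.session(database=self._DATABASE) as session:
            await session.execute_write(lambda tx: tx.run(query, edges=edges))
    except Exception as e:
        logger.error(f"Batch edge upsert failed: {e}")
        raise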