Out of memory after multiple executions of custom component
Bug Description
Hello,
I have developed a functional custom component designed to re-rank data for a RAG chatbot. This reranker uses the `rerankers` package (note: using other packages such as FlagEmbedding or transformers produces the same bug). My Langflow application is deployed within a Docker container. Upon executing the flow, all components function as expected; however, I observe an incremental increase in memory usage until an eventual crash occurs after multiple executions, with this error message:
chatbot-rag-llm | [02/18/25 10:36:09] ERROR 2025-02-18 10:36:09 - ERROR - __init__.py:1706
chatbot-rag-llm | __init__ - Worker (pid:57) was sent SIGKILL! Perhaps out of memory?
chatbot-rag-llm | Starting Langflow v1.1.4...
Note : I run the flow
Here is a graph of the memory usage of my container running Langflow after multiple executions:
Here is the code of my custom component: Reranker.json
{ "data": { "edges": [], "nodes": [ { "data": { "node": { "template": { "_type": "Component", "data": { "tool_mode": false, "trace_as_metadata": true, "list": false, "trace_as_input": true, "required": false, "placeholder": "", "show": true, "name": "data", "value": "", "display_name": "Data", "advanced": false, "input_types": [ "Data" ], "dynamic": false, "info": "The data to convert to text.", "title_case": false, "type": "other", "_input_type": "DataInput" }, "code": { "type": "code", "required": true, "placeholder": "", "list": false, "show": true, "multiline": true, "value": "# from langflow.field_typing import Data\nfrom langflow.custom import Component\nfrom langflow.io import MessageTextInput, Output\nfrom langflow.helpers.data import data_to_text\nfrom langflow.schema import Data\nfrom rerankers import Reranker\n\nclass CustomComponent(Component):\n display_name = \"Reranker\"\n description = \"Use as a template to create your own component.\"\n documentation: str = \"https://pypi.org/project/rerankers/\"\n icon = \"HuggingFace\"\n name = \"Reranker\"\n\n inputs = [\n DataInput(name=\"data\", display_name=\"Data\", info=\"The data to convert to text.\"),\n MultilineInput(name=\"search_query\", display_name=\"Search Query\",required=True),\n StrInput(name=\"model_reranker\", display_name=\"HuggingFace reranker model\",required=True),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Number of Results\",\n info=\"Number of results to return.\",\n value=10,\n required=True,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Output\", name=\"output\", method=\"build_output\"),\n ]\n\n def build_output(self) -> Message:\n docs=[]\n data = self.data if isinstance(self.data, list) else [self.data]\n for doc in data:\n docs.append(data_to_text('{text}',doc,sep=\"\\n\"))\n ranker = Reranker(self.model_reranker, model_type='cross-encoder',verbose=0)\n results = ranker.rank(query=self.search_query, docs=docs, doc_ids=[i for i in 
range(len(docs))]).top_k(self.number_of_results)\n result_string=\"\"\n for i in range(self.number_of_results):\n result_string+=results[i].text\n result_string+=\"\\n \\n\"\n return Message(text=result_string)", "fileTypes": [], "file_path": "", "password": false, "name": "code", "advanced": true, "dynamic": true, "info": "", "load_from_db": false, "title_case": false }, "model_reranker": { "trace_as_metadata": true, "load_from_db": false, "list": false, "required": true, "placeholder": "", "show": true, "name": "model_reranker", "value": "mixedbread-ai/mxbai-rerank-large-v1", "display_name": "HuggingFace reranker model", "advanced": false, "dynamic": false, "info": "", "title_case": false, "type": "str", "_input_type": "StrInput" }, "number_of_results": { "trace_as_metadata": true, "list": false, "required": true, "placeholder": "", "show": true, "name": "number_of_results", "value": 10, "display_name": "Number of Results", "advanced": false, "dynamic": false, "info": "Number of results to return.", "title_case": false, "type": "int", "_input_type": "IntInput" }, "search_query": { "tool_mode": false, "trace_as_input": true, "multiline": true, "trace_as_metadata": true, "load_from_db": false, "list": false, "required": true, "placeholder": "", "show": true, "name": "search_query", "value": "", "display_name": "Search Query", "advanced": false, "input_types": [ "Message" ], "dynamic": false, "info": "", "title_case": false, "type": "str", "_input_type": "MultilineInput" } }, "description": "Use as a template to create your own component.", "icon": "HuggingFace", "base_classes": [ "Message" ], "display_name": "Reranker", "documentation": "https://pypi.org/project/rerankers/", "custom_fields": {}, "output_types": [], "pinned": false, "conditional_paths": [], "frozen": false, "outputs": [ { "types": [ "Message" ], "selected": "Message", "name": "output", "display_name": "Output", "method": "build_output", "value": "__UNDEFINED__", "cache": true } ], "field_order": [ 
"data", "search_query", "model_reranker", "number_of_results" ], "beta": false, "legacy": false, "edited": true, "metadata": {}, "tool_mode": false, "official": false }, "type": "Reranker", "id": "CustomComponent-Reranker" }, "id": "CustomComponent-Reranker", "position": { "x": 0, "y": 0 }, "type": "genericNode" } ], "viewport": { "x": 1, "y": 1, "zoom": 1 } }, "description": "Use as a template to create your own component.", "name": "Reranker", "id": "CustomComponent-Reranker", "is_component": true, "last_tested_version": "1.1.0" }
My working pipeline:
Thank you for your assistance !
Reproduction
- Import my custom component Reranker.json into Langflow.
- Establish a connection between the vector store component and the prompt component.
- Also link the reranker to the chat input component.
- Execute the flow multiple times, observing memory usage.
Expected behavior
The memory usage should be stable.
Who can help?
@italojohnny
Operating System
Windows 11
Langflow Version
1.1.4
Python Version
3.11
Screenshot
Flow File
Flow : Flow Reranker Bug.json
Component : Reranker.json
Hi, there may be a solution for that...

Inside your custom component, use:

```python
import gc
from multiprocessing import Process, Manager
```

Here is an example method:

```python
def your_task(shared_dict):
    # shared_dict is how you share values back to the parent process
    ...
```

Then, after you define the method you want to isolate, call it like this:

```python
process = Process(target=your_task, args=(shared_dict,))
```

Do not forget to use some garbage collecting :-)

```python
try:
    ...  # your code
finally:
    del your_vars
    gc.collect()
```
Hello,
Thank you for your reply, @severfire.
Your solution fixed my issue!
Here is the code of my custom component:
```python
from langflow.custom import Component
from langflow.io import DataInput, IntInput, MultilineInput, Output, StrInput
from langflow.helpers.data import data_to_text
from langflow.schema import Data
from langflow.schema.message import Message
from rerankers import Reranker

import gc
from multiprocessing import Process, Manager


class CustomComponent(Component):
    display_name = "Reranker"
    description = "Use as a template to create your own component."
    documentation: str = "https://pypi.org/project/rerankers/"
    icon = "HuggingFace"
    name = "Reranker"

    inputs = [
        DataInput(name="data", display_name="Data", info="The data to convert to text."),
        MultilineInput(name="search_query", display_name="Search Query", required=True),
        StrInput(name="model_reranker", display_name="HuggingFace reranker model", required=True),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=10,
            required=True,
        ),
    ]

    def build_output(self) -> Message:
        manager = Manager()
        shared_dict = manager.dict()

        def rerank():
            docs = []
            data = self.data if isinstance(self.data, list) else [self.data]
            for doc in data:
                docs.append(data_to_text("{text}", doc, sep="\n"))
            ranker = Reranker(self.model_reranker, model_type="cross-encoder", verbose=0)
            results = ranker.rank(
                query=self.search_query,
                docs=docs,
                doc_ids=list(range(len(docs))),
            ).top_k(self.number_of_results)
            result_string = ""
            for i in range(self.number_of_results):
                result_string += results[i].text
                result_string += "\n \n"
            shared_dict["result"] = result_string  # Store the result in the shared dictionary

        # Run the reranker in a short-lived child process: the model's memory
        # is released back to the OS when the process exits.
        process = Process(target=rerank)
        process.start()
        process.join()  # Wait for the process to complete
        try:
            return Message(text=shared_dict["result"])  # Retrieve and return the result from the shared dictionary
        finally:
            del shared_dict  # Clean up the shared dictionary
            del process
            gc.collect()  # Force garbage collection

    outputs = [
        Output(display_name="Output", name="output", method="build_output"),
    ]
```
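For readers wondering why the subprocess fixes the leak: the model weights and buffers are allocated inside the child process, and the OS reclaims that memory entirely when the child exits, so nothing accumulates in the Langflow worker. A standalone sketch of the idea (the 200 MB allocation stands in for loading a reranker model):

```python
from multiprocessing import Process

def load_model_and_rerank():
    # Stands in for loading a large cross-encoder and ranking documents.
    blob = bytearray(200 * 1024 * 1024)  # ~200 MB allocated in the child only
    del blob
    # When this process exits, the OS reclaims all of its memory.

if __name__ == "__main__":
    for _ in range(3):  # repeated executions: the parent's memory stays flat
        p = Process(target=load_model_and_rerank)
        p.start()
        p.join()
```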
Memory usage:
@ogabrielluiz check this out :-) that's why I think this could solve a lot of possible issues :-)
@VRivi I am happy it helped!
We are having this problem too.
@ogabrielluiz multiprocessing resolved the issue for @VRivi
The Docling Component also has this error, which was fixed using the above multiprocessing pattern. While the Docling Component will be fixed for the v1.6.0 release, general custom components will not inherently get this fix, so users will need to use the solution(s) shown above as a workaround in the meantime.
We'll investigate better handling of this for general custom components, but the fix wasn't made universal for v1.6.0 as we haven't yet investigated the performance impact that creating a subprocess for each component will have.
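To put a first number on that cost, one can time bare subprocess round-trips; this is only a rough sketch (real components would add model loading and argument serialization on top of this baseline):

```python
import time
from multiprocessing import Process

def noop():
    pass  # stands in for a component's build method

start = time.perf_counter()
for _ in range(20):
    p = Process(target=noop)
    p.start()
    p.join()
elapsed = time.perf_counter() - start
print(f"average per-subprocess overhead: {elapsed / 20 * 1000:.1f} ms")
```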
> We'll investigate better handling of this for general custom components, but the fix wasn't made universal for v1.6.0 as we haven't yet investigated the performance impact that creating a subprocess for each component will have.
Beyond the performance impact, I believe that the arguments to the subprocess need to be serializable, which may cause issues in cases where the component relies on passing an object (e.g., Embeddings) or on sharing/caching (e.g., an API client object that has an internal cache).
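That constraint is easy to hit: anything holding a lock, socket, or open session will not pickle. A minimal illustration with a hypothetical client class (the `ApiClient` name and its fields are made up for the example):

```python
import pickle
import threading

class ApiClient:
    """Hypothetical stand-in for a cached API client: holds an unpicklable lock."""
    def __init__(self):
        self._lock = threading.Lock()  # _thread.lock objects cannot be pickled
        self._cache = {}

try:
    pickle.dumps(ApiClient())
except TypeError as err:
    print(f"cannot send to subprocess: {err}")
```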
We could modularize this pattern into a method or a subclass of Component and move components to it, plus document how to use it in Custom Components.
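One possible shape for that modularization (a hypothetical sketch, not an existing Langflow API): a small helper that runs a callable in a short-lived child process and hands back its picklable result.

```python
from multiprocessing import Process, Queue

def _child(queue, fn, args, kwargs):
    # Runs in the child process; push the result back through the queue.
    queue.put(fn(*args, **kwargs))

def run_isolated(fn, *args, **kwargs):
    """Run fn(*args, **kwargs) in a child process so its memory is freed on exit.

    fn, its arguments, and its return value must all be picklable; under the
    'spawn' start method, fn must also be importable at module level.
    """
    queue = Queue()
    proc = Process(target=_child, args=(queue, fn, args, kwargs))
    proc.start()
    result = queue.get()  # read before join() so the queue's feeder thread can flush
    proc.join()
    return result
```

A `Component` subclass could wrap its build method with such a helper, which is where an opt-in flag per component would plug in.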
@ogabrielluiz Thank you! Can't wait to test it :-) It could be interesting to have an 'isolate' switch on components.
This is not even a duplicate of https://github.com/langflow-ai/langflow/issues/6645.