
Out of memory after multiple executions of custom component

Open VRivi opened this issue 10 months ago • 11 comments

Bug Description

Hello,

I have developed a working custom component that re-ranks data for a RAG chatbot. The reranker uses the rerankers package (note: other packages such as FlagEmbedding or transformers trigger the same bug). My Langflow application is deployed in a Docker container. When I execute the flow, all components work as expected; however, memory usage grows with each execution until the worker eventually crashes with this error message:

chatbot-rag-llm  | [02/18/25 10:36:09] ERROR    2025-02-18 10:36:09 - ERROR    -   __init__.py:1706
chatbot-rag-llm  | __init__ - Worker (pid:57) was sent SIGKILL! Perhaps out of memory?
chatbot-rag-llm  | Starting Langflow v1.1.4...

Note : I run the flow

Here is a graph of memory usage in the container running Langflow after multiple executions: Image

Here is the code of my custom component: Reranker.json

{ "data": { "edges": [], "nodes": [ { "data": { "node": { "template": { "_type": "Component", "data": { "tool_mode": false, "trace_as_metadata": true, "list": false, "trace_as_input": true, "required": false, "placeholder": "", "show": true, "name": "data", "value": "", "display_name": "Data", "advanced": false, "input_types": [ "Data" ], "dynamic": false, "info": "The data to convert to text.", "title_case": false, "type": "other", "_input_type": "DataInput" }, "code": { "type": "code", "required": true, "placeholder": "", "list": false, "show": true, "multiline": true, "value": "# from langflow.field_typing import Data\nfrom langflow.custom import Component\nfrom langflow.io import MessageTextInput, Output\nfrom langflow.helpers.data import data_to_text\nfrom langflow.schema import Data\nfrom rerankers import Reranker\n\nclass CustomComponent(Component):\n display_name = \"Reranker\"\n description = \"Use as a template to create your own component.\"\n documentation: str = \"https://pypi.org/project/rerankers/\"\n icon = \"HuggingFace\"\n name = \"Reranker\"\n\n inputs = [\n DataInput(name=\"data\", display_name=\"Data\", info=\"The data to convert to text.\"),\n MultilineInput(name=\"search_query\", display_name=\"Search Query\",required=True),\n StrInput(name=\"model_reranker\", display_name=\"HuggingFace reranker model\",required=True),\n IntInput(\n name=\"number_of_results\",\n display_name=\"Number of Results\",\n info=\"Number of results to return.\",\n value=10,\n required=True,\n ),\n ]\n\n outputs = [\n Output(display_name=\"Output\", name=\"output\", method=\"build_output\"),\n ]\n\n def build_output(self) -> Message:\n docs=[]\n data = self.data if isinstance(self.data, list) else [self.data]\n for doc in data:\n docs.append(data_to_text('{text}',doc,sep=\"\\n\"))\n ranker = Reranker(self.model_reranker, model_type='cross-encoder',verbose=0)\n results = ranker.rank(query=self.search_query, docs=docs, doc_ids=[i for i in 
range(len(docs))]).top_k(self.number_of_results)\n result_string=\"\"\n for i in range(self.number_of_results):\n result_string+=results[i].text\n result_string+=\"\\n \\n\"\n return Message(text=result_string)", "fileTypes": [], "file_path": "", "password": false, "name": "code", "advanced": true, "dynamic": true, "info": "", "load_from_db": false, "title_case": false }, "model_reranker": { "trace_as_metadata": true, "load_from_db": false, "list": false, "required": true, "placeholder": "", "show": true, "name": "model_reranker", "value": "mixedbread-ai/mxbai-rerank-large-v1", "display_name": "HuggingFace reranker model", "advanced": false, "dynamic": false, "info": "", "title_case": false, "type": "str", "_input_type": "StrInput" }, "number_of_results": { "trace_as_metadata": true, "list": false, "required": true, "placeholder": "", "show": true, "name": "number_of_results", "value": 10, "display_name": "Number of Results", "advanced": false, "dynamic": false, "info": "Number of results to return.", "title_case": false, "type": "int", "_input_type": "IntInput" }, "search_query": { "tool_mode": false, "trace_as_input": true, "multiline": true, "trace_as_metadata": true, "load_from_db": false, "list": false, "required": true, "placeholder": "", "show": true, "name": "search_query", "value": "", "display_name": "Search Query", "advanced": false, "input_types": [ "Message" ], "dynamic": false, "info": "", "title_case": false, "type": "str", "_input_type": "MultilineInput" } }, "description": "Use as a template to create your own component.", "icon": "HuggingFace", "base_classes": [ "Message" ], "display_name": "Reranker", "documentation": "https://pypi.org/project/rerankers/", "custom_fields": {}, "output_types": [], "pinned": false, "conditional_paths": [], "frozen": false, "outputs": [ { "types": [ "Message" ], "selected": "Message", "name": "output", "display_name": "Output", "method": "build_output", "value": "__UNDEFINED__", "cache": true } ], "field_order": [ 
"data", "search_query", "model_reranker", "number_of_results" ], "beta": false, "legacy": false, "edited": true, "metadata": {}, "tool_mode": false, "official": false }, "type": "Reranker", "id": "CustomComponent-Reranker" }, "id": "CustomComponent-Reranker", "position": { "x": 0, "y": 0 }, "type": "genericNode" } ], "viewport": { "x": 1, "y": 1, "zoom": 1 } }, "description": "Use as a template to create your own component.", "name": "Reranker", "id": "CustomComponent-Reranker", "is_component": true, "last_tested_version": "1.1.0" }

My working pipeline :

Reranker Bug.json

Thank you for your assistance !

Reproduction

  1. Import my custom component Reranker.json in langflow
  2. Establish a connection between the vector store component and the prompt component.
  3. Also link the reranker to the chat input component.

Execute the code multiple times, observing memory usage.
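
One way to observe memory usage from inside the process is sketched below. It is only an illustration, not part of the original report: it assumes a Unix-like environment (the resource module is not available on Windows), and fake_execution is a hypothetical stand-in for one flow execution. ru_maxrss is a high-water mark (kilobytes on Linux, bytes on macOS), so it never decreases; a series that keeps climbing run after run, instead of levelling off, suggests memory is not being released.

```python
import resource


def peak_rss_after_each_run(func, runs):
    """Call func() `runs` times and record the process's peak RSS after each call."""
    readings = []
    for _ in range(runs):
        func()
        # ru_maxrss is the peak resident set size of this process so far.
        readings.append(resource.getrusage(resource.RUSAGE_SELF).ru_maxrss)
    return readings


def fake_execution():
    """Hypothetical stand-in for one flow execution."""
    return [0] * 100_000
```

For a containerized deployment, the container's own memory graph (as in the screenshot above) remains the more direct measurement; this helper only covers the current process.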

Expected behavior

The memory usage should be stable.

Who can help?

@italojohnny

Operating System

Windows 11

Langflow Version

1.1.4

Python Version

3.11

Screenshot

Image

Flow File

Flow : Flow Reranker Bug.json

Component : Reranker.json

VRivi avatar Feb 18 '25 12:02 VRivi

Hi, there may be a solution for that.

Inside the custom component, use:

import gc
from multiprocessing import Process, Manager

Here is an example method signature: def your_task(shared_dict):

shared_dict is how you share values between the processes.

After you define your method, isolate it by running it in a separate process, like this: process = Process(target=your_task, args=(shared_dict,))

Do not forget to do some garbage collection :-)

  try:
      your code
  finally:
      del your_vars
      gc.collect()
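
Put together, the pattern might look like the minimal sketch below. It is an illustration under stated assumptions, not Langflow code: run_isolated, _worker and the toy your_task are hypothetical names, and the sketch prefers the "fork" start method where the platform offers it (for example in a Linux container), falling back to the platform default otherwise.

```python
import gc
import multiprocessing as mp


def _worker(shared_dict, func, args):
    """Run func in the child process and store its outcome in the shared dict."""
    try:
        shared_dict["result"] = func(*args)
    except Exception as exc:  # report failures to the parent
        shared_dict["error"] = repr(exc)


def run_isolated(func, *args):
    """Run func(*args) in a short-lived child process and return its result.

    Memory allocated by the child is released to the OS when the child exits.
    """
    # Assumption: use "fork" when available, else the platform default.
    methods = mp.get_all_start_methods()
    ctx = mp.get_context("fork" if "fork" in methods else None)
    with ctx.Manager() as manager:
        shared_dict = manager.dict()
        process = ctx.Process(target=_worker, args=(shared_dict, func, args))
        process.start()
        process.join()  # wait for the child to finish
        try:
            if "error" in shared_dict:
                raise RuntimeError(shared_dict["error"])
            if "result" not in shared_dict:
                # e.g. the child was killed before it could store anything
                raise RuntimeError(f"child exited with code {process.exitcode}")
            return shared_dict["result"]
        finally:
            del shared_dict, process
            gc.collect()


def your_task(n):
    """Hypothetical stand-in for a memory-hungry job such as loading a model."""
    big = list(range(n))
    return sum(big)
```

With this helper, a call such as run_isolated(your_task, 1000) returns the value computed in the child. Under the "spawn" start method, the target function and its arguments must be picklable, and the calling script needs the usual if __name__ == "__main__": guard.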

severfire avatar Feb 18 '25 15:02 severfire

Hello,

Thank you for your reply, @severfire.

Your solution fixed my issue!

Here is the code in my custom component:

from langflow.custom import Component
from langflow.io import DataInput, IntInput, MultilineInput, Output, StrInput
from langflow.helpers.data import data_to_text
from langflow.schema import Data
from langflow.schema.message import Message
from rerankers import Reranker
import gc
from multiprocessing import Process, Manager

class CustomComponent(Component):
    display_name = "Reranker"
    description = "Use as a template to create your own component."
    documentation: str = "https://pypi.org/project/rerankers/"
    icon = "HuggingFace"
    name = "Reranker"
    
    inputs = [
        DataInput(name="data", display_name="Data", info="The data to convert to text."),
        MultilineInput(name="search_query", display_name="Search Query", required=True),
        StrInput(name="model_reranker", display_name="HuggingFace reranker model", required=True),
        IntInput(
            name="number_of_results",
            display_name="Number of Results",
            info="Number of results to return.",
            value=10,
            required=True,
        ),
    ]
    
    def build_output(self) -> Message:
        manager = Manager()
        shared_dict = manager.dict()
        
        def rerank():
            docs = []
            data = self.data if isinstance(self.data, list) else [self.data]
            for doc in data:
                docs.append(data_to_text('{text}', doc, sep="\n"))
            ranker = Reranker(self.model_reranker, model_type='cross-encoder', verbose=0)
            results = ranker.rank(query=self.search_query, docs=docs, doc_ids=[i for i in range(len(docs))]).top_k(self.number_of_results)
            result_string = ""
            for result in results:  # top_k may return fewer items than number_of_results
                result_string += result.text
                result_string += "\n \n"
            shared_dict['result'] = result_string  # Store the result in the shared dictionary
        
        process = Process(target=rerank)
        process.start()
        process.join()  # Wait for the process to complete
        
        try:
            return Message(text=shared_dict['result'])  # Retrieve and return the result from the shared dictionary
        finally:
            del shared_dict  # Clean up the shared dictionary
            del process
            gc.collect()  # Force garbage collection
    
    outputs = [
        Output(display_name="Output", name="output", method="build_output"),
    ]

Memory usage :

Image

VRivi avatar Feb 19 '25 13:02 VRivi

@ogabrielluiz check this out :-) that's why i think this could solve a lot of possible issues :-)

severfire avatar Feb 19 '25 18:02 severfire

@VRivi I am happy it helped!

severfire avatar Feb 19 '25 18:02 severfire

we are having this problem too

rndtavares avatar May 23 '25 14:05 rndtavares

@ogabrielluiz multiprocessing resolved the issue for @VRivi

severfire avatar May 29 '25 06:05 severfire

The Docling Component also has this error, which was fixed using the above multiprocessing pattern. While the Docling Component will be fixed for v1.6.0 release, general custom components will not inherently have this behavior, so users will need to use the solution(s) as shown above as a workaround in the meantime.

We'll investigate better handling of this for general custom components, but the fix wasn't made universal for v1.6.0 as we haven't yet investigated the performance impact that creating a subprocess for each component will have.

jordanrfrazier avatar Sep 04 '25 23:09 jordanrfrazier

We'll investigate better handling of this for general custom components, but the fix wasn't made universal for v1.6.0 as we haven't yet investigated the performance impact that creating a subprocess for each component will have.

Beyond the performance impact, I believe that the arguments to the subprocess need to be serializable, which may have issues in cases where the component relies on passing an object (eg., Embeddings) or sharing/caching (eg., an API client object that has an internal cache).
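
As a rough illustration of the serialization concern: under the "spawn" start method, everything passed to the child is pickled, and results sent back to the parent are pickled under any start method. The sketch below checks whether an object survives pickling; is_picklable and FakeClient are hypothetical names made up for this example, with FakeClient standing in for a client object that guards an internal cache with a lock.

```python
import pickle
import threading


def is_picklable(obj):
    """Return True if obj can be serialized with pickle, else False."""
    try:
        pickle.dumps(obj)
    except Exception:
        return False
    return True


class FakeClient:
    """Hypothetical API client holding a lock and an internal cache."""

    def __init__(self):
        self._lock = threading.Lock()  # locks cannot be pickled
        self._cache = {}
```

Plain data such as strings, numbers, lists and dicts passes this check, while an instance of FakeClient does not. Even when an object does pickle, the child works on a copy, so anything it adds to an internal cache is lost when the child exits.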

bjchambers avatar Sep 11 '25 15:09 bjchambers

We'll investigate better handling of this for general custom components, but the fix wasn't made universal for v1.6.0 as we haven't yet investigated the performance impact that creating a subprocess for each component will have.

Beyond the performance impact, I believe that the arguments to the subprocess need to be serializable, which may have issues in cases where the component relies on passing an object (eg., Embeddings) or sharing/caching (eg., an API client object that has an internal cache).

We could modularize this pattern into a method or a subclass of Component and move components to it, plus document how to use it in Custom Components.
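
A rough sketch of what such a reusable base class could look like is below. It is purely hypothetical and not tied to Langflow's Component class: IsolatedTask, compute, run and the isolate flag are illustrative names, and it assumes the "fork" start method is preferred where available. It uses a one-way pipe so that the parent sees end-of-file, rather than blocking forever, if the child dies before sending a result.

```python
import multiprocessing as mp


class IsolatedTask:
    """Hypothetical base class: subclasses implement compute()."""

    isolate = True  # set to False to run in-process

    def compute(self):
        raise NotImplementedError

    def run(self):
        if not self.isolate:
            return self.compute()
        # Assumption: use "fork" when available, else the platform default.
        methods = mp.get_all_start_methods()
        ctx = mp.get_context("fork" if "fork" in methods else None)
        parent_conn, child_conn = ctx.Pipe(duplex=False)
        process = ctx.Process(target=self._child, args=(child_conn,))
        process.start()
        child_conn.close()  # parent keeps only the receiving end
        try:
            ok, payload = parent_conn.recv()
        except EOFError:
            ok, payload = False, "child exited without sending a result"
        finally:
            parent_conn.close()
        process.join()
        if not ok:
            raise RuntimeError(payload)
        return payload

    def _child(self, conn):
        """Runs in the child: send (True, result) or (False, error text)."""
        try:
            conn.send((True, self.compute()))
        except Exception as exc:
            conn.send((False, repr(exc)))
        finally:
            conn.close()


class SumTask(IsolatedTask):
    """Toy subclass standing in for a memory-hungry component."""

    def __init__(self, n):
        self.n = n

    def compute(self):
        return sum(range(self.n))
```

Calling SumTask(10).run() computes the sum in a child process; setting isolate = False on an instance runs the same code in the current process, which would make the performance cost of isolation easy to compare.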

ogabrielluiz avatar Oct 07 '25 20:10 ogabrielluiz

@ogabrielluiz Thank you! Can't wait to test it :-) It could be interesting to have an 'isolate' switch on the component

severfire avatar Oct 09 '25 21:10 severfire

that is not even the duplicate of https://github.com/langflow-ai/langflow/issues/6645

jobs-git avatar Nov 06 '25 10:11 jobs-git