Memory leak in `InMemoryStateService` due to uncontrolled states growth
The `InMemoryStateService` exhibits a potential memory leak due to:
- Unbounded growth of the `self.states` dictionary
- No expiration/cleanup mechanism for completed runs
- Orphaned observer references
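The growth pattern can be illustrated with a simplified stand-in (a hypothetical class for demonstration, not langflow's actual implementation): each run writes into `self.states` under its `run_id`, and nothing ever deletes the entry, so memory grows with every API call.

```python
from collections import defaultdict
from threading import Lock


class LeakyStateService:
    """Hypothetical, simplified stand-in for InMemoryStateService."""

    def __init__(self):
        self.states: dict[str, dict] = {}
        self.observers: dict[str, list] = defaultdict(list)
        self.lock = Lock()

    def append_state(self, key: str, new_state: dict, run_id: str):
        with self.lock:
            # Entries accumulate per run_id and are never removed.
            self.states.setdefault(run_id, {})[key] = new_state


service = LeakyStateService()
for i in range(1000):
    # Each API call gets its own run_id, leaving one entry behind per call.
    service.append_state("vertex", {"value": i}, run_id=f"run{i}")

print(len(service.states))  # 1000 -- nothing ever cleans these up
```

Without a `complete_run`-style hook, the dictionary only ever grows.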
To address the memory leak in `InMemoryStateService`, you can implement an unsubscribe mechanism for observers after each run completes. This curbs the unbounded growth of the `self.states` dictionary and cleans up orphaned observer references. Here's a suggested approach:

- **Implement an unsubscribe method:** Add a method to remove observer callbacks once a run is completed, preventing the `self.states` dictionary from growing unboundedly.
- **Clean up after run completion:** Ensure that the cleanup method is called after each run to remove the observer references, allowing the vertex instances to be garbage collected.

The `InMemoryStateService` class already includes an `unsubscribe` method, which can be used to remove observers. You can enhance the `complete_run` method to utilize this functionality:
```python
from collections import defaultdict
from collections.abc import Callable
from threading import Lock


class InMemoryStateService(StateService):
    def __init__(self, settings_service: SettingsService):
        self.settings_service = settings_service
        self.states: dict[str, dict] = {}
        self.observers: dict[str, list[Callable]] = defaultdict(list)
        self.lock = Lock()

    # Existing methods...

    def complete_run(self, run_id: str):
        with self.lock:
            # Remove all states associated with the completed run
            if run_id in self.states:
                del self.states[run_id]
            # Unsubscribe all observers registered for the completed run.
            # getattr guards against observers that don't carry a run_id.
            for key in list(self.observers.keys()):
                self.observers[key] = [
                    obs for obs in self.observers[key]
                    if getattr(obs, "run_id", None) != run_id
                ]
                if not self.observers[key]:
                    del self.observers[key]

    def unsubscribe(self, key, observer: Callable) -> None:
        with self.lock:
            if observer in self.observers[key]:
                self.observers[key].remove(observer)
```

Usage:

```python
service = InMemoryStateService(settings_service_instance)
service.set_run_id('run1', vertex_instance)

# After run completion
service.complete_run('run1')
```
This approach ensures that once a run is completed, all associated observer callbacks are removed, preventing memory leaks. Additionally, it cleans up the states dictionary to free up memory [1].
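A complementary way to avoid orphaned observer references is to store them as weak references, so a callback disappears automatically once its owning vertex is garbage collected. The sketch below is illustrative only (the `WeakObserverRegistry` and `Vertex` classes are hypothetical, not langflow code); it uses the standard library's `weakref.WeakMethod`, which works for bound methods:

```python
import gc
import weakref
from collections import defaultdict


class WeakObserverRegistry:
    """Illustrative sketch: hold bound-method observers weakly so they
    are dropped when their owning object is garbage collected."""

    def __init__(self):
        self._observers: dict[str, list] = defaultdict(list)

    def subscribe(self, key: str, observer):
        # WeakMethod is for bound methods; plain functions would need weakref.ref.
        self._observers[key].append(weakref.WeakMethod(observer))

    def notify(self, key: str, *args):
        live = []
        for ref in self._observers[key]:
            method = ref()
            if method is not None:  # owner is still alive
                method(*args)
                live.append(ref)
        self._observers[key] = live  # prune dead references as we go


class Vertex:
    """Hypothetical observer owner, standing in for a graph vertex."""

    def __init__(self):
        self.seen = []

    def on_state(self, value):
        self.seen.append(value)


registry = WeakObserverRegistry()
v = Vertex()
registry.subscribe("state", v.on_state)
registry.notify("state", 1)   # v.seen is now [1]

del v                          # once the vertex is gone...
gc.collect()
registry.notify("state", 2)    # ...its observer is pruned automatically
print(len(registry._observers["state"]))  # 0
```

This shifts cleanup responsibility from an explicit `complete_run` call to the garbage collector, which can be more robust when run-completion hooks are easy to miss.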
On [1.3.4] I am observing an increase in RAM usage with each run of the API. I wonder if it's also happening on newer versions.
@ogabrielluiz worth checking.
@ogabrielluiz here is how it grew after, hmm... I don't know, more than 10K API calls:
@saber-wang which version are you using?
@severfire 1.3.3
@severfire does this issue occur in 1.4.1?
dunno, i think i will be checking next week
Hey all.
I'm looking into this problem this week.
Why do you think InMemoryStateService is related to this?
@ogabrielluiz
Hi, just took some time to revisit this issue. It's because the StateService doesn’t have any interface to clear the state, so anything written into the State could potentially cause a memory leak. Currently, it seems to mainly occur in Graph usage. I see you’ve already created a fix for this👍.
@ogabrielluiz However, after removing these calls in Graph, the NotifyComponent and ListenComponent will no longer be usable. Are there any plans to implement a proper Notify/Listen mechanism in the future?
@ogabrielluiz thank you for fixing it! can't wait for new version!
@ogabrielluiz tested nightly, seems there is still a leak somewhere :-( lost a whole day because of deadlines and need to restart langflow manually every so often while processing ~31k API calls :-/
So it's possible that something else is also going on there :-/