[Core] Memory leakage with remote() calls
What happened + What you expected to happen
I am experiencing a memory leak in my code when I use remote() calls to get results from different actors. The problem occurs specifically when I store remote references (without awaiting them) in one actor and await them in a different actor.
Versions / Dependencies
I am using ray==2.34.0 and Python 3.10 on Ubuntu 20.04.
Reproduction script
import re
import ray
import time
import asyncio
import numpy as np
from ray.util.queue import Queue as rQueue

ray.init(address="auto" if ray.is_initialized() else None)


class Data:
    def __init__(self, src_id, seq_id=None, arr=None):
        self.src_id = src_id
        self.seq_id = seq_id
        self.arr = arr
        self.results = None


@ray.remote
class PacketDecoder:
    def __init__(self, actor_name="packet_decoder") -> None:
        self.actor_name = actor_name
        self.actor_id = int(re.findall("[0-9]+$", actor_name)[-1])
        self._count = 0

    async def run(self, data_holder):
        print(f"{self.actor_name} Packet Decoder started.")
        while True:
            try:
                if data_holder.qsize() > 20:
                    print("Decoder: Data queue is full .......")
                    await asyncio.sleep(1)
                    continue
                data_arr = np.random.randint(low=0, high=255, size=(10, 10, 3), dtype=np.uint8)
                data_obj = Data(src_id=self.actor_id, seq_id=self._count, arr=data_arr)
                # some blocking task
                time.sleep(0.08)
                # Push data to holders
                await data_holder.put_async(data_obj, timeout=1)
                self._count += 1
            except Exception as ex:
                print(f"Error: {self.actor_name}: {ex}")


@ray.remote
class Summation:
    def run(self, arr):
        time.sleep(0.001)
        return np.sum(arr)


@ray.remote
class Power:
    def run(self, arr, p=2):
        time.sleep(0.001)
        return np.power(arr, p)


@ray.remote
class FourierTransform:
    def run(self, arr):
        time.sleep(0.002)
        return np.fft.fft(arr)


@ray.remote
class BasicMath:
    def __init__(self, sum_actor, power_actor):
        self.sum_actor = sum_actor
        self.power_actor = power_actor

    def run(self, arr):
        result = {
            "sum": self.sum_actor.run.remote(arr),
            "power": self.power_actor.run.remote(arr),
        }
        return result


@ray.remote
class ODIActor:
    def __init__(self, basic_math_actor, fft_actor, actor_name="odi"):
        self.actor_name = actor_name
        self.basic_math_actor = basic_math_actor
        self.fft_actor = fft_actor

    async def run(self, data_holder, out_holder, src_id, timeout=2):
        while True:
            try:
                try:
                    data_obj = await data_holder.get_async(block=True, timeout=timeout)
                except Exception:
                    print(f"[{src_id}]: ODI: No frame for last {timeout} seconds.")
                    continue
                if out_holder.qsize() > 10:
                    print(f"ODI: {src_id}: {data_obj.seq_id}: Result queue is full .......")
                    await asyncio.sleep(0.1)
                    continue
                data_arr = data_obj.arr
                seq_id = data_obj.seq_id
                data_obj.results = {
                    "basic_results": self.basic_math_actor.run.remote(data_arr),
                    "fft_result": self.fft_actor.run.remote(data_arr),
                }
                # Push data to out holder
                await out_holder.put_async(data_obj, timeout=1)
            except Exception as ex:
                print(f"ODI: [{src_id}]: Error: {ex}.")


@ray.remote
class Aggregator:
    def __init__(self, actor_name="aggregator"):
        self.actor_name = actor_name

    async def run(self, result_holder, timeout=2):
        while True:
            try:
                try:
                    data_obj = await result_holder.get_async(block=True, timeout=timeout)
                except Exception:
                    print(f"Aggregator: No frame for last {timeout} seconds.")
                    continue
                data_arr = data_obj.arr
                src_id = data_obj.src_id
                seq_id = data_obj.seq_id
                _results = data_obj.results
                _basic_operations = await _results["basic_results"]
                _sum, _power, _fft = await asyncio.gather(
                    _basic_operations["sum"],
                    _basic_operations["power"],
                    _results["fft_result"],
                )
                # print(f"Result: {src_id}: {seq_id}: Data:{data_arr}, Sum:{_sum}, Power:{_power}, FFT:{_fft}")
            except Exception as ex:
                print(f"Aggregator: Error: {ex}.")


def main():
    # initialise decoding actors
    src_prefix = "src_"
    decode_actors = {}
    for src in range(7):
        decoder_name = f"decoder_{src}"
        decode_actors[f"{src_prefix}{src}"] = PacketDecoder.options(
            name=decoder_name, namespace="test", num_cpus=0.1).remote(decoder_name)
    # initialise odi actor
    sum_actor = Summation.options(name="sum", namespace="test", num_cpus=0.2).remote()
    power_actor = Power.options(name="power", namespace="test", num_cpus=0.2).remote()
    math_actor = BasicMath.options(name="maths", namespace="test", num_cpus=0.2).remote(sum_actor, power_actor)
    fft_actor = FourierTransform.options(name="fft", namespace="test", num_cpus=0.2).remote()
    odi = ODIActor.options(name="odi", namespace="test", num_cpus=0.2).remote(math_actor, fft_actor, "odi")
    # initialise aggregator
    aggregator = Aggregator.options(name="aggregator", namespace="test", num_cpus=0.2).remote("aggregator")
    # create frame holders for all decoding actors
    data_holders = {
        src_name: rQueue(actor_options={"num_cpus": 0.01, "namespace": "test", "name": f"{src_name}_dh"})
        for src_name in decode_actors
    }
    # create result holder for odi actor
    result_holder = rQueue(actor_options={"num_cpus": 0.01, "namespace": "test", "name": "result_h"})
    # start the Aggregator actor first
    aggregator.run.remote(result_holder)
    # start ODI actor for each source in async
    for f_name in data_holders:
        odi.run.remote(data_holders[f_name], result_holder, f_name)
    # start the decoding actors
    decoder_refs = []
    for d_name in decode_actors:
        decoder_refs.append(decode_actors[d_name].run.remote(data_holders[d_name]))
    ray.get(decoder_refs)
    print("------------------------- END -------------------------")


if __name__ == "__main__":
    try:
        main()
    except Exception as ex:
        print(f"Error: {ex}")
Issue Severity
High: It blocks me from completing my task.
Please find the related graphs and extra information in this discussion: https://discuss.ray.io/t/memory-leakage-with-remote-calls/15541
I have experienced this issue for a long time. The key is to disable nested remote calls, i.e. do not nest remote calls here:
@ray.remote
class BasicMath:
    def __init__(self, sum_actor, power_actor):
        self.sum_actor = sum_actor
        self.power_actor = power_actor

    def run(self, arr):
        result = {
            "sum": self.sum_actor.run.remote(arr),
            "power": self.power_actor.run.remote(arr),
        }
        return result
You can check some old issues. I have proposed this and it has become an anti-pattern now: https://github.com/ray-project/ray/issues/43624 https://github.com/ray-project/ray/issues/43892
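For reference, a minimal sketch of the non-nested shape (illustrative only, and it assumes it is acceptable for BasicMath.run to block): resolve the child results inside run() so the returned dict holds plain values instead of ObjectRefs.

import ray

@ray.remote
class BasicMath:
    def __init__(self, sum_actor, power_actor):
        self.sum_actor = sum_actor
        self.power_actor = power_actor

    def run(self, arr):
        sum_ref = self.sum_actor.run.remote(arr)
        power_ref = self.power_actor.run.remote(arr)
        # ray.get on the list waits for both child calls and returns the values,
        # so no ObjectRef is serialised inside the returned dict.
        sum_val, power_val = ray.get([sum_ref, power_ref])
        return {"sum": sum_val, "power": power_val}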
We are facing a similar issue. Please suggest a quick workaround if anyone has one (awaiting the remote calls is not an option for us). Thank you.
@anyscalesam @rkooo567 Any update on this issue? Were you able to reproduce the issue at your end?
Hey @BhautikDonga, we haven't had a chance to try this yet; please stay tuned. We're a little swamped prepping ahead of Ray Summit at the end of this month.
@anyscalesam @Bye-legumes We have also noticed an increase in worker heap memory. Can you suggest what the reasons behind that could be? It would also be helpful if you could clarify which things in the example are expected to occupy worker heap memory.
@BhautikDonga can you provide two pieces of information?
- When you say "memory leakage", what memory are you referring to? Ray object store memory or heap memory? Can you share any screenshot that shows the memory usage growing over time?
- If it is RAM usage that is growing, can you try https://docs.ray.io/en/master/ray-observability/user-guides/debug-apps/debug-memory.html#memory-profiling-ray-tasks-and-actors and see what causes the leak? A rough sketch of that approach is below.
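A minimal sketch of that profiling approach (the output path and the choice of actor are illustrative only): wrap the suspect actor method in a memray tracker and analyse the capture afterwards, for example with memray flamegraph /tmp/aggregator_heap.bin.

import memray
import ray

@ray.remote
class Aggregator:
    async def run(self, result_holder, timeout=2):
        # Illustrative output path; one capture file per actor process.
        with memray.Tracker("/tmp/aggregator_heap.bin"):
            ...  # existing loop body from the repro goes here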
This problem occurs specifically when I store remote references (without awaiting them) in one actor and await them in a different actor.
Also, can you tell me more specifically which reference you are referring to?
Lastly, I think this may not be an actual "leak"; it could instead be related to bursts of memory usage.
@ray.remote
class BasicMath:
    def __init__(self, sum_actor, power_actor):
        self.sum_actor = sum_actor
        self.power_actor = power_actor

    def run(self, arr):
        result = {
            "sum": self.sum_actor.run.remote(arr),
            "power": self.power_actor.run.remote(arr),
        }
        return result
For this part, arr is the data itself, not an object reference, which means you are serializing/deserializing arr, so you should expect higher memory usage here. Since Ray supports zero-copy deserialization for numpy, it may not impact end-to-end performance, though.
A better approach is to store an object reference here:
@ray.remote
class PacketDecoder:
    def __init__(self, actor_name="packet_decoder") -> None:
        self.actor_name = actor_name
        self.actor_id = int(re.findall("[0-9]+$", actor_name)[-1])
        self._count = 0

    async def run(self, data_holder):
        print(f"{self.actor_name} Packet Decoder started.")
        while True:
            try:
                if data_holder.qsize() > 20:
                    print("Decoder: Data queue is full .......")
                    await asyncio.sleep(1)
                    continue
                # here!! do ray.put so that you store the reference, not the data
                data_arr = ray.put(np.random.randint(low=0, high=255, size=(10, 10, 3), dtype=np.uint8))
                data_obj = Data(src_id=self.actor_id, seq_id=self._count, arr=data_arr)
                # some blocking task
                time.sleep(0.08)
                # Push data to holders
                await data_holder.put_async(data_obj, timeout=1)
                self._count += 1
            except Exception as ex:
                print(f"Error: {self.actor_name}: {ex}")
and then later call ray.get when you need the data:
@ray.remote
class FourierTransform:
    def run(self, arr_ref):
        arr = ray.get(arr_ref)
        time.sleep(0.002)
        return np.fft.fft(arr)
Relevant doc: https://docs.ray.io/en/master/ray-core/patterns/pass-large-arg-by-value.html
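To spell out the behaviour this pattern relies on (a minimal sketch; the function names are only for illustration): an ObjectRef passed as a top-level argument is resolved to its value before the method runs, while a ref nested inside a container (like the Data object in this repro) arrives as a ref and must be fetched with ray.get by the callee.

import numpy as np
import ray

@ray.remote
def top_level(arr):
    # Ray resolved the ObjectRef: arr is already a numpy array here.
    return arr.sum()

@ray.remote
def nested(payload):
    # payload["arr"] is still an ObjectRef because it was nested in a dict.
    return ray.get(payload["arr"]).sum()

arr_ref = ray.put(np.ones((10, 10), dtype=np.uint8))
print(ray.get(top_level.remote(arr_ref)))        # 100
print(ray.get(nested.remote({"arr": arr_ref})))  # 100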
@rkooo567 Thanks for your thorough analysis.
- When you say "memory leakage", what memory are you referring to? Ray object store memory or heap memory? Can you share any screenshot that shows the memory usage growing over time?
Here "memory leakage" refers to RAM usage. Please find the graphs showing the memory leak in the following thread: https://discuss.ray.io/t/memory-leakage-with-remote-calls/15541
2. If it is RAM usage that is growing, can you try https://docs.ray.io/en/master/ray-observability/user-guides/debug-apps/debug-memory.html#memory-profiling-ray-tasks-and-actors and see what causes the leak?
I have analysed the objects using ray memory and have tried to find the leak with the memray tool, but I could not find any concrete cause. Please help me analyse this more thoroughly if you have a specific method in mind.
Also, can you tell me more specifically which reference you are referring to?
Here, I am referring to remote method call references. For example, in the BasicMath class in the script, at the following place we store remote call references without awaiting them. That is where nested serialisation comes into the picture: the result below contains Ray object refs, and it is serialised again as a complete object and returned to the respective caller.
result = {
    "sum": self.sum_actor.run.remote(arr),
    "power": self.power_actor.run.remote(arr),
}
return result
If we await there and store the return values directly, then there is no memory leak. For example, the following code resolves the leak:
result = {
    "sum": ray.get(self.sum_actor.run.remote(arr)),
    "power": ray.get(self.power_actor.run.remote(arr)),
}
return result
A better approach is to store an object reference here:
This has been tried and it also shows a memory leak. I started another issue for that: https://github.com/ray-project/ray/issues/47273; you can find the related leakage graphs in the following thread: https://discuss.ray.io/t/memory-leakage-with-ray-put/15501. My guess is that there, too, when we pass the object reference via the queue, it gets serialised again, and since we see a leak with nested serialisation, it leaks as well. But this is still just a guess; you may be able to provide further details.
Please let me know if I can provide any further details for your analysis.
@rkooo567 @Bye-legumes I have tried storing the binary ID of the remote refs and creating a ray.ObjectRef wherever it is required, as follows:
result = {
    "sum": self.sum_actor.run.remote(arr).binary(),
    "power": self.power_actor.run.remote(arr).binary(),
}
return result

sum_value = ray.get(ray.ObjectRef(result["sum"]))
power_value = ray.get(ray.ObjectRef(result["power"]))
But with this, the consumer cannot get the value from the reconstructed ObjectRef and ray.get blocks indefinitely. Is that expected behaviour? Is there another method I can try to avoid serialisation of the ObjectRef?
@rkooo567 @anyscalesam Is there any update on this? Were you able to reproduce the issue at your end?
@kevin85421 Any update on this issue? Please let me know if you need any other information.
Hi @BhautikDonga, the recommended way is to disable nested remote calls and nested refs as args... Lineage reconstruction will keep all the data in the object store and not delete it.
@Bye-legumes Thanks for your suggestion. Yes, that is what we are doing as per your suggestion, but since it is a blocking call (we have to wait for the value because we cannot pass the ref), it does not support our intended distributed architecture. This issue is one of the blocking factors for us in moving towards a better system design.
@kevin85421 @jjyao Any update on this issue? Please let me know if you need any other information.
mark👀