ray icon indicating copy to clipboard operation
ray copied to clipboard

[Core] Memory leakage with remote() calls

Open BhautikDonga opened this issue 1 year ago • 2 comments

What happened + What you expected to happen

I am experiencing a memory leak in my code when I use remote() calls to get results from different actors. The problem occurs specifically when I store remote references (without awaiting them) and then await them in a different actor.

Versions / Dependencies

I am using ray==2.34.0 and python3.10 with ubuntu20.04.

Reproduction script

import re
import ray
import time
import asyncio
import numpy as np
from ray.util.queue import Queue as rQueue

# Initialise Ray only when this process has no active Ray session yet.
# The previous form passed address="auto" precisely when Ray was already
# initialised, i.e. in the one case where calling ray.init() a second time
# raises a RuntimeError. With no session, address was None, which is what a
# bare ray.init() uses as well.
if not ray.is_initialized():
	ray.init()


class Data:
	"""Record handed from stage to stage through the queues.

	Fields:
	src_id  -- id of the source that produced the record
	seq_id  -- sequence number within that source (defaults to None)
	arr     -- the payload array (defaults to None)
	results -- always None on construction; assigned by a later stage
	"""

	def __init__(self, src_id, seq_id=None, arr=None):
		self.src_id, self.seq_id, self.arr = src_id, seq_id, arr
		# Nothing has been computed for this record yet.
		self.results = None


@ray.remote
class PacketDecoder:
	"""Actor that generates random uint8 arrays and pushes them to a queue."""

	def __init__(self, actor_name="packet_decoder") -> None:
		"""Remember the actor name and derive a numeric id from it.

		The id is the run of digits at the end of ``actor_name``. A name with
		no trailing digits (the default included) gets id 0; it used to raise
		IndexError.
		"""
		self.actor_name = actor_name
		trailing_digits = re.search(r"[0-9]+$", actor_name)
		self.actor_id = int(trailing_digits.group()) if trailing_digits else 0
		# Number of records pushed so far; doubles as the next seq_id.
		self._count = 0

	async def run(self, data_holder):
		"""Produce records forever and push them into ``data_holder``.

		Each pass backs off for one second while the queue holds more than 20
		items; otherwise it builds a random (10, 10, 3) uint8 array, wraps it
		in a ``Data`` record and pushes it with a one-second timeout. Any
		exception is printed and the loop carries on.
		"""
		print(f"{self.actor_name} Packet Decoder started.")

		while True:
			try:
				if data_holder.qsize() > 20:
					print("Decoder: Data queue is full .......")
					await asyncio.sleep(1)
					continue

				data_arr = np.random.randint(low=0, high=255, size=(10, 10, 3), dtype=np.uint8)
				data_obj = Data(src_id=self.actor_id, seq_id=self._count, arr=data_arr)

				# some blocking task (deliberately time.sleep, not asyncio.sleep)
				time.sleep(0.08)

				# Push data to holders
				await data_holder.put_async(data_obj, timeout=1)

				# Only count records that were actually queued.
				self._count += 1
			except Exception as ex:
				print(f"Error: {self.actor_name}: {ex}")


@ray.remote
class Summation:
	"""Actor exposing a single summing method."""

	def run(self, arr):
		"""Pause for 1 ms, then return ``np.sum(arr)``."""
		time.sleep(0.001)
		total = np.sum(arr)
		return total


@ray.remote
class Power:
	"""Actor exposing a single element-wise power method."""

	def run(self, arr, p=2):
		"""Pause for 1 ms, then return ``np.power(arr, p)`` (``p`` defaults to 2)."""
		time.sleep(0.001)
		raised = np.power(arr, p)
		return raised

@ray.remote
class FourierTransform:
	"""Actor exposing a single FFT method."""

	def run(self, arr):
		"""Pause for 2 ms, then return ``np.fft.fft(arr)``."""
		time.sleep(0.002)
		spectrum = np.fft.fft(arr)
		return spectrum

@ray.remote
class BasicMath:
	"""Actor that forwards one array to the sum actor and the power actor."""

	def __init__(self, sum_actor, power_actor):
		"""Keep the handles of the two actors the work is forwarded to."""
		self.sum_actor, self.power_actor = sum_actor, power_actor

	def run(self, arr):
		"""Submit ``arr`` to both actors and return the two pending calls.

		Nothing is waited on here: the dict maps "sum" and "power" to whatever
		each ``.remote()`` call hands back.
		"""
		sum_call = self.sum_actor.run.remote(arr)
		power_call = self.power_actor.run.remote(arr)
		return {"sum": sum_call, "power": power_call}


@ray.remote
class ODIActor:
	"""Actor that takes records from a source queue, submits the math and FFT
	work for each one, and forwards the record to an output queue."""

	def __init__(self, basic_math_actor, fft_actor, actor_name="odi"):
		"""Keep the actor name and the handles of the two downstream actors."""
		self.actor_name = actor_name
		self.basic_math_actor = basic_math_actor
		self.fft_actor = fft_actor

	async def run(self, data_holder, out_holder, src_id, timeout=2):
		"""Process records from ``data_holder`` forever.

		Args:
			data_holder: queue the records are read from.
			out_holder: queue the processed records are pushed to.
			src_id: label of the source, used only in log messages.
			timeout: seconds to wait for a record before logging and retrying.

		For every record the two ``.remote()`` calls are stored, un-awaited,
		in ``data_obj.results`` before the record is pushed on. Errors are
		printed and the loop continues.
		"""
		while True:
			try:
				try:
					data_obj = await data_holder.get_async(block=True, timeout=timeout)
				except Exception:
					# Not a bare except: that would also swallow
					# asyncio.CancelledError and make this loop uncancellable.
					print(f"[{src_id}]: ODI: No frame for last {timeout} seconds.")
					continue

				# Read the fields before the queue-full check so the log line
				# below reports this record's seq_id. It used to read the name
				# before assignment: UnboundLocalError on the first pass, a
				# stale value afterwards.
				data_arr = data_obj.arr
				seq_id = data_obj.seq_id

				if out_holder.qsize() > 10:
					# NOTE: the record just taken from data_holder is dropped here.
					print(f"ODI: {src_id}: {seq_id}: Result queue is full .......")
					await asyncio.sleep(0.1)
					continue

				data_obj.results = {
					"basic_results" : self.basic_math_actor.run.remote(data_arr),
					"fft_result": self.fft_actor.run.remote(data_arr),
				}

				# Push data to out holder
				await out_holder.put_async(data_obj, timeout=1)

			except Exception as ex:
				print(f"ODI: [{src_id}]: Error: {ex}.")


@ray.remote
class Aggregator:
	"""Actor that takes processed records from the result queue and awaits
	the pending calls stored in them."""

	def __init__(self, actor_name="aggregator"):
		"""Keep the actor name."""
		self.actor_name = actor_name

	async def run(self, result_holder, timeout=2):
		"""Consume records from ``result_holder`` forever.

		Args:
			result_holder: queue the processed records are read from.
			timeout: seconds to wait for a record before logging and retrying.

		For each record, the "basic_results" entry is awaited first; it yields
		the dict with the "sum" and "power" entries. Those two and the
		"fft_result" entry are then awaited together. Errors are printed and
		the loop continues.
		"""
		while True:
			try:
				try:
					data_obj = await result_holder.get_async(block=True, timeout=timeout)
				except Exception:
					# Not a bare except: that would also swallow
					# asyncio.CancelledError and make this loop uncancellable.
					print(f"Aggregator: No frame for last {timeout} seconds.")
					continue

				# data_arr, src_id and seq_id are only used by the debug print below.
				data_arr = data_obj.arr
				src_id = data_obj.src_id
				seq_id = data_obj.seq_id
				_results = data_obj.results

				# First await resolves the outer call and yields the nested dict.
				_basic_operations = await _results["basic_results"]
				_sum, _power, _fft = await asyncio.gather(
										_basic_operations["sum"],
										_basic_operations["power"],
										_results["fft_result"]
									)

				# print(f"Result: {src_id}: {seq_id}: Data:{data_arr}, Sum:{_sum}, Power:{_power}, FFT:{_fft}")
			except Exception as ex:
				print(f"Aggregator: Error: {ex}.")
			

def main():
	"""Create every actor and queue, start all stages, and block on the decoders."""

	def _spawn(actor_cls, name, *ctor_args):
		# All non-decoder actors share the namespace and the 0.2 CPU reservation.
		return actor_cls.options(name=name, namespace="test", num_cpus=0.2).remote(*ctor_args)

	# Seven decoders, keyed "src_0" .. "src_6".
	decoders = {}
	for idx in range(7):
		decoder_name = f"decoder_{idx}"
		handle = PacketDecoder.options(name=decoder_name, namespace="test",
										num_cpus=0.1).remote(decoder_name)
		decoders[f"src_{idx}"] = handle

	# Math / FFT workers and the ODI actor that drives them.
	sum_actor = _spawn(Summation, "sum")
	power_actor = _spawn(Power, "power")
	math_actor = _spawn(BasicMath, "maths", sum_actor, power_actor)
	fft_actor = _spawn(FourierTransform, "fft")
	odi = _spawn(ODIActor, "odi", math_actor, fft_actor, "odi")

	# Final stage.
	aggregator = _spawn(Aggregator, "aggregator", "aggregator")

	# One input queue per decoder.
	input_queues = {}
	for key in decoders:
		queue_options = {"num_cpus":0.01, "namespace":"test", "name":f"{key}_dh"}
		input_queues[key] = rQueue(actor_options=queue_options)

	# A single queue carries results from the ODI actor to the aggregator.
	results_queue = rQueue(actor_options={"num_cpus":0.01, "namespace":"test", "name":"result_h"})

	# Start the consumer side first, then one ODI loop per source.
	aggregator.run.remote(results_queue)
	for key, queue in input_queues.items():
		odi.run.remote(queue, results_queue, key)

	# Finally start the producers and wait on them.
	pending = [decoders[key].run.remote(input_queues[key]) for key in decoders]
	ray.get(pending)

	print("------------------------- END -------------------------")


if __name__ == "__main__":
	# Script entry point: run the pipeline and report any failure on stdout
	# instead of letting the traceback propagate.
	try:
		main()
	except Exception as err:
		print(f"Error: {err}")

Issue Severity

High: It blocks me from completing my task.

BhautikDonga avatar Aug 22 '24 09:08 BhautikDonga

Please find the related graphs and extra information from this discussion: https://discuss.ray.io/t/memory-leakage-with-remote-calls/15541

BhautikDonga avatar Aug 22 '24 09:08 BhautikDonga

I have experienced this issue for a long time. The key is to avoid nested remote calls; that is, do not nest remote calls here:

@ray.remote
class BasicMath:
	"""Actor that forwards one array to the sum actor and the power actor."""

	def __init__(self, sum_actor, power_actor):
		"""Keep the handles of the two actors the work is forwarded to."""
		self.sum_actor, self.power_actor = sum_actor, power_actor

	def run(self, arr):
		"""Submit ``arr`` to both actors and return the two pending calls.

		Nothing is waited on here: the dict maps "sum" and "power" to whatever
		each ``.remote()`` call hands back.
		"""
		sum_call = self.sum_actor.run.remote(arr)
		power_call = self.power_actor.run.remote(arr)
		return {"sum": sum_call, "power": power_call}

You can check some old issues. I have proposed this before, and it has become an anti-pattern now: https://github.com/ray-project/ray/issues/43624 https://github.com/ray-project/ray/issues/43892

Bye-legumes avatar Aug 22 '24 15:08 Bye-legumes

We are facing a similar issue. Please suggest a quick workaround if anyone has one (awaiting the remote calls is not an option for us). Thank you.

KrishnaKishore123 avatar Sep 06 '24 11:09 KrishnaKishore123

@anyscalesam @rkooo567 Any update on this issue? Are you able to reproduce the issue at your end?

BhautikDonga avatar Sep 09 '24 05:09 BhautikDonga

hey @BhautikDonga we haven't had a chance to try this yet; please stay tuned. We're a little swamped prepping ahead of Ray Summit at the end of this month.

anyscalesam avatar Sep 09 '24 22:09 anyscalesam

@anyscalesam @Bye-legumes Here, we have also noticed an increase in WorkerHeap memory. Can you suggest what the reasons behind that could be? It would also be helpful if you could clarify which things in the example are expected to occupy worker heap memory.

BhautikDonga avatar Sep 13 '24 06:09 BhautikDonga

@BhautikDonga can you provide me with two pieces of information?

  1. When you say "memory leakage", what memory are you referring to? Ray object store memory or heap memory? Can you share a screenshot that shows the memory usage growing over time?
  2. If it is RAM usage growing up, can you try https://docs.ray.io/en/master/ray-observability/user-guides/debug-apps/debug-memory.html#memory-profiling-ray-tasks-and-actors and see what causes the leak?

This problem is specific when I am storing remote reference (without awaiting on them) and awaiting on them in different actor.

Also can you tell me more specifically which reference are you referring to?

Lastly, I think this may not be related to any "leakage"; it could instead be related to bursts of memory usage.

@ray.remote
class BasicMath:
	"""Actor that forwards one array to the sum actor and the power actor."""

	def __init__(self, sum_actor, power_actor):
		"""Keep the handles of the two actors the work is forwarded to."""
		self.sum_actor, self.power_actor = sum_actor, power_actor

	def run(self, arr):
		"""Submit ``arr`` to both actors and return the two pending calls.

		Nothing is waited on here: the dict maps "sum" and "power" to whatever
		each ``.remote()`` call hands back.
		"""
		sum_call = self.sum_actor.run.remote(arr)
		power_call = self.power_actor.run.remote(arr)
		return {"sum": sum_call, "power": power_call}

For this part, arr is data, not an object reference, which means you are serializing/deserializing arr, so higher memory usage is expected here. Since Ray supports zero-copy deserialization for numpy arrays, it may not impact end-to-end performance, though.

Better approach is to store object reference here;

class PacketDecoder:
	"""Decoder variant that queues an object-store reference instead of the
	array itself."""

	def __init__(self, actor_name="packet_decoder") -> None:
		"""Remember the actor name and derive a numeric id from it.

		The id is the run of digits at the end of ``actor_name``. A name with
		no trailing digits (the default included) gets id 0; it used to raise
		IndexError.
		"""
		self.actor_name = actor_name
		trailing_digits = re.search(r"[0-9]+$", actor_name)
		self.actor_id = int(trailing_digits.group()) if trailing_digits else 0
		# Number of records pushed so far; doubles as the next seq_id.
		self._count = 0

	async def run(self, data_holder):
		"""Produce records forever and push them into ``data_holder``.

		Each pass backs off for one second while the queue holds more than 20
		items; otherwise it stores a random (10, 10, 3) uint8 array with
		``ray.put``, wraps the returned reference in a ``Data`` record and
		pushes it with a one-second timeout. Any exception is printed and the
		loop carries on.
		"""
		print(f"{self.actor_name} Packet Decoder started.")

		while True:
			try:
				if data_holder.qsize() > 20:
					print("Decoder: Data queue is full .......")
					await asyncio.sleep(1)
					continue

				# Here: call ray.put so the record stores a reference, not the data.
				data_arr = ray.put(np.random.randint(low=0, high=255, size=(10, 10, 3), dtype=np.uint8))
				data_obj = Data(src_id=self.actor_id, seq_id=self._count, arr=data_arr)

				# some blocking task (deliberately time.sleep, not asyncio.sleep)
				time.sleep(0.08)

				# Push data to holders
				await data_holder.put_async(data_obj, timeout=1)

				# Only count records that were actually queued.
				self._count += 1
			except Exception as ex:
				print(f"Error: {self.actor_name}: {ex}")

and then later ray.get when you need the data

@ray.remote
class FourierTransform:
	"""Actor that computes the FFT of an array kept in the object store."""

	def run(self, arr_ref):
		"""Pause for 2 ms, then return ``np.fft.fft`` of the array.

		``arr_ref`` may be an ``ObjectRef`` or the array itself.
		"""
		# Ray resolves an ObjectRef passed as a top-level argument before the
		# method runs, so ray.get is only needed when a reference actually
		# arrives (e.g. one that was nested inside another object).
		arr = ray.get(arr_ref) if isinstance(arr_ref, ray.ObjectRef) else arr_ref
		time.sleep(0.002)
		return np.fft.fft(arr)

Relevant doc: https://docs.ray.io/en/master/ray-core/patterns/pass-large-arg-by-value.html

rkooo567 avatar Sep 14 '24 14:09 rkooo567

@rkooo567 Thanks for your thorough analysis.

  1. When you say "memory leakage", what memory are you referring to? Ray object store memory or heap memory? Can you share a screenshot that shows the memory usage growing over time?

Here "memory leakage" refers to RAM usage. Please find the graphs showing memory leakage at following thread: https://discuss.ray.io/t/memory-leakage-with-remote-calls/15541

2. If it is RAM usage growing up, can you try https://docs.ray.io/en/master/ray-observability/user-guides/debug-apps/debug-memory.html#memory-profiling-ray-tasks-and-actors and see what causes the leak?

I have analysed objects using ray memory and have tried to find the leak using the memray tool, but could not find any concrete cause. Please help me analyse this more thoroughly if you have a specific method in mind.


Also can you tell me more specifically which reference are you referring to?

Here, I am referring to the references returned by remote method calls. For example, in the BasicMath class in the script, we store the remote call references at the following place without awaiting them. That is where nested serialisation comes into the picture: the result below contains Ray object refs, and it is serialised again as a complete object and returned to the respective caller.

		result = {
			"sum" : self.sum_actor.run.remote(arr),
			"power" : self.power_actor.run.remote(arr),
		}
                return result

If we await there and store return values directly, then there is no memory leakage. For example, following code would resolve memory leakage:

		result = {
			"sum" : ray.get(self.sum_actor.run.remote(arr)),
			"power" : ray.get(self.power_actor.run.remote(arr)),
		}
                return result

Better approach is to store object reference here;

This has been tried and it has shown memory leakage. Started another issue for that: https://github.com/ray-project/ray/issues/47273, you can find related leakage graphs at following thread: https://discuss.ray.io/t/memory-leakage-with-ray-put/15501. I guess, there also, when we pass object reference via queue, it should get serialised again. And as we are seeing memory leakage with nested serialisation, it also showing memory leakage. But this is still just a guess, you can provide further details with this.


Please let me know, if I can provide any further details for your analysis.

BhautikDonga avatar Sep 16 '24 06:09 BhautikDonga

@rkooo567 @Bye-legumes I have tried storing binary id of remote refs and creating ray.ObjectRef wherever required as follows:

		result = {
			"sum" : self.sum_actor.run.remote(arr).binary(),
			"power" : self.power_actor.run.remote(arr).binary(),
		}
                return result
sum_value = ray.get( ray.ObjectRef(result["sum"]) )
power_value = ray.get( ray.ObjectRef(result["power"]) )

But with this approach, the value cannot be retrieved from the re-created ObjectRef, and ray.get blocks indefinitely. Is this expected behaviour? Is there another method I can try to avoid serialisation of the ObjectRef?

BhautikDonga avatar Sep 25 '24 05:09 BhautikDonga

@rkooo567 @anyscalesam Is there any update on this ? Can you able to reproduce the issue at your end ?

BhautikDonga avatar Oct 03 '24 05:10 BhautikDonga

@kevin85421 Any update on this issue ? Please let me know if you need any other information.

BhautikDonga avatar Nov 08 '24 04:11 BhautikDonga

Hi @BhautikDonga, the recommended way is to avoid nested remote calls and nested refs as args. Lineage reconstruction will keep all the data in the object store and will not delete it.

Bye-legumes avatar Nov 08 '24 04:11 Bye-legumes

@Bye-legumes Thanks for your suggestion. Yes, that is what we are doing, as per your suggestion, but because it is a blocking call (we have to wait for the value, as we can't pass the ref), it does not support our intended distributed architecture. This issue is one of the blocking factors in moving towards a better system design.

BhautikDonga avatar Nov 08 '24 05:11 BhautikDonga

@kevin85421 @jjyao Any update on this issue ? Please let me know if you need any other information.

BhautikDonga avatar Dec 12 '24 09:12 BhautikDonga

mark👀

rbao2018 avatar May 07 '25 09:05 rbao2018