[Core] Actor supports job lifetime

Open jjyao opened this issue 1 year ago • 2 comments

Description

Currently Ray actor has two lifetimes:

  1. detached
  2. ref counting based

It would be nice to introduce a third mode, job, in which the actor's lifetime is tied to the job/driver lifetime.

Use case

No response

jjyao, Jun 28 '24 20:06

We have a use case for this: certain classes that facilitate running work on a cluster use actors internally for state management (as an implementation detail). Several of these classes may be instantiated as part of the same job, and in that case we want them to use the same actor. We have been using named (not detached) actors for this: each class constructs the actor with a common name and get_if_exists=True, so the actor is shared.

The change here makes the existing option less viable, because any class that gets an already-existing actor holds only a weak reference to it, so if the original class that created it goes out of scope, the actor gets killed. Having an actor lifetime tied to the job/driver lifetime would be a safer way to implement the original pattern.

ptomecek, Jun 28 '24 21:06

The use case sounds a bit narrow for user-facing scenarios. Can someone please provide more use cases where a job-lifetime actor is valuable?

Superskyyy, Jun 29 '24 02:06

We also have a use case for an actor that acts as a job-level cache. As soon as the job completes, we would like the actor to be killed as well. We can have two tasks create an actor with the same name in the job namespace; however, the actor gets killed as soon as the task completes. A repro script is below:

import ray

@ray.remote
class Cache:
	def __init__(self):
		self.cache = {}
	def get(self, key):
		return self.cache.get(key)
	def put(self, key, value):
		self.cache[key] = value

ray.init(address='auto', namespace="test")

@ray.remote
def task_put():
	print(ray.get_runtime_context().namespace)
	actor = Cache.options(name="c", namespace=ray.get_runtime_context().namespace, get_if_exists=True).remote()
	print(f"Actor={actor}")
	ray.get(actor.put.remote('a', 'we expect this to be returned'))

@ray.remote
def task_get():
	print(ray.get_runtime_context().namespace)
	actor = Cache.options(name="c", namespace=ray.get_runtime_context().namespace, get_if_exists=True).remote()
	print(f"Actor={actor}")
	return ray.get(actor.get.remote('a'))

ray.get(task_put.remote())
# (task_put pid=35731) test
# (task_put pid=35731) Actor=Actor(Cache, 6a952335ea63922a4d01a69609000000)
ray.get(task_get.remote())
# (task_get pid=35731) test
# (task_get pid=35731) Actor=Actor(Cache, ea1944b5f41eb5a146b8c1e309000000)
# Note: the two actor IDs differ, so task_get created a new Cache actor
# instead of reusing the one that task_put populated.

raghumdani, Sep 28 '24 23:09