[Core] Actor supports job lifetime
Description
Currently, a Ray actor can have one of two lifetimes:
- detached
- ref counting based
It would be nice to introduce a third mode, job, in which the lifetime of the actor is tied to the job/driver lifetime.
Use case
No response
We have a use case for this, where certain classes that facilitate running work on a cluster internally use actors for state management (as an implementation detail). Several of these classes may be instantiated as part of the same job, and in that case we want them to use the same actor. We have been using named (non-detached) actors for this: each class constructs the actor with a common name and the get_if_exists=True flag set, so that the actor is shared.
The change here makes the existing option less viable, as any class that gets an already-existing actor only holds a weak reference to it, so if the original class that created it goes out of scope, the actor gets killed. Having an actor lifetime tied to the job/driver lifetime would be a safer way to implement the original pattern.
The use case sounds a bit narrow when applied to user-facing scenarios. Can someone please provide some more use cases where a job-lifetime actor would be valuable?
We also have a use case for an actor that acts as a job-level cache. As soon as the job completes, we would like the actor to be killed as well. We can have two tasks create an actor with the same name in the job namespace; however, the actor gets killed as soon as the task that created it completes. A repro script is below:
import ray

@ray.remote
class Cache:
    def __init__(self):
        self.cache = {}

    def get(self, key):
        return self.cache.get(key)

    def put(self, key, value):
        self.cache[key] = value

ray.init(address='auto', namespace="test")

@ray.remote
def task_put():
    print(ray.get_runtime_context().namespace)
    actor = Cache.options(name="c", namespace=ray.get_runtime_context().namespace, get_if_exists=True).remote()
    print(f"Actor={actor}")
    ray.get(actor.put.remote('a', 'we expect this to be returned'))

@ray.remote
def task_get():
    print(ray.get_runtime_context().namespace)
    actor = Cache.options(name="c", namespace=ray.get_runtime_context().namespace, get_if_exists=True).remote()
    print(f"Actor={actor}")
    return ray.get(actor.get.remote('a'))

ray.get(task_put.remote())
# (task_put pid=35731) test
# (task_put pid=35731) Actor=Actor(Cache, 6a952335ea63922a4d01a69609000000)
ray.get(task_get.remote())
# (task_get pid=35731) test
# (task_get pid=35731) Actor=Actor(Cache, ea1944b5f41eb5a146b8c1e309000000)