[Object Store] race condition: Pull Manager will hang in certain timings
What happened + What you expected to happen
- The Object mentioned in the figure is part of a Request. While waiting for the remaining objects of the Request to complete, it is pulled to the local node and evicted many times because Plasma is busy.
- For the Object in the figure, once all copies are deleted, chunk 0 is lost forever, so the object will never be sealed. And since the object has already been created in plasma, the restore will always fail.
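To make the failure mode concrete, here is a toy Python model of that state. Every class and method name below is an illustrative stand-in (this is not Ray's actual plasma API); the only assumption it encodes, taken from the description above, is that a restore cannot recreate an object that already exists in the store in a created-but-unsealed state.

# Toy model of the failure described above. All names are illustrative;
# the real logic lives in Ray's C++ plasma store and object manager.
class ToyPlasmaStore:
    def __init__(self):
        # object_id -> {"chunks": set of received chunk indices, ...}
        self.objects = {}

    def create(self, object_id, num_chunks):
        # The buffer is created when the first chunk of a push arrives.
        self.objects[object_id] = {
            "chunks": set(), "num_chunks": num_chunks, "sealed": False
        }

    def write_chunk(self, object_id, chunk_index):
        entry = self.objects[object_id]
        entry["chunks"].add(chunk_index)
        if len(entry["chunks"]) == entry["num_chunks"]:
            entry["sealed"] = True  # only a complete object can be sealed

    def restore_spilled_object(self, object_id, num_chunks):
        # A restore cannot recreate an object that already exists in the
        # store, even if it is only half received and never sealed.
        if object_id in self.objects:
            raise RuntimeError("restore failed: object already created")
        self.create(object_id, num_chunks)
        for i in range(num_chunks):
            self.write_chunk(object_id, i)


store = ToyPlasmaStore()
store.create("obj", num_chunks=2)  # buffer created when chunk 1 arrived
store.write_chunk("obj", 1)        # chunk 0 is lost forever -> never sealed
try:
    store.restore_spilled_object("obj", num_chunks=2)
except RuntimeError as e:
    print(e)                       # restore keeps failing, so the pull hangs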
Versions / Dependencies
master, commit id: 110cae0ca9e8a4601af7ff9fd41f796117a79620
Reproduction script
The above scenario is very difficult to reproduce, so I simplified it when writing the reproduction code:
- Kill the raylet to make a chunk of this Object lost.
- Control timing by adding some sleep statements.
test.py:
import numpy as np
from time import sleep

import ray
from ray.cluster_utils import Cluster


# Will evict all objects in plasma.
# Need to disable unlimited plasma for this to work.
def evict_all_object(size_MB=None):
    if size_MB is None:
        # 74MB
        size_MB = 74
    BIG_OBJECT = np.zeros((size_MB, 1024 * 1024)).astype(np.uint8)
    ray.put(BIG_OBJECT)


class Actor:
    def __init__(self):
        self.datas = []

    def send_objects(self, refs):
        # Object is a NumPy ndarray, so we just need to get the object,
        # and don't need to worry about objects being evicted from plasma.
        for ref in refs:
            self.datas.append(ray.get(ref))

    def evict_all_object(self):
        self.datas = []
        evict_all_object()


cluster = Cluster()
NODE_NUMBER = 2
cluster_node_config = [{
    "num_cpus": 10,
    "resources": {
        f"node{i+1}": 10
    },
    "object_store_memory": 75 * 1024 * 1024,
} for i in range(NODE_NUMBER)]
cluster_node_config[0]["env_vars"] = {"DRIVER_NODE": "true"}
cluster_node_config[0]["_system_config"] = {
    # Disable unlimited plasma.
    "oom_grace_period_s": 3600,
    # Only one chunk can be in flight at a time.
    "object_manager_max_bytes_in_flight": 5 * 1024 * 1024 + 1,
}

worker_node = None
for index, kwargs in enumerate(cluster_node_config):
    cluster.add_node(**kwargs)
    if index == 0:
        ray.init()

worker_node = list(cluster.worker_nodes)[0]
assert len(cluster.worker_nodes) == 1

# 9MB, 2 chunks
OBJECT = np.zeros((9, 1024 * 1024)).astype(np.uint8)
ref = ray.put(OBJECT)
print("Object:", ref)

actor_node2 = ray.remote(Actor).options(
    resources={"node2": 1}
).remote()
ray.get(actor_node2.send_objects.remote([ref]))

evict_all_object()


@ray.remote(resources={"node1": 1}, max_retries=0)
def get_object_shape(a):
    return a.shape


task_return = get_object_shape.remote(ref)

# Sleep to make sure node2 gets the pull request and sends the push request.
sleep(15)
cluster.remove_node(worker_node)

print("Try to get object shape...")
print("first get OBJECT_A ----------> ", ray.get(task_return))
ray-core modification: https://github.com/ray-project/ray/pull/31389
result:
- will hang on ray.get(task_return)
- the restore will fail indefinitely:
proposal:
- Proposal 1: abort the created object before calling restore_spilled_object_ in PullManager.
  - pros: easy to implement.
  - cons: wastes IO.
- Proposal 2: in the ObjectManager, implement a RestoreFromFilesystem (similar to PushFromFilesystem) to replace the Restore Spilled Object path, so that existing chunks do not need to be read again; using the thread pool in the process will also be more efficient than the previous Restore process (see the sketch after this list).
  - pros: more efficient.
  - cons: more difficult to implement.
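As a rough illustration only of proposal 2 (all names below are hypothetical stand-ins, the real change would live in the C++ ObjectManager, and the spilled-file layout is simplified away), restoring just the missing chunks from the spilled file with a thread pool could look roughly like this:

# Hypothetical sketch of the RestoreFromFilesystem idea: read only the
# chunks that never arrived, straight from the spilled file, using a
# thread pool. write_chunk and seal are stand-in callbacks for the
# plasma-store side; this is not Ray's actual API.
from concurrent.futures import ThreadPoolExecutor


def restore_missing_chunks(spilled_path, object_id, chunk_size,
                           missing_chunks, write_chunk, seal,
                           num_io_threads=4):
    """Restore only the chunks of `object_id` that were never received."""

    def restore_one(chunk_index):
        offset = chunk_index * chunk_size
        # Read just this chunk's byte range from the spilled file
        # (ignoring, for simplicity, any header in the real file layout).
        with open(spilled_path, "rb") as f:
            f.seek(offset)
            data = f.read(chunk_size)
        write_chunk(object_id, chunk_index, data)

    # Reuse a thread pool for the reads, as the proposal suggests, instead
    # of re-running the whole restore-spilled-object path.
    with ThreadPoolExecutor(max_workers=num_io_threads) as pool:
        list(pool.map(restore_one, missing_chunks))
    seal(object_id)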
Issue Severity
None
Thanks for the detailed explanation.
For my understanding, it looks like a bug that we are still accepting chunks after step 9, since at this point we should know the object is evicted and we are not actively pulling it? (Does this imply proposal 1 as the solution?)
cc @rkooo567
In this case, although the object has been evicted, it may still be part of another active pull request, so it seems reasonable to accept chunks of the object.
Or maybe, after rejecting any chunk, we should reject the other chunks sent by the same node in the same batch.
Proposal 1 is very simple:
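A rough Python-flavored illustration of the idea only (all names here are hypothetical stand-ins for the C++ code paths in PullManager and the plasma store; this is not the actual patch):

# Hypothetical illustration of proposal 1: before triggering the restore,
# abort the partially created, unsealed copy of the object so the restore
# can recreate it. plasma.contains / is_sealed / abort and
# restore_spilled_object are stand-ins for the real C++ code paths.
def pull_spilled_object(object_id, plasma, restore_spilled_object):
    if plasma.contains(object_id) and not plasma.is_sealed(object_id):
        # The missing chunk may never arrive, because every in-memory copy
        # has already been evicted; drop the half-received buffer.
        plasma.abort(object_id)
    restore_spilled_object(object_id)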
hmm i see. yeah let's implement proposal 1 given how simple it is?