Use detached lifetime for stats actor
Why are these changes needed?
The actor handle held at the Ray client becomes dangling if the Ray cluster is shut down, and in that case, if the user tries to get the actor again, it results in a crash. This happened to a real user and blocked them from making progress.
This change makes the stats actor detached and, instead of keeping a handle, accesses it via its name. This way we make sure the actor is re-created if the cluster gets restarted.
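Roughly, the access pattern becomes something like the sketch below. This is an illustrative sketch only: the actor class, actor name, and record() method are placeholders, not the exact code in python/ray/data/impl/stats.py (the real helper there is _get_or_create_stats_actor).

import ray

@ray.remote(num_cpus=0)
class _StatsActor:
    # Placeholder stats actor for illustration.
    def __init__(self):
        self._stats = {}

    def record(self, dataset_uuid, stats):
        self._stats[dataset_uuid] = stats

def _get_or_create_stats_actor():
    # Look up the detached actor by name and create it if it doesn't
    # exist yet (e.g. after the cluster was restarted), instead of
    # holding a long-lived handle at the Ray client.
    return _StatsActor.options(
        name="datasets_stats_actor",
        lifetime="detached",
        get_if_exists=True,
    ).remote()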
Related issue number
Closes #25237
Checks
- [ ] I've run `scripts/format.sh` to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
- [ ] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(
@ericl @jianoaix So we can assume that the get_if_exists=True path is ~as cheap as the global actor handle cache since the core worker should cache actor handle fetches, right? It looks as if that path should only involve an RPC on the first fetch for a given worker process.
Hmm that's a good question, we should validate this for sure prior to merging.
Compared to the data loading of a read task, one RPC seems like a small cost?
Do we have a test to measure the impact of this?
Simple enough to run a trivial dataset workload with small blocks before/after this PR. Maybe something like a 10000-block range + map_batches?
Tried a simple test like this:
import time
import ray

total_time = 0
for _ in range(16):
    start_time = time.time()
    ds = ray.data.range(100000, parallelism=10000)
    ds.map_batches(lambda x: x)
    total_time += time.time() - start_time  # accumulate across the 16 runs
print("mean time:", total_time / 16)
On a local cluster with 8 nodes and 1 cpu/node:
from ray.cluster_utils import Cluster

def build_cluster(num_nodes, num_cpus):
    cluster = Cluster()
    for _ in range(num_nodes):
        cluster.add_node(num_cpus=num_cpus)
    cluster.wait_for_nodes()
    return cluster

cluster = build_cluster(8, 1)
With this PR: mean time: 2.742537647485733
Without this PR: mean time: 2.706667184829712
The difference is 1.33%, which is small given that the blocks are very small (just 10 ints each). But the nodes are all on a laptop, so the RPC might be cheaper than on a real cluster.
Also, a small nit: this docstring comment on the actor scope is no longer accurate: https://github.com/ray-project/ray/blob/3c9bd6648555c142fd70de6b2c0808f887f08fd5/python/ray/data/impl/stats.py#L85
Microbenchmark:
import time
import ray.data.impl.stats

start_time = time.time()
for _ in range(1000):
    ah = ray.data.impl.stats._get_or_create_stats_actor()
print("mean time to get:", (time.time() - start_time) / 1000)
Before: 1.4783143997192383e-05 (sec)
After: 0.0005355322360992432 (sec)
Diff: ~36x increase
There are some broken tests. Are there any blocking concerns that would require revisiting the design? The microbenchmark result seems expected/fine.
No blocking concerns. Yeah, the absolute difference is small enough that it shouldn't affect overall performance. The failing tests were on https://flakey-tests.ray.io/; I'm going to re-run them since they have turned green in recent runs on the flaky dashboard.
This pull request has been automatically marked as stale because it has not had recent activity. It will be closed in 14 days if no further activity occurs. Thank you for your contributions.
- If you'd like to keep this open, just leave any comment, and the stale label will be removed.
@jianoaix what's the status of this?
This was to fix an issue for a major user. It was a blocker for them, but they were then satisfied with a hack, so this got deprioritized. That user is now planning to upgrade to 1.13, so this will be needed to remove the hack. I'll revive this PR.
Slightly updated to make the call to get the actor once per Dataset, rather than once per task.
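For illustration, the once-per-Dataset pattern looks roughly like the sketch below; the class and method names here are hypothetical, not the ones in the actual diff.

from ray.data.impl.stats import _get_or_create_stats_actor

class _PerDatasetStatsRecorder:
    # Hypothetical sketch: resolve the stats actor once when the Dataset's
    # stats object is created, then reuse that handle for every task,
    # instead of calling _get_or_create_stats_actor() inside each task.
    def __init__(self):
        self._stats_actor = _get_or_create_stats_actor()

    def record_task(self, stage_name, task_idx, task_stats):
        # Assumes a record()-style method on the stats actor; all tasks
        # of this Dataset reuse the same cached handle.
        self._stats_actor.record.remote(stage_name, task_idx, task_stats)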
I was looking at the test failures, but it turned out they were already on the flaky test list. So the PR is ready to review/merge.
@ericl @clarkzinzow ptal, thanks
Not sure it'll work: DatasetStats is a Python/local object that can have multiple instances in the cluster, so we can't clean up the entry in the actor on each destruction.
@jianoaix Ah good point, yeah I forgot that stats can be sent around to other tasks. It doesn't seem like there's a good garbage collection point at the moment. 🤔
Do we need to cherry-pick this to the 2.0.0 release branch?
We haven't heard of any issues other than from the user who originally reported this (holding a Ray client and shutting down/restarting the cluster for multiple trials), so we probably do not need to pick it.
Synced to head and CI passed, @clarkzinzow
What if we just have a FIFO queue of stats? Like the most recent 10000 Dataset stats, which should suffice for almost everyone while safeguarding against any worst-case OOMs, e.g. if you're running datasets in a while loop or something.
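As a sketch of that idea (the class and method names here are hypothetical), the actor could keep an ordered map capped at N entries and evict the oldest on overflow:

import collections

import ray

@ray.remote(num_cpus=0)
class _BoundedStatsActor:
    # Hypothetical sketch of the FIFO idea: keep at most max_entries
    # Dataset stats and drop the oldest ones once the cap is hit.
    def __init__(self, max_entries=10000):
        self._max_entries = max_entries
        self._stats = collections.OrderedDict()

    def record(self, dataset_uuid, stats):
        self._stats[dataset_uuid] = stats
        self._stats.move_to_end(dataset_uuid)
        while len(self._stats) > self._max_entries:
            # FIFO eviction: remove the oldest entry.
            self._stats.popitem(last=False)

    def get(self, dataset_uuid):
        return self._stats.get(dataset_uuid)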