ray
Fix for incorrect channel read behavior after accelerated DAG teardown
Why are these changes needed?
Prior to this PR (described in #46284), calling ray.get() on a CompiledDAGRef (i.e., a channel) after DAG teardown would return a large series of zeroes. This issue could be reproduced with this script:
import ray
from ray.dag import InputNode


@ray.remote
class Actor:
    def foo(self, arg):
        return arg


a = Actor.remote()
with InputNode() as inp:
    dag = a.foo.bind(inp)

dag = dag.experimental_compile()
x = dag.execute(1)
dag.teardown()
# `ray.get(x)` returns a large series of zeroes.
print(ray.get(x))
This issue happened because the channel was unregistered with the mutable object manager on DAG teardown, and thus on a subsequent access to the channel, the core worker thought the channel reference was for a normal immutable Ray object rather than for a channel mutable object. Thus, the core worker was returning the raw underlying memory for the mutable object, and the memory buffers were sized equal to the total size of the underlying memory, not the amount of data in the mutable object.
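To illustrate why the raw read shows up as a run of zeroes, here is a minimal toy model in plain Python. It does not use Ray's actual memory layout: the length-prefixed header, `BUFFER_SIZE`, and all function names are assumptions made up for this sketch. It only shows the difference between reading "the amount of data in the mutable object" and reading "the total size of the underlying memory".

```python
import struct

# Hypothetical layout for this sketch only: an 8-byte length header
# followed by the payload, inside a fixed-size zero-initialized buffer.
HEADER = struct.Struct("<Q")
BUFFER_SIZE = 64


def write_payload(buf: bytearray, payload: bytes) -> None:
    """Record the payload length in the header, then copy the payload in."""
    if HEADER.size + len(payload) > len(buf):
        raise ValueError("payload does not fit in the buffer")
    HEADER.pack_into(buf, 0, len(payload))
    buf[HEADER.size:HEADER.size + len(payload)] = payload


def read_as_mutable_object(buf: bytearray) -> bytes:
    """Channel-aware read: return only the bytes the header says are valid."""
    (length,) = HEADER.unpack_from(buf, 0)
    return bytes(buf[HEADER.size:HEADER.size + length])


def read_as_immutable_object(buf: bytearray) -> bytes:
    """Channel-unaware read: return the whole allocation, padding included."""
    return bytes(buf)


buf = bytearray(BUFFER_SIZE)
write_payload(buf, b"\x01")

print(read_as_mutable_object(buf))    # just the payload
print(read_as_immutable_object(buf))  # header, payload, and zero padding
```

In this model the channel-unaware read returns the full allocation, so most of what comes back is unused, zero-filled memory.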
This PR fixes this issue by checking that a channel is either currently registered or was previously registered, rather than checking only that it is currently registered.
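The "currently or previously registered" check can be sketched as follows. This is a simplified Python model, not the core worker's C++ code: `ChannelRegistry`, `route_get`, and the returned labels are hypothetical names, and what a read of a torn-down channel should ultimately do (here, report a closed-channel error) is an assumption of the sketch.

```python
class ChannelRegistry:
    """Toy stand-in for the bookkeeping described above (hypothetical)."""

    def __init__(self) -> None:
        self._registered: set[str] = set()
        self._previously_registered: set[str] = set()

    def register(self, object_id: str) -> None:
        self._registered.add(object_id)

    def unregister(self, object_id: str) -> None:
        # Remember the ID so later reads still know it was a channel.
        if object_id in self._registered:
            self._registered.remove(object_id)
            self._previously_registered.add(object_id)

    def is_registered(self, object_id: str) -> bool:
        return object_id in self._registered

    def was_ever_registered(self, object_id: str) -> bool:
        return (
            object_id in self._registered
            or object_id in self._previously_registered
        )


def route_get(registry: ChannelRegistry, object_id: str) -> str:
    """Decide which read path an object ID should take."""
    if registry.is_registered(object_id):
        return "mutable-object read"
    if registry.was_ever_registered(object_id):
        # Before the fix, this case fell through to the immutable path.
        return "closed-channel error"
    return "immutable-object read"


registry = ChannelRegistry()
registry.register("channel-1")
print(route_get(registry, "channel-1"))
registry.unregister("channel-1")  # models DAG teardown
print(route_get(registry, "channel-1"))
print(route_get(registry, "plain-object"))
```

The point of the second set is that teardown no longer erases the fact that an ID once referred to a channel, so a later read is never mistaken for a read of a normal immutable object.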
Related issue number
Closes #46284
Checks
- [ ] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- [ ] I've run scripts/format.sh to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
  - [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
  - [ ] Unit tests
  - [ ] Release tests
  - [ ] This PR is not tested :(
This issue happened because the channel was unregistered with the mutable object manager on DAG teardown
Where does unregister happen? Are you referring to SetErrorInternal?
the core worker thought the channel reference was for a normal immutable Ray object rather than for a channel mutable object. Thus, the core worker was returning the raw underlying memory for the mutable object
Are you suggesting that we should use GetExperimentalMutableObjects to read the object for the correct result, but due to the channel being unregistered, GetObjects was used instead?
When I said "unregistered", I meant that SetErrorInternal() was called on the channel, as you said. This method then sets reader_registered to false for the channel.
For the second question, what you said is correct.
The PR description makes sense to me, but I have a question:
Based on my observation at #46284 (comment), the actor method can get an error message like
(Actor pid=2054169) check_status: Channel closed. False True.
This means that:
- The shared memory channel calls close() and writes the error into has_error successfully. This also implies that the channels are unregistered.
- However, the actor can still get the error, which means it calls CheckHasError, which seems to be called only in ReadAcquire. If the channel is unregistered, it should not call ReadAcquire. There might be some time difference between setting has_error and unregistration. However, the actor always prints the log.
I think what's happening is the actor is calling WriteAcquire() on the same channel, which does not call WriterChannelRegistered() to check if the channel is registered. Currently, CoreWorker::Get() does check ReaderChannelRegistered(), which is why the behavior is different.
After this PR is merged, can you add a check for WriterChannelRegistered()? I think this could be a good way to get more familiar with the C++ codebase. Thanks!
Sounds good. Thanks!
Should we just not reset reader_registered and writer_registered to false for now? That seems like a simpler fix.
I'd prefer to keep this as is, if that's alright. When I do the channel garbage collection work, I would imagine I'm going to need to implement what is currently in this PR if we don't keep it.