
Fix for incorrect channel read behavior after accelerated DAG teardown

Open: jackhumphries opened this pull request 1 year ago • 6 comments

Why are these changes needed?

Prior to this PR (described in #46284), calling ray.get() on a CompiledDAGRef (i.e., a channel) after DAG teardown would return a large series of zeroes. This issue could be reproduced with this script:

import ray
from ray.dag import InputNode

@ray.remote
class Actor:
    def foo(self, arg):
        return arg

a = Actor.remote()
with InputNode() as inp:
    dag = a.foo.bind(inp)

# Compile the DAG, execute it once, then tear it down before reading the result.
dag = dag.experimental_compile()
x = dag.execute(1)
dag.teardown()
# `ray.get(x)` returns a large series of zeroes.
print(ray.get(x))

This issue happened because the channel was unregistered with the mutable object manager on DAG teardown, so on a subsequent access to the channel, the core worker thought the channel reference was for a normal immutable Ray object rather than for a channel mutable object. Thus, the core worker was returning the raw underlying memory for the mutable object, and the returned buffers were sized to the full underlying allocation rather than to the amount of data actually stored in the mutable object, which is why the result appeared as a large series of zeroes.
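
A toy model of this failure mode may help. The sketch below is a hypothetical simplification, not Ray's actual memory layout: it assumes a mutable object is a fixed-size, zero-filled allocation plus a separately tracked `data_size`, and it contrasts a channel-aware read (which honors `data_size`) with a raw read of the whole allocation, which is roughly what the core worker did after teardown. `ToyMutableObject` and its methods are illustrative names only.

```python
class ToyMutableObject:
    """Hypothetical stand-in for a channel's backing buffer (not Ray's real layout)."""

    def __init__(self, capacity: int) -> None:
        # The allocation is sized for the largest value the channel may hold
        # and starts out zero-filled.
        self.buffer = bytearray(capacity)
        self.data_size = 0

    def write(self, payload: bytes) -> None:
        if len(payload) > len(self.buffer):
            raise ValueError("payload exceeds channel capacity")
        self.buffer[: len(payload)] = payload
        self.data_size = len(payload)

    def channel_read(self) -> bytes:
        # Channel-aware path: return only the bytes that were actually written.
        return bytes(self.buffer[: self.data_size])

    def raw_read(self) -> bytes:
        # Path taken when the object is mistaken for an ordinary immutable
        # object: the whole allocation comes back, unused tail included.
        return bytes(self.buffer)


obj = ToyMutableObject(capacity=16)
obj.write(b"\x01")
print(obj.channel_read())  # b'\x01'
print(obj.raw_read())      # b'\x01' followed by 15 zero bytes
```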

This PR fixes this issue by checking that a channel is either currently registered or was previously registered, rather than checking only whether it is currently registered.
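
The shape of the fix can be sketched as follows. The registry class, its methods, and `route_get` are hypothetical illustrations, not the mutable object manager's real API. The point is that the routing decision made by `ray.get()` (loosely, the `GetExperimentalMutableObjects` vs. `GetObjects` distinction raised in the review) must survive teardown, so the registry also remembers channels that were registered in the past.

```python
class ToyChannelRegistry:
    """Hypothetical model of per-object channel registration state."""

    def __init__(self) -> None:
        self._currently_registered: set[str] = set()
        self._ever_registered: set[str] = set()

    def register_reader(self, object_id: str) -> None:
        self._currently_registered.add(object_id)
        self._ever_registered.add(object_id)

    def teardown(self, object_id: str) -> None:
        # Mirrors the reported behavior: teardown clears the "registered" state.
        self._currently_registered.discard(object_id)

    def is_channel_before_fix(self, object_id: str) -> bool:
        # Old check: only channels that are registered right now count.
        return object_id in self._currently_registered

    def is_channel_after_fix(self, object_id: str) -> bool:
        # New check: currently registered OR previously registered.
        return (object_id in self._currently_registered
                or object_id in self._ever_registered)


def route_get(registry: ToyChannelRegistry, object_id: str, fixed: bool) -> str:
    """Return which (hypothetical) read path a get() would take."""
    if fixed:
        is_channel = registry.is_channel_after_fix(object_id)
    else:
        is_channel = registry.is_channel_before_fix(object_id)
    return "mutable-object read" if is_channel else "immutable-object read"


reg = ToyChannelRegistry()
reg.register_reader("chan-1")
reg.teardown("chan-1")
print(route_get(reg, "chan-1", fixed=False))  # immutable-object read (the bug)
print(route_get(reg, "chan-1", fixed=True))   # mutable-object read
```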

Related issue number

Closes #46284

Checks

  • [ ] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
  • [ ] I've run scripts/format.sh to lint the changes in this PR.
  • [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
    • [ ] I've added any new APIs to the API Reference. For example, if I added a method in Tune, I've added it in doc/source/tune/api/ under the corresponding .rst file.
  • [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
  • Testing Strategy
    • [ ] Unit tests
    • [ ] Release tests
    • [ ] This PR is not tested :(

jackhumphries · Jun 28 '24 08:06

This issue happened because the channel was unregistered with the mutable object manager on DAG teardown

Where does the unregistration happen? Are you referring to SetErrorInternal?

the core worker thought the channel reference was for a normal immutable Ray object rather than for a channel mutable object. Thus, the core worker was returning the raw underlying memory for the mutable object

Are you suggesting that we should use GetExperimentalMutableObjects to read the object for the correct result, but due to the channel being unregistered, GetObjects was used instead?

kevin85421 · Jun 28 '24 08:06

This issue happened because the channel was unregistered with the mutable object manager on DAG teardown

Where does the unregistration happen? Are you referring to SetErrorInternal?

the core worker thought the channel reference was for a normal immutable Ray object rather than for a channel mutable object. Thus, the core worker was returning the raw underlying memory for the mutable object

Are you suggesting that we should use GetExperimentalMutableObjects to read the object for the correct result, but due to the channel being unregistered, GetObjects was used instead?

When I said "unregistered", I meant that SetErrorInternal() was called on the channel, as you said. This method then sets reader_registered to false for the channel.

For the second question, what you said is correct.

jackhumphries · Jun 28 '24 09:06

The PR description makes sense to me, but I have a question:

Based on my observation at #46284 (comment), the actor method can get an error message like (Actor pid=2054169) check_status: Channel closed. False True. This means that:

  • The shared memory channel calls close() and successfully writes the error into has_error. This also implies that the channels are unregistered.
  • However, the actor still gets the error, which means it calls CheckHasError, which seems to be called only in ReadAcquire. If the channel is unregistered, it should not call ReadAcquire. There might be some delay between setting has_error and unregistration; however, the actor always prints the log.

I think what's happening is that the actor is calling WriteAcquire() on the same channel, and WriteAcquire() does not call WriterChannelRegistered() to check whether the channel is registered. CoreWorker::Get(), by contrast, does check ReaderChannelRegistered(), which is why the behavior differs.
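
To make the asymmetry concrete, here is a hypothetical toy model; the class and function names are illustrative, not Ray's C++ API. It assumes, per the discussion, that closing a channel sets an error flag and clears the registration flags, that the get path consults reader registration before anything else, and that the write-acquire path observes the error flag without checking writer registration.

```python
class ChannelClosedError(Exception):
    pass


class ToyChannel:
    def __init__(self) -> None:
        self.reader_registered = True
        self.writer_registered = True
        self.has_error = False

    def set_error(self) -> None:
        # Loosely models SetErrorInternal(): flag the error and unregister.
        self.has_error = True
        self.reader_registered = False
        self.writer_registered = False

    def check_has_error(self) -> None:
        if self.has_error:
            raise ChannelClosedError("Channel closed.")


def get_path(ch: ToyChannel) -> str:
    # Get consults reader registration first, so once the channel is closed
    # (and unregistered) this path never reaches the error check.
    if not ch.reader_registered:
        return "treated as ordinary object"
    ch.check_has_error()
    return "channel read"


def write_acquire(ch: ToyChannel) -> str:
    # No writer-registration check: the error flag is still observed.
    ch.check_has_error()
    return "write acquired"


ch = ToyChannel()
ch.set_error()
print(get_path(ch))  # treated as ordinary object
try:
    write_acquire(ch)
except ChannelClosedError as err:
    print(err)  # Channel closed.
```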

After this PR is merged, can you add a check for WriterChannelRegistered()? I think this could be a good way to get more familiar with the C++ codebase. Thanks!
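
One possible shape for that follow-up, again as a hypothetical Python model rather than the actual C++ change: write-acquire first consults a writer-registration flag, mirroring how the get path consults reader registration. What an unregistered writer should observe is an open question here; raising a distinct `ChannelNotRegisteredError` (a name invented for this sketch) is just one option.

```python
class ChannelNotRegisteredError(Exception):
    pass


class ChannelClosedError(Exception):
    pass


class ToyWriterChannel:
    def __init__(self) -> None:
        self.writer_registered = False
        self.has_error = False

    def register_writer(self) -> None:
        self.writer_registered = True

    def close(self) -> None:
        # Loosely models SetErrorInternal(): flag the error and unregister.
        self.has_error = True
        self.writer_registered = False

    def write_acquire(self) -> None:
        # Proposed extra check (hypothetical): refuse to proceed on a channel
        # whose writer is not registered, before looking at the error flag.
        if not self.writer_registered:
            raise ChannelNotRegisteredError("writer is not registered")
        if self.has_error:
            raise ChannelClosedError("Channel closed.")
```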

jackhumphries · Jun 28 '24 18:06

After this PR is merged, can you add a check for WriterChannelRegistered()? I think this could be a good way to get more familiar with the C++ codebase. Thanks!

Sounds good. Thanks!

kevin85421 · Jun 28 '24 21:06

Should we just not reset reader_registered and writer_registered to false for now? That seems like a simpler fix.
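
For comparison, this simpler alternative can be modeled as leaving the registration flag intact on close, so the existing "currently registered" check keeps routing reads to the channel path. This is a hypothetical toy model (`ToyChannelState` and `reset_flags_on_close` are invented names), not either actual patch.

```python
class ToyChannelState:
    def __init__(self, reset_flags_on_close: bool) -> None:
        self.reset_flags_on_close = reset_flags_on_close
        self.reader_registered = True
        self.has_error = False

    def close(self) -> None:
        self.has_error = True
        if self.reset_flags_on_close:
            # Behavior described in the PR: closing also unregisters the reader.
            self.reader_registered = False

    def get_uses_channel_path(self) -> bool:
        # The pre-existing check: only the current registration flag counts.
        return self.reader_registered


resetting = ToyChannelState(reset_flags_on_close=True)
resetting.close()
keeping = ToyChannelState(reset_flags_on_close=False)
keeping.close()
print(resetting.get_uses_channel_path())  # False
print(keeping.get_uses_channel_path())    # True
```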

stephanie-wang · Jun 28 '24 22:06

Should we just not reset reader_registered and writer_registered to false for now? That seems like a simpler fix.

I'd prefer to keep this as is, if that's alright. I expect the channel garbage collection work will need what this PR implements, so if we don't keep it now, I would likely have to add it back then.

jackhumphries · Jun 28 '24 22:06