ray
[ADAG] Support tasks with multiple return values in aDAG
Why are these changes needed?
aDAG currently does not support multiple return values. We would like to add general support for multiple return values.
This PR supports multiple return values by returning a separate ClassMethodNode for each element of the returned tuple. It is an incremental change to ClassMethodNode that adds _is_class_method_output, _class_method_call, and _output_idx. _output_idx guides channel allocation and output writes. Users must specify num_returns > 1 to indicate that a method returns multiple values. The upstream task allocates a separate output channel for each return value, and each downstream task reads from one of these output channels.
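A minimal pure-Python sketch of this design, under simplifying assumptions: binding a method with num_returns > 1 creates one call node plus one output node per tuple element, and each output node records its upstream call (_class_method_call) and its _output_idx. The names SketchClassMethodNode and bind_method are hypothetical; the sketch does not use Ray and is not Ray's actual ClassMethodNode.

```python
from typing import Any, Callable, Optional, Tuple


class SketchClassMethodNode:
    """Hypothetical stand-in for ClassMethodNode playing one of two roles."""

    def __init__(
        self,
        method: Callable[..., Any],
        args: Tuple[Any, ...],
        class_method_call: Optional["SketchClassMethodNode"] = None,
        output_idx: Optional[int] = None,
    ):
        self.method = method
        self.args = args
        # Set only on output nodes: the upstream call node this output belongs to.
        self._class_method_call = class_method_call
        # Index into the returned tuple; None for the call node itself.
        self._output_idx = output_idx

    @property
    def is_class_method_call(self) -> bool:
        return self._class_method_call is None

    @property
    def is_class_method_output(self) -> bool:
        return self._class_method_call is not None


def bind_method(method: Callable[..., Any], *args: Any, num_returns: int = 1):
    """Return the call node, or one output node per return value if num_returns > 1."""
    call = SketchClassMethodNode(method, args)
    if num_returns == 1:
        return call
    # All output nodes share the same upstream call node.
    return tuple(
        SketchClassMethodNode(method, args, class_method_call=call, output_idx=i)
        for i in range(num_returns)
    )
```

Because every output node points at the same call node, the method is represented once in the graph even though downstream tasks can consume its return values independently.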
What was changed?
We modify ClassMethodNode to play two roles: a class method call, which keeps the original semantics (self.is_class_method_call == True), and a class method output, which is responsible for one of the multiple return values (self.is_class_method_output == True).
We modify WriterInterface to support writing to multiple output_channels, each paired with an entry in output_idxs. If an output index is None, the complete return value is written to the output channel. Otherwise, the return value is a tuple, and the index selects the element written to that channel.
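The None-versus-index rule can be sketched with an in-memory channel. ListChannel and MultiChannelWriter are hypothetical names for illustration; they only mimic the described behavior of WriterInterface and are not Ray classes.

```python
from typing import Any, List, Optional, Sequence


class ListChannel:
    """Hypothetical in-memory channel that records every value written to it."""

    def __init__(self) -> None:
        self.values: List[Any] = []

    def write(self, value: Any) -> None:
        self.values.append(value)


class MultiChannelWriter:
    """Sketch of a writer fanning one return value out to several channels."""

    def __init__(
        self,
        output_channels: Sequence[ListChannel],
        output_idxs: Sequence[Optional[int]],
    ) -> None:
        if len(output_channels) != len(output_idxs):
            raise ValueError("need exactly one output index per output channel")
        self._output_channels = list(output_channels)
        self._output_idxs = list(output_idxs)

    def write(self, value: Any) -> None:
        for channel, idx in zip(self._output_channels, self._output_idxs):
            if idx is None:
                # Write the complete return value.
                channel.write(value)
            else:
                # The return value is a tuple; write only its idx-th element.
                channel.write(value[idx])


whole, first, second = ListChannel(), ListChannel(), ListChannel()
writer = MultiChannelWriter([whole, first, second], [None, 0, 1])
writer.write(("a", "b"))
```

After the write, whole holds the full tuple while first and second each hold one element.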
We allocate separate output channels for different readers. The downstream tasks of a ClassMethodNode with self.is_class_method_output == True are the readers of one output channel of its upstream ClassMethodNode.
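One way to picture the allocation is to group readers by the (upstream call, output index) they consume, giving one channel per return value that is shared by every reader of that value, as the description's "a separate output channel for each return value" suggests. The edge format and function name below are hypothetical.

```python
from collections import defaultdict
from typing import Dict, List, Optional, Tuple

# Hypothetical graph description: (reader_task, upstream_call, output_idx),
# where output_idx None means the reader consumes the whole return value.
ReaderEdge = Tuple[str, str, Optional[int]]
ChannelKey = Tuple[str, Optional[int]]


def allocate_output_channels(edges: List[ReaderEdge]) -> Dict[ChannelKey, List[str]]:
    """Group readers so that each (upstream call, output index) pair gets one channel."""
    channels: Dict[ChannelKey, List[str]] = defaultdict(list)
    for reader, upstream, output_idx in edges:
        # Readers of the same output index share a channel; different
        # indices of the same call get separate channels.
        channels[(upstream, output_idx)].append(reader)
    return dict(channels)


allocation = allocate_output_channels(
    [
        ("b.echo", "a.return_two", 0),
        ("c.echo", "a.return_two", 1),
        ("d.echo", "a.return_two", 1),
    ]
)
```

Here a.return_two ends up with two channels: one read by b.echo and one read by both c.echo and d.echo.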
Related issue number
Closes #45569
Checks
- [ ] I've signed off every commit (by using the -s flag, i.e., git commit -s) in this PR.
- [ ] I've run scripts/format.sh to lint the changes in this PR.
- [ ] I've included any doc changes needed for https://docs.ray.io/en/master/.
- [ ] I've added any new APIs to the API Reference. For example, if I added a
method in Tune, I've added it in
doc/source/tune/api/ under the corresponding .rst file.
- [ ] I've made sure the tests are passing. Note that there might be a few flaky tests, see the recent failures at https://flakey-tests.ray.io/
- Testing Strategy
- [ ] Unit tests
- [ ] Release tests
- [ ] This PR is not tested :(
It is ready for a new review now. Thanks!
QQ: This is a non-blocking comment, but it looks like ClassMethodNode now has two roles (it can be either an output or a call) distinguished by if/else, which is pretty ugly. Why don't we just build another node class (ClassMethodOutputNode), similar to OutputNode, and connect ClassMethodNode and ClassMethodOutputNode using an intra-process channel?
@rkooo567 Are you asking why we don't build another node class? In the current version, we do not build a new class ClassMethodOutputNode and instead implement it inside ClassMethodNode.
@stephanie-wang @ruisearch42 Inviting others to join the discussion on whether to separate ClassMethodNode and ClassMethodOutputNode.
Personally, I prefer separating ClassMethodNode and ClassMethodOutputNode. Optionally, when num_returns == 1, we could return a single ClassMethodOutputNode in a tuple, so that the return type is consistent for any num_returns.
Previously, Stephanie suggested combining both in ClassMethodNode. The advantages are that it is simpler and good enough for the current requirements, so that is what the current version does.
Yeah. At least from reading the code right now, ClassMethodNode is actually both ClassMethodNode AND ClassMethodOutputNode, distinguished by an internal flag. So a DAG with one actor task that returns two outputs looks like:
ClassMethodNode (method)
----> ClassMethodNode (output)
----> ClassMethodNode (output)
I think it should be either
- the DAG should contain only one ClassMethodNode
- OR we should separate classMethodNode and ClassMethodOutputNode
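For comparison, a hypothetical sketch of the second option (separate node classes), including the earlier suggestion to return a tuple even when num_returns == 1. The class names are illustrative and do not come from Ray; the intra-process channel between the two nodes is omitted, and select simply indexes into the upstream result.

```python
from typing import Any, Callable, Tuple


class ClassMethodNodeSketch:
    """Hypothetical call node: only represents the actor method invocation."""

    def __init__(
        self, method: Callable[..., Any], args: Tuple[Any, ...], num_returns: int = 1
    ) -> None:
        self.method = method
        self.args = args
        self.num_returns = num_returns

    def outputs(self) -> Tuple["ClassMethodOutputNodeSketch", ...]:
        # One dedicated output node per return value; returning a tuple even
        # for num_returns == 1 keeps the return type consistent.
        return tuple(
            ClassMethodOutputNodeSketch(self, i) for i in range(self.num_returns)
        )


class ClassMethodOutputNodeSketch:
    """Hypothetical output node: selects one element of its upstream call's result."""

    def __init__(self, upstream: ClassMethodNodeSketch, output_idx: int) -> None:
        self.upstream = upstream
        self.output_idx = output_idx

    def select(self, result: Any) -> Any:
        # With a single return value, the whole result is the only output.
        if self.upstream.num_returns == 1:
            return result
        return result[self.output_idx]
```

In this layout neither class needs a role flag: the call node never holds an output index, and the output node never invokes the method.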
Any thoughts @stephanie-wang @kevin85421
I chatted with @dengwxn last Friday. To summarize our chat:
There are three levels of write in ADAG:
- (Level 1) Executable._write: Read COMPUTE's output value from the intermediate buffer, and then pass the output value to SynchronousWriter.
- (Level 2) (WriterInterface) SynchronousWriter.write / AwaitableBackgroundWriter: Call ChannelInterface.write.
- (Level 3) ChannelInterface.write: Write data to shared memory, an intra-process buffer, or NCCL.
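The layering can be sketched as three small classes, one per level. They are hypothetical simplifications of Executable, SynchronousWriter, and ChannelInterface: the channel is an in-process list instead of shared memory or NCCL, and nothing here uses Ray.

```python
from typing import Any, Callable, List


class SketchChannel:
    """Level 3 stand-in: stores data in an in-process list."""

    def __init__(self) -> None:
        self.buffer: List[Any] = []

    def write(self, value: Any) -> None:
        self.buffer.append(value)


class SketchSynchronousWriter:
    """Level 2 stand-in: forwards a value to each of its channels."""

    def __init__(self, channels: List[SketchChannel]) -> None:
        self.channels = channels

    def write(self, value: Any) -> None:
        for channel in self.channels:
            channel.write(value)


class SketchExecutable:
    """Level 1 stand-in: reads COMPUTE's output from a buffer and hands it on."""

    def __init__(self, writer: SketchSynchronousWriter) -> None:
        self.writer = writer
        self.intermediate_buffer: List[Any] = []

    def compute(self, fn: Callable[..., Any], *args: Any) -> None:
        # COMPUTE step: store the result in the intermediate buffer.
        self.intermediate_buffer.append(fn(*args))

    def _write(self) -> None:
        # WRITE step: take the oldest output and pass it to the writer.
        output_value = self.intermediate_buffer.pop(0)
        self.writer.write(output_value)


channel = SketchChannel()
executable = SketchExecutable(SketchSynchronousWriter([channel]))
executable.compute(lambda x: x + 1, 41)
executable._write()
```

Multiple-return support slots in at level 2: only the writer would need to know which part of the output each channel receives.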
This PR currently implements the change at level 2. Hence, the semantics will be:
(1) ChannelInterface: Write the "same" data to multiple readers via one or multiple channels.
(2) WriterInterface: Write different parts of the COMPUTE output to different readers via multiple channels (ChannelInterface).
That is, both ChannelInterface and WriterInterface will have multiple channels (i.e., ChannelInterface instances). For (1), @dengwxn mentions that Rust has similar semantics (link1, link2).
Is (1) the semantics that we want to provide? If so, I think the current implementation makes sense to me.
[Update]: Checked with the team. (1) is the semantics that we want to provide.
yeah (1) makes sense.
My current thoughts:
- Channel connects one writer -> multiple readers for "the same value"
- we already follow this abstraction.
- If we want to support multiple objects, the existing abstraction needs improvement
- That said, each input/output should contain one channel: InputAttributeNode and ClassMethodOutputNode should each have one channel.
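A minimal sketch of the "one writer, multiple readers of the same value" abstraction, assuming each output gets its own channel. BroadcastChannel is a hypothetical name; it keeps an unbounded log with a cursor per reader, whereas a real aDAG channel uses a fixed buffer.

```python
from typing import Any, Dict, List


class BroadcastChannel:
    """Hypothetical one-writer, multi-reader channel: every reader sees the same value."""

    def __init__(self, reader_ids: List[str]) -> None:
        self._values: List[Any] = []
        # Per-reader cursor into the shared log of written values.
        self._cursors: Dict[str, int] = {r: 0 for r in reader_ids}

    def write(self, value: Any) -> None:
        self._values.append(value)

    def read(self, reader_id: str) -> Any:
        pos = self._cursors[reader_id]
        if pos >= len(self._values):
            raise RuntimeError(f"no new value for reader {reader_id!r}")
        self._cursors[reader_id] = pos + 1
        return self._values[pos]


# One channel per output: output 0 has one reader, output 1 has two.
ch0 = BroadcastChannel(["b"])
ch1 = BroadcastChannel(["c", "d"])
result = ("x", "y")
ch0.write(result[0])
ch1.write(result[1])
```

Each channel still carries a single value per write; supporting multiple returns means creating more channels, not changing what one channel means.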
Can you also add a test for non-compiled DAG execution. Just one test is okay, but we should check that it correctly executes the actor method once, even if it is used by multiple downstream tasks that read different output idxs.
Already done. In some tests, we run normal Ray execution first. For example, in test_two_returns_two_readers.
BTW, there are failures in test_two_from_three_returns.
Ah, I see! But we should probably move the non-compiled testing to dag/tests/test_class_dag.py. The reasoning is that we want to put tests for aDAG functionality into test_accelerated_dag.py, and tests for non-accelerated DAG functionality in dag/tests; that way if only one of the tests fails, we know which functionality has the bug.
Also, please correct me if I'm wrong, but it didn't look like the tests for the non-compiled DAG check that the method is executed only once. Since return_two is idempotent, we can't tell from the DAG output how many times the task executed.
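The idea behind such a test can be sketched without Ray: give the actor a call counter, because the outputs of an idempotent method cannot reveal how many times it ran. The executor below is a hypothetical stand-in that caches each call node's result within one execution, which is the behavior the test should verify; it is not Ray's DAG executor. In a real Ray test the counter would live on the actor and be read back with a separate actor method.

```python
from typing import Any, Callable, Dict, Tuple, Union


class CountingActor:
    """Stateful stand-in for an actor: counts how often return_two runs."""

    def __init__(self) -> None:
        self.calls = 0

    def return_two(self, x: int) -> Tuple[int, int]:
        self.calls += 1
        return x, x + 1


class CallNode:
    def __init__(self, fn: Callable[[Any], Any], arg: Any) -> None:
        self.fn, self.arg = fn, arg


class OutputNode:
    def __init__(self, call: CallNode, idx: int) -> None:
        self.call, self.idx = call, idx


def execute(node: Union[CallNode, OutputNode], cache: Dict[int, Any]) -> Any:
    if isinstance(node, CallNode):
        key = id(node)
        if key not in cache:
            # Run the method only the first time any reader needs it.
            cache[key] = node.fn(node.arg)
        return cache[key]
    # An output node selects one element of its upstream call's tuple.
    return execute(node.call, cache)[node.idx]


actor = CountingActor()
call = CallNode(actor.return_two, 1)
readers = [OutputNode(call, 0), OutputNode(call, 1)]
cache: Dict[int, Any] = {}
results = [execute(r, cache) for r in readers]

# Without a shared cache, each reader would re-run the method.
naive_actor = CountingActor()
naive_call = CallNode(naive_actor.return_two, 1)
for idx in (0, 1):
    execute(OutputNode(naive_call, idx), {})
```

Both runs return the same values, so only the counter distinguishes the correct single execution (actor.calls == 1) from the duplicated one (naive_actor.calls == 2).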
Addressed most of the comments above. Here are the follow-up TODOs:
- Fix the segfault when an error is raised in the writer (when num_returns is inconsistent). An issue will be created.
- Remove output_idxs and use output_channels only.
There are some CI failures. Would you mind fixing the issue? Thanks!
@dengwxn There is a new conflict. Would you mind fixing it? Thanks!