[Core] Multiprocessing executor for single-node multi-GPU deployment
Ray is a powerful platform for general-purpose distributed computing, but it is arguably overkill for the specific requirements of real-time, synchronized inference between GPUs on a single node.
We would prefer to have a "lightweight" option without the ray dependency for environments that are not Ray clusters. This also helps with production security compliance.
With the changes in this PR, Ray will continue to be used for parallel workers if it's installed; otherwise, vanilla Python multiprocessing is used. This can also be overridden explicitly with --no-worker-use-ray.
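In other words, the selection behaves roughly like the sketch below (simplified, with illustrative names; this is not the code in the PR):

```python
# Sketch of the selection behaviour described above: prefer Ray when it is
# installed, fall back to multiprocessing otherwise, and let an explicit
# flag override the auto-detection.
import importlib.util
from typing import Optional


def should_use_ray(worker_use_ray: Optional[bool], parallel_workers: int) -> bool:
    """Decide whether parallel workers should be launched via Ray."""
    if parallel_workers <= 1:
        return False  # a single GPU never needs a distributed executor
    if worker_use_ray is not None:
        return worker_use_ray  # --worker-use-ray / --no-worker-use-ray override
    # Auto-detect: use Ray only if the optional dependency is importable.
    return importlib.util.find_spec("ray") is not None
```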
Worker processes are shut down when the LLMEngine is garbage collected.
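The lifecycle management follows the usual multiprocessing pattern of tying worker teardown to finalization of the owning object; a minimal illustrative sketch (class and method names here are not the PR's):

```python
import multiprocessing as mp


class LocalWorkerGroup:
    """Owns a set of worker processes; they are torn down when this object dies."""

    def __init__(self, num_workers: int, target):
        self.processes = [mp.Process(target=target, daemon=True)
                          for _ in range(num_workers)]
        for p in self.processes:
            p.start()

    def shutdown(self):
        for p in self.processes:
            if p.is_alive():
                p.terminate()
            p.join()

    def __del__(self):
        # Called when the owning engine is garbage collected; daemon=True is a
        # backstop so stray workers cannot outlive the parent process.
        self.shutdown()
```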
This PR was co-authored by @sahilsuneja1.
This reworks the original PR #2898 to plug into the new distributed executor abstraction.
I've introduced a MultiGPUExecutor abstract superclass shared between the ray and vanilla multiprocessing implementations.
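The rough shape of that hierarchy, for orientation (a sketch only; method signatures in the actual PR differ):

```python
from abc import ABC, abstractmethod


class MultiGPUExecutor(ABC):
    """Logic shared by the Ray-based and multiprocessing-based executors."""

    def _init_driver_worker_and_model(self):
        # Common tail of initialization: create the in-process driver worker,
        # set up the distributed environment, and load the model on every
        # worker (remote workers plus the local driver).
        ...

    @abstractmethod
    def _run_workers(self, method: str, *args, **kwargs):
        """Invoke `method` on all workers; each subclass supplies the transport."""


class RayGPUExecutor(MultiGPUExecutor):
    def _run_workers(self, method, *args, **kwargs):
        ...  # dispatch via Ray actor handles


class MultiprocessingGPUExecutor(MultiGPUExecutor):
    def _run_workers(self, method, *args, **kwargs):
        ...  # dispatch via multiprocessing queues (see local_worker_utils.py)
```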
@zhuohan123 @simon-mo this one should be ready to go! :pray:
Thanks for the review @simon-mo! I've addressed all of your comments
@zhuohan123 @simon-mo @WoosukKwon I just tried some cursory performance comparisons and wasn't expecting the difference to be so significant. Surprisingly, with Ray, TP=2 doesn't appear to give any latency benefit over a single GPU for the config I tried.
Using 80GB A100s with llama-2-7b via the OpenAI completion API. Single request with 5 input tokens and 2000 generated tokens. I repeated each test request multiple times; the results were very consistent.
| Configuration | Time (sec) | Difference |
|---|---|---|
| TP=1 | 24.2 | 0 |
| TP=2 using Ray (main or this PR) | 24.2 | 0 |
| TP=2 without Ray (this PR with --no-worker-use-ray) | 17.0 | -30% |
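For reference, a single-request test like this can be reproduced with a plain call to the OpenAI-compatible completions endpoint; this is only an illustrative sketch (the exact prompt and model id used for the numbers above are not shown in this thread):

```python
import time

import requests

payload = {
    "model": "meta-llama/Llama-2-7b-hf",   # must match the model the server was launched with
    "prompt": "The quick brown fox jumps",  # roughly 5 input tokens
    "max_tokens": 2000,
    "temperature": 0.0,
    "ignore_eos": True,  # vLLM extension: always generate the full 2000 tokens
}

start = time.perf_counter()
requests.post("http://localhost:8000/v1/completions", json=payload).raise_for_status()
print(f"end-to-end latency: {time.perf_counter() - start:.1f}s")
```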
@njhill Wow! 30% is quite a bit (albeit serving llama-7b over 2 A100-80G probably doesn't really make sense in practice).
I will do some testing on this branch in parallel with the serving benchmark and report back as well.
Thanks @ywang96, yes I'm sure the difference will be smaller in relative terms for larger models. But not bad, given that a performance improvement wasn't the purpose of this PR.
@njhill I did some preliminary testing on H100 TP2 with mistralai/Mistral-7B-Instruct-v0.1 and there's definitely some speedup (not as much as 30%, since this is running on ShareGPT).
Server launch command:
```bash
python -m vllm.entrypoints.openai.api_server \
    --model mistralai/Mistral-7B-Instruct-v0.1 \
    --swap-space 16 \
    --disable-log-requests \
    --tensor-parallel-size 2 \
    --no-worker-use-ray  # comment out this flag to use Ray
```
Benchmark command:
```bash
python benchmarks/benchmark_serving.py \
    --backend vllm \
    --model mistralai/Mistral-7B-Instruct-v0.1 \
    --dataset-name sharegpt \
    --dataset-path ShareGPT_V3_unfiltered_cleaned_split.json \
    --request-rate 1 \
    --num-prompts 100
```
With Ray workers:
---------------Time to First Token----------------
Mean TTFT (ms): 25.42
Median TTFT (ms): 31.51
P99 TTFT (ms): 49.27
-----Time per Output Token (excl. 1st token)------
Mean TPOT (ms): 8.21
Median TPOT (ms): 8.20
P99 TPOT (ms): 10.59
This PR:
---------------Time to First Token----------------
Mean TTFT (ms): 22.31
Median TTFT (ms): 27.61
P99 TTFT (ms): 36.08
-----Time per Output Token (excl. 1st token)------
Mean TPOT (ms): 7.75
Median TPOT (ms): 7.79
P99 TPOT (ms): 9.59
Thanks @ywang96, that's great! 5-6% lower TPOT is still nice to have! I am doing some spot tests on TP=4 llama-70b too.
For llama-2-70b with a single request (5 input / 1000 output tokens), the times I got are 32.3s before and 30.8s after, i.e. a 4-5% speedup.
I will test on A100-80G with Mixtral TP4 and TP8 just to see if 4-5% is likely the average speedup we get in general.
Results for Mixtral on A100-80G. For each configuration I ran 3 times and took the best result (usually the first run is bad because of warmup).
TP4
With Ray workers
---------------Time to First Token----------------
Mean TTFT (ms): 49.05
Median TTFT (ms): 52.50
P99 TTFT (ms): 89.09
-----Time per Output Token (excl. 1st token)------
Mean TPOT (ms): 16.81
Median TPOT (ms): 17.73
P99 TPOT (ms): 22.24
This PR
---------------Time to First Token----------------
Mean TTFT (ms): 43.00
Median TTFT (ms): 45.46
P99 TTFT (ms): 69.40
-----Time per Output Token (excl. 1st token)------
Mean TPOT (ms): 16.14
Median TPOT (ms): 17.23
P99 TPOT (ms): 20.72
TP8
With Ray workers
---------------Time to First Token----------------
Mean TTFT (ms): 48.72
Median TTFT (ms): 52.59
P99 TTFT (ms): 72.38
-----Time per Output Token (excl. 1st token)------
Mean TPOT (ms): 12.76
Median TPOT (ms): 13.01
P99 TPOT (ms): 16.83
This PR
---------------Time to First Token----------------
Mean TTFT (ms): 42.10
Median TTFT (ms): 44.90
P99 TTFT (ms): 66.98
-----Time per Output Token (excl. 1st token)------
Mean TPOT (ms): 11.33
Median TPOT (ms): 11.66
P99 TPOT (ms): 15.06
@njhill I would appreciate it if you could try to reproduce some of these to confirm the speedup too!
@zhuohan123 it's probably obvious but once #3763 is merged (assuming it will be merged first), this one will need some minor rework to incorporate that.
@njhill do you think it is possible to use torch rpc (https://pytorch.org/docs/stable/rpc.html), so that you avoid the Ray overhead but still keep the overall architecture, while getting the multiprocessing benefit?
@youkaichao I haven't used torch rpc before but could experiment with it and see how the performance compares. Hopefully it shouldn't be difficult to swap out the multiprocessing queue for it.
I don't think that this PR changes the architecture really, or that using torch rpc would change too much of this. The process monitoring stuff would still be needed and the queue-based message passing is all encapsulated in local_worker_utils.py anyhow.
This PR keeps the Ray structure and just moves common logic into a MultiGPUExecutor superclass, with some other minor simplification of the executor class hierarchy (sharing a common `__init__` method to set the config fields).
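For context on "queue-based message passing", the pattern encapsulated in local_worker_utils.py boils down to a request/response loop over multiprocessing queues; a stripped-down illustrative sketch (the function names here are not the PR's):

```python
import multiprocessing as mp


def worker_loop(task_queue, result_queue, worker):
    """Generic worker loop: receive (method, args, kwargs), execute, reply."""
    while True:
        item = task_queue.get()
        if item is None:  # sentinel from the parent: shut down
            break
        method, args, kwargs = item
        result_queue.put(getattr(worker, method)(*args, **kwargs))


def run_on_worker(task_queue, result_queue, method, *args, **kwargs):
    """Blocking RPC-style call into one worker process."""
    task_queue.put((method, args, kwargs))
    return result_queue.get()
```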
> I don't think that this PR changes the architecture really, or that using torch rpc would change too much of this.
`_init_driver_worker_and_model` requires that we know in advance all the functions we want to call, and every time we want to add a new part to the whole procedure, we have to change that function :(
@youkaichao I'm not sure that I follow; this method is just the common part of the init shared between the ray and multiproc subclasses. They each start their respective workers and then call this to start the driver worker and model. I don't see why refactoring this if/when needed in the future would be a big deal.
@njhill please take a look at the minimal code example here:
```python
# test.py
class Actor:
    def __init__(self, value):
        self.value = value

    def compute(self, x):
        return x + self.value


_singleton = None


def init_singleton(value):
    global _singleton
    if _singleton is None:
        _singleton = Actor(value)


def compute(x):
    return _singleton.compute(x)


if __name__ == "__main__":
    import torch.distributed as dist
    dist.init_process_group()
    import torch.distributed.rpc as rpc
    rank = dist.get_rank()
    name = str(rank)
    rpc.init_rpc(name, rank=dist.get_rank(), world_size=dist.get_world_size())
    if dist.get_rank() == 0:
        workers = []
        for i in range(dist.get_world_size()):
            worker_name = str(i)
            workers.append(rpc.rpc_async(worker_name, init_singleton, args=(i,)))
        for worker in workers:
            worker.wait()
        for i in range(dist.get_world_size()):
            worker_name = str(i)
            ret = rpc.rpc_async(worker_name, compute, args=(i,))
            print(f"Rank {i}: {ret.wait()}")
    rpc.shutdown()
```
Run it with `torchrun --nproc-per-node 4 test.py`; the output is:
Rank 0: 0
Rank 1: 2
Rank 2: 4
Rank 3: 6
It's very similar to Ray; I'm not sure how the overhead compares.
Another example with more convenient actor-like usage:
```python
class Actor:
    def __init__(self, value):
        self.value = value

    def compute(self, x):
        return x + self.value


if __name__ == "__main__":
    import torch.distributed as dist
    dist.init_process_group()
    import torch.distributed.rpc as rpc
    rank = dist.get_rank()
    name = str(rank)
    rpc.init_rpc(name, rank=dist.get_rank(), world_size=dist.get_world_size())
    if dist.get_rank() == 0:
        workers = []
        for i in range(dist.get_world_size()):
            worker_name = str(i)
            worker = rpc.remote(worker_name, Actor, args=(i,))
            workers.append(worker)
        rets = []
        for i, worker in enumerate(workers):
            worker_name = str(i)
            ret = worker.rpc_async().compute(i)
            rets.append(ret)
        for i, ret in enumerate(rets):
            print(f"Rank {i}: {ret.wait()}")
    rpc.shutdown()
```
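On the overhead question, one quick way to compare would be to time a no-op round trip through each transport. A rough sketch for the torch rpc side, appended inside the `if dist.get_rank() == 0:` block of the example above (the Ray side could time `ray.get(actor.compute.remote(0))` the same way):

```python
import time

worker = workers[1]           # RRef to the Actor living on rank 1
worker.rpc_sync().compute(0)  # warm up the connection first

n = 1000
start = time.perf_counter()
for _ in range(n):
    worker.rpc_sync().compute(0)
elapsed = time.perf_counter() - start
print(f"mean round-trip latency: {elapsed / n * 1e6:.1f} us")
```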
Thanks @youkaichao, I won’t have a chance to try this today but should do at the weekend.
Any ETA when this PR will be merged? Thank you.
QQ: after this PR, is it
- ray is downloaded by default, but you have an option to not use it
- or the multi processing becomes the default?
Also, can we make sure ray based TP > 1 is tested in CI?
> - ray is downloaded by default, but you have an option to not use it
@esmeetu I have arranged it such that ray is an optional extra, so by default it won't be installed, but it will be if you run e.g. `pip install vllm[ray]`.
I changed the Dockerfile to do this, so it will still be included in the published docker image.
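In packaging terms, "optional extra" just means the standard setuptools extras mechanism, roughly like this (illustrative sketch; the exact wiring and version pin in the PR may differ):

```python
# setup.py (excerpt, illustrative)
from setuptools import setup

setup(
    name="vllm",
    # ... other arguments elided ...
    extras_require={
        # `pip install vllm[ray]` pulls in Ray for the Ray-based executor.
        "ray": ["ray >= 2.9"],
    },
)
```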
> - or the multi processing becomes the default?
Actually, the current logic in this PR is that by default it will use Ray if it's installed but not otherwise. I'm wondering, though, whether we should change the default to only use Ray for multi-node, given that things are faster without it (per above).
> Also, can we make sure ray based TP > 1 is tested in CI?
Yes, it still is; I just parameterized the existing distributed test to run both with and without Ray.
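Something along these lines (a sketch, not the actual test in the repo; it assumes `worker_use_ray` is accepted as an engine argument through `LLM`'s kwargs):

```python
import pytest

from vllm import LLM, SamplingParams


@pytest.mark.parametrize("worker_use_ray", [True, False])
def test_tp2_generation(worker_use_ray):
    # The same TP>1 test body runs once with the Ray executor and once with
    # the multiprocessing executor.
    llm = LLM(model="facebook/opt-125m",
              tensor_parallel_size=2,
              worker_use_ray=worker_use_ray)
    outputs = llm.generate(["Hello, my name is"],
                           SamplingParams(max_tokens=16))
    assert outputs and outputs[0].outputs[0].text
```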
I tested Baichuan 13B TP=2 with/without CUDA graphs on A100; the performance of this PR and the main branch is the same. Did you test any 13B models?
What's the delta between this and the Ray default (for the single-node case), aside from performance? I assume it supports log prefixes, so maybe just the debugger and the Ray dashboard?
@rkooo567 yes I think that's all correct.
My suggestions:
- Try whether torch rpc works and whether it brings a benefit. That modification should be small, the usage is quite similar to Ray, and the underlying implementation is multiprocessing.
- Break the PR down into smaller pieces, so that we can digest it step by step.
Just a note on this: I use Ray to do multi-node batch inference with vLLM (on 8 nodes with 8 A10s each). With models that fit on a single GPU it worked perfectly, but trying to initialize tensor-parallel models with Ray from within a Ray worker doesn't work. I think this solution is the only way to do multi-node batch inference with Ray orchestrating the nodes and multiprocessing handling tensor-parallel 70B inference inside each worker node.
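Concretely, the pattern described here would look something like the sketch below: the outer Ray cluster schedules one actor per node, and each actor runs vLLM with the multiprocessing executor for tensor parallelism inside that node. The `worker_use_ray=False` keyword is my assumption of the programmatic equivalent of `--no-worker-use-ray`:

```python
import ray
from vllm import LLM, SamplingParams


@ray.remote(num_gpus=8)  # one actor per 8-GPU worker node
class NodeInference:
    def __init__(self):
        # Tensor parallelism inside the node uses the multiprocessing executor,
        # so the outer Ray cluster only orchestrates the nodes themselves.
        # `worker_use_ray=False` is assumed to mirror --no-worker-use-ray.
        self.llm = LLM(model="meta-llama/Llama-2-70b-hf",
                       tensor_parallel_size=8,
                       worker_use_ray=False)

    def generate(self, prompts):
        return self.llm.generate(prompts, SamplingParams(max_tokens=128))


# Hypothetical usage: shard the prompt list across the nodes.
# actors = [NodeInference.remote() for _ in range(8)]
# results = ray.get([a.generate.remote(chunk) for a, chunk in zip(actors, chunks)])
```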
Thank you for this PR! Hope it gets merged soon.
My team at work has been looking for a way to do efficient autoregressive generation during LLM fine-tuning. We'd like to tap into the efficiency of vLLM, but so far we haven't been able to run torch FSDP alongside vLLM on the same set of GPUs. The changes proposed in this pull request have resolved our issue. Thanks again Nick for the great work, and I'd love to see this pull request merged very soon.
@nivibilla @jacobthebanana I had some interruptions over the last few days, but I will make sure this lands this week (not this PR itself, but the ones that replaced it).
This was broken into smaller PRs, which have now all been merged; see #4539.