DeepSpeed-MII
Reproduced readme results
Hi! Could you provide the code and launch command for this experiment? Any references would be greatly appreciated!
@mrwyattii Looking forward to your reply
Hi @Traveller2001, you can find our benchmark code here: https://github.com/microsoft/DeepSpeedExamples/tree/master/benchmarks/inference/mii
Thank you very much for your reply! I would like to ask again about using the pipeline: I am traversing my own test set and feeding it to the pipeline batch by batch, but in later iterations I hit this error: Deadlock detected. Resetting KV cache and recomputing requests. Consider limiting number of concurrent requests or decreasing max lengths of prompts/generations.
Is this because the KV cache used in the previous iterations was not released properly? How should I use non-persistent mode correctly to fully utilize the performance of this framework? @mrwyattii
Deadlock can happen with both persistent and non-persistent modes. We don't check if there is enough memory to compute all the potential generated tokens for each request before placing it on the inference engine. This is for performance reasons, but if there are too many requests we run into this deadlock scenario (Read more in this comment).
We are working on improvements that allow users to better tune the behavior of the inference engine to avoid deadlock for their given use case.
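In the meantime, one way to reduce the chance of deadlock with the non-persistent pipeline is to split your prompts into smaller chunks instead of passing the whole test set at once. Here is a minimal sketch, assuming a placeholder model path, chunk size, and generation length (load_my_test_set is a hypothetical loader for your own dataset):

import mii

# Placeholder model path -- substitute the model you are benchmarking.
pipe = mii.pipeline("meta-llama/Llama-2-7b-hf")

prompts = load_my_test_set()   # hypothetical loader for your own test set
chunk_size = 32                # tune so the KV cache can hold a full chunk

responses = []
for i in range(0, len(prompts), chunk_size):
    chunk = prompts[i:i + chunk_size]
    # Bounding max_new_tokens also bounds the KV-cache memory each request can claim.
    responses.extend(pipe(chunk, max_new_tokens=128))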
One benefit of using the persistent deployment is that you can send requests periodically rather than all at once. For example, if you have 50 requests, using the non-persistent pipeline and sending them all in one batch
response = pipeline(my_50_requests)
will likely result in deadlock. With the persistent deployment you can have many clients, each sending a sub-batch at a slight offset in time from the others. This second scenario is what we do in our benchmarks, as it more closely emulates a "real-world" serving scenario.
The persistent deployment also reduces the time to first response. With the non-persistent pipeline, you would have to wait for all 50 requests to finish before receiving any response, but with multiple clients you will get your first response much sooner.
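As a rough illustration of that multi-client pattern, here is a minimal sketch, assuming a placeholder model path, deployment name, sub-batch size, and client offsets (my_50_requests is the hypothetical list of 50 prompts from above):

import threading
import time
import mii

# Placeholder model path and deployment name -- not the benchmark settings.
mii.serve("meta-llama/Llama-2-7b-hf", deployment_name="demo-deployment")

results = []
results_lock = threading.Lock()

def send_sub_batch(prompts, offset_s):
    time.sleep(offset_s)                    # stagger clients slightly in time
    client = mii.client("demo-deployment")  # every client talks to the same persistent server
    out = client.generate(prompts, max_new_tokens=128)
    with results_lock:
        results.extend(out)

# Split the 50 requests into sub-batches handled by separate clients
# instead of sending one giant batch through the pipeline.
sub_batches = [my_50_requests[i:i + 10] for i in range(0, len(my_50_requests), 10)]
threads = [threading.Thread(target=send_sub_batch, args=(batch, 0.05 * i))
           for i, batch in enumerate(sub_batches)]
for t in threads:
    t.start()
for t in threads:
    t.join()

# Shut the persistent deployment down once all responses are collected.
mii.client("demo-deployment").terminate_server()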
Please let me know if I can clarify anything further!
Thank you very much for your patient and detailed response! The main questions about non-persistent mode are as follows:
You can see that in the first few loops the response is returned normally, but then there is a deadlock, so I suspect the KV cache used up in the previous loops was not released properly. This is a minor issue for me though; I'm trying out the persistent mode, and from trying the benchmark code it should work well. I would also like to ask whether this framework can support FlashDecoding, or even FlashDecoding++. I'm very much looking forward to seeing these algorithms added to this framework! @mrwyattii
@mrwyattii Could you please look at why this problem is occurring? I'm trying to test my own data in persistent mode, but it goes wrong in the warm-up phase:
[2023-12-21 13:01:43,472] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,472] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,472] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,474] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,486] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,486] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,486] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,491] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,491] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,492] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,493] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,494] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,508] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,512] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,523] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,535] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
warmup queue size: 10000 (1393492)
---------------------------------init successfully-------------------
warmup queue size: 10000 (1393488)
warmup queue size: 10000 (1393484)
warmup queue size: 10000 (1393487)
warmup queue size: 10000 (1393485)
warmup queue size: 10000 (1393483)
warmup queue size: 9999 (1393481)
warmup queue size: 9999 (1393479)
warmup queue size: 9999 (1393491)
warmup queue size: 9999 (1393494)
warmup queue size: 9999 (1393480)
warmup queue size: 9998 (1393489)
warmup queue size: 9998 (1393486)
warmup queue size: 9998 (1393493)
warmup queue size: 9998 (1393490)
warmup queue size: 9998 (1393482)
Exception in thread Thread-3:
Traceback (most recent call last):
File "/home/test_user/miniconda3/envs/infer/lib/python3.9/threading.py", line 980, in _bootstrap_inner
self.run()
File "/home/test_user/miniconda3/envs/infer/lib/python3.9/threading.py", line 917, in run
self._target(*self._args, **self._kwargs)
File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1222, in _serve
if not _process_event_and_continue(state, event):
File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1182, in _process_event_and_continue
rpc_state, rpc_future = _handle_call(
File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1058, in _handle_call
_handle_with_method_handler(
File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1005, in _handle_with_method_handler
return _handle_unary_stream(
File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 872, in _handle_unary_stream
return thread_pool.submit(
File "/home/test_user/miniconda3/envs/infer/lib/python3.9/concurrent/futures/thread.py", line 169, in submit
raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
This is run_all.sh:
RAGGED_BATCH_SIZE=512
PARAM_SIZE=(70b)
TP=8
DEPLOYMENT_NAME=llama2-${PARAM_SIZE}-tp${TP}-b${RAGGED_BATCH_SIZE}
python server.py --model_name /data/hf_models/Llama-2-70b-chat-hf -d ${DEPLOYMENT_NAME} -m ${TP} -b ${RAGGED_BATCH_SIZE} start
DEPLOYMENT_NAME=${DEPLOYMENT_NAME} bash ./run_benchmark_client.sh
echo "Stopping server"
python server.py -d ${DEPLOYMENT_NAME} stop
This is run_benchmark_client.sh:
#!/bin/bash
DEPLOYMENT_NAME=${DEPLOYMENT_NAME:-llama2-70b}
VLLM=${VLLM:-""}
CLIENT_NUMS=${CLIENT_NUMS:-16}
REQUEST_NUM=${REQUEST_NUM:-10000}
LOG_DIR=logs.${DEPLOYMENT_NAME}
mkdir -p ${LOG_DIR}
RESULT_FILE=${DEPLOYMENT_NAME}_c${CLIENT_NUMS}.json
python run_benchmark_client.py -w 1 \
-d ${DEPLOYMENT_NAME} -n ${REQUEST_NUM} -c ${CLIENT_NUMS} \
-o ${LOG_DIR}/${RESULT_FILE} \
${VLLM} --stream \
2>&1 | tee ${LOG_DIR}/bench_client_num_c${CLIENT_NUMS}.log
This is the changed code in run_benchmark_client.py:
def _run_parallel(deployment_name, warmup, barrier, query_queue, result_queue, client_num, stream, vllm):
    pid = os.getpid()
    session_id = f"test_session_p{pid}_t{threading.get_ident()}"
    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    if not vllm:
        import mii
        client = mii.client(deployment_name)

    time.sleep(random.uniform(0, client_num) * 0.01)
    barrier.wait()

    for _ in range(warmup):
        print(f"warmup queue size: {query_queue.qsize()} ({pid})", flush=True)
        input_tokens, req_max_new_tokens = query_queue.get(timeout=1.0)
        if vllm:
            call_vllm(input_tokens, req_max_new_tokens, stream)
        else:
            call_mii(client, input_tokens, req_max_new_tokens, stream)

    print("pass warm up", flush=True)

    barrier.wait()
    time.sleep(random.uniform(0, client_num) * 0.01)

    try:
        while not query_queue.empty():
            print(f"queue size: {query_queue.qsize()} ({pid})", flush=True)
            input_tokens, req_max_new_tokens = query_queue.get(timeout=1.0)
            # Set max_new_tokens following normal distribution
            if vllm:
                r = call_vllm(input_tokens, req_max_new_tokens)
            else:
                r = call_mii(client, input_tokens, req_max_new_tokens, stream)
            result_queue.put(r)
    except queue.Empty:
        print(f"queue is empty ({pid})", flush=True)

    print(f"Worker ({pid}) finished. session_id: {session_id}", flush=True)
def run_client(client_num, deployment_name, num_queries, warmup, stream, vllm, use_thread=False):
    """
    Run MII client for benchmarking. The scenario is a bit complicated:
    1. The main process puts `num_queries` queries into the input queue
    2. Each client runs `warmup` iterations, taking queries from the input queue
    3. --- barrier ---
    4. The main process marks the start time
    5a. All clients send `num_queries` queries in total and put the results into the result queue
    5b. The main process takes the results from the result queue (in parallel with 5a)
    6. The main process marks the end time after receiving `num_queries` results
    """
    if use_thread:
        runnable_cls = threading.Thread
        barrier_cls = threading.Barrier
        queue_cls = queue.Queue
    else:
        runnable_cls = multiprocessing.Process
        barrier_cls = multiprocessing.Barrier
        queue_cls = multiprocessing.Queue

    barrier = barrier_cls(client_num + 1)
    query_queue = queue_cls()
    result_queue = queue_cls()

    processes = [runnable_cls(target=_run_parallel,
                              args=(deployment_name, warmup, barrier, query_queue, result_queue, client_num, stream, vllm))
                 for i in range(client_num)]
    for p in processes:
        p.start()

    tokenizer = AutoTokenizer.from_pretrained("/data/hf_models/Llama-2-70b-chat-hf")
    with open("mydataset.json") as f:
        request_text = json.load(f)
    num_samples = args.num_queries
    if num_samples is not None:
        request_text = request_text[0:num_samples]
    for p, pl, ol in request_text:
        query_queue.put((p, ol))
    print("---------------------------------init process---------------------------", flush=True)

    # Tokenizers must be initialized after fork.
    # So we need to fork before putting inputs into the queue.
    # We need this barrier to stop child processes from taking inputs before the main process puts them.
    barrier.wait()
    print("---------------------------------init successfully-------------------", flush=True)

    # This barrier is to make sure that all clients have finished warmup
    barrier.wait()
    print("---------------------------------finish warm up-------------------", flush=True)

    response_details = []
    print("------------------------get the result-----------------------------------", flush=True)
    while len(response_details) < args.num_queries:
        res = result_queue.get()
        # vLLM returns concatenated tokens
        if vllm:
            all_tokens = tokenizer.tokenize(res.generated_tokens)
            res.generated_tokens = all_tokens[len(tokenizer.tokenize(res.prompt)):]
        response_details.append(res)
        print("====================", flush=True)
        print(len(response_details), flush=True)
        print("====================", flush=True)

    return response_details
Looking forward to your reply!
@mrwyattii Your help is very much needed. Using the persistent deployment has not been very smooth: when running on my own dataset, the GPUs eventually stop being utilized, yet the program never finishes and no error is reported. What do I need to do to use non-persistent mode correctly?
Hi @Traveller2001, sorry for the delay in response; I've been out of office for the past two weeks. Are you able to run the benchmarks without the changes you specify here? I will try to reproduce the results, but @tohtana may be able to provide better help, as they wrote the benchmark code.