DeepSpeed-MII

Reproduced readme results

Open · Traveller2001 opened this issue on Dec 18, 2023 · 8 comments

[image: README benchmark results] Hi! Can you provide the code and launch command for this experiment? Any references would be greatly appreciated!

Traveller2001 avatar Dec 18 '23 04:12 Traveller2001

@mrwyattii Looking forward to your reply

Traveller2001 avatar Dec 18 '23 04:12 Traveller2001

Hi @Traveller2001, you can find our benchmark code here: https://github.com/microsoft/DeepSpeedExamples/tree/master/benchmarks/inference/mii

mrwyattii avatar Dec 18 '23 18:12 mrwyattii

Thank you very much for your reply! I would like to ask about using the non-persistent pipeline: I traverse my own test set and feed it into the pipeline batch by batch, but in a later iteration I hit this error: "Deadlock detected. Resetting KV cache and recomputing requests. Consider limiting number of concurrent requests or decreasing max lengths of prompts/generations." Is this because the KV cache used in the previous iterations was not released properly? How should I use non-persistent mode correctly to fully utilize the performance of this framework? @mrwyattii
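
For reference, my usage looks roughly like this (a minimal sketch, not my exact script; the model path, prompt loading, and batch size are placeholders):

import mii

# Non-persistent pipeline (placeholder model path; for a large model this would
# be launched with the deepspeed launcher, e.g. `deepspeed --num_gpus 8 script.py`)
pipe = mii.pipeline("/data/hf_models/Llama-2-70b-chat-hf")

# `my_prompts` stands in for the test set loaded from disk
batch_size = 32
for i in range(0, len(my_prompts), batch_size):
    batch = my_prompts[i:i + batch_size]
    responses = pipe(batch, max_new_tokens=256)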

Traveller2001 avatar Dec 19 '23 03:12 Traveller2001

Deadlock can happen with both persistent and non-persistent modes. We don't check if there is enough memory to compute all the potential generated tokens for each request before placing it on the inference engine. This is for performance reasons, but if there are too many requests we run into this deadlock scenario (Read more in this comment).

We are working on improvements that allow users to better tune the behavior of the inference engine to avoid deadlock for their given use case.

One benefit of using the persistent deployment is that you can send requests periodically rather than all at once. For example, if you have 50 requests, using the non-persistent pipeline and sending them all in one batch (response = pipeline(my_50_requests)) will likely result in deadlock. With the persistent deployment you can instead have many clients, each sending a sub-batch at a slight offset in time from the others. This second scenario is what we do in our benchmarks, as it more closely emulates a "real-world" serving scenario.

The persistent deployment also has the benefit of a shorter time to response for individual requests. With the non-persistent pipeline you would have to wait for all 50 requests to finish before receiving any response, but with multiple clients you will get your first response much sooner.
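
To make that concrete, a rough sketch of the persistent flow looks like the following (a minimal sketch, not our benchmark code; the deployment name, client count, and request list are placeholders):

import multiprocessing

import mii

DEPLOYMENT = "llama2-70b-deployment"  # placeholder name

def send_sub_batch(prompts):
    # Each client connects to the already-running deployment
    client = mii.client(DEPLOYMENT)
    return [client.generate(p, max_new_tokens=256) for p in prompts]

if __name__ == "__main__":
    # Start a persistent deployment once (placeholder model path)
    mii.serve("/data/hf_models/Llama-2-70b-chat-hf",
              deployment_name=DEPLOYMENT,
              tensor_parallel=8)

    my_50_requests = ["..."] * 50  # placeholder prompts
    # Split the requests across 4 clients instead of one big pipeline call
    sub_batches = [my_50_requests[i::4] for i in range(4)]
    with multiprocessing.Pool(4) as pool:
        results = pool.map(send_sub_batch, sub_batches)

    # Shut down the deployment when finished
    mii.client(DEPLOYMENT).terminate_server()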

Please let me know if I can clarify anything further!

mrwyattii avatar Dec 19 '23 17:12 mrwyattii

Thank you very much for your patient and detailed response! My main question about non-persistent mode is shown below: [image: console output from the pipeline loop] You can see that in the first few loops the responses are returned normally, but then a deadlock occurs, so I suspect that the KV cache used in the previous loops was not released properly. This is a minor issue for me though; I'm trying out the persistent mode, and from trying the benchmark code it should be good. I would also like to ask whether this framework can support flash decoding or even flash decoding++; I'm very much looking forward to seeing this algorithm added to the framework! @mrwyattii

Traveller2001 avatar Dec 20 '23 15:12 Traveller2001

@mrwyattii Can you please look at why this problem is occurring? I'm trying to test my own data in persistent mode, but it goes wrong in the warmup phase:

[2023-12-21 13:01:43,472] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,472] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,472] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,474] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,486] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,486] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,486] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,491] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,491] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,492] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,493] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,494] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,508] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,512] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,523] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
[2023-12-21 13:01:43,535] [INFO] [real_accelerator.py:161:get_accelerator] Setting ds_accelerator to cuda (auto detect)
warmup queue size: 10000 (1393492)
---------------------------------init successfully-------------------
warmup queue size: 10000 (1393488)
warmup queue size: 10000 (1393484)
warmup queue size: 10000 (1393487)
warmup queue size: 10000 (1393485)
warmup queue size: 10000 (1393483)
warmup queue size: 9999 (1393481)
warmup queue size: 9999 (1393479)
warmup queue size: 9999 (1393491)
warmup queue size: 9999 (1393494)
warmup queue size: 9999 (1393480)
warmup queue size: 9998 (1393489)
warmup queue size: 9998 (1393486)
warmup queue size: 9998 (1393493)
warmup queue size: 9998 (1393490)
warmup queue size: 9998 (1393482)
Exception in thread Thread-3:
Traceback (most recent call last):
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/threading.py", line 980, in _bootstrap_inner
    self.run()
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/threading.py", line 917, in run
    self._target(*self._args, **self._kwargs)
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1222, in _serve
    if not _process_event_and_continue(state, event):
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1182, in _process_event_and_continue
    rpc_state, rpc_future = _handle_call(
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1058, in _handle_call
    _handle_with_method_handler(
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 1005, in _handle_with_method_handler
    return _handle_unary_stream(
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/site-packages/grpc/_server.py", line 872, in _handle_unary_stream
    return thread_pool.submit(
  File "/home/test_user/miniconda3/envs/infer/lib/python3.9/concurrent/futures/thread.py", line 169, in submit
    raise RuntimeError('cannot schedule new futures after '
RuntimeError: cannot schedule new futures after interpreter shutdown
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up
pass warm up

This is run_all.sh:

RAGGED_BATCH_SIZE=512
PARAM_SIZE=(70b)

TP=8

DEPLOYMENT_NAME=llama2-${PARAM_SIZE}-tp${TP}-b${RAGGED_BATCH_SIZE}
python server.py --model_name /data/hf_models/Llama-2-70b-chat-hf -d ${DEPLOYMENT_NAME} -m ${TP} -b ${RAGGED_BATCH_SIZE} start

DEPLOYMENT_NAME=${DEPLOYMENT_NAME} bash ./run_benchmark_client.sh

echo "Stopping server"
python server.py -d ${DEPLOYMENT_NAME} stop

This is run_benchmark_client.sh:

#!/bin/bash

DEPLOYMENT_NAME=${DEPLOYMENT_NAME:-llama2-70b}
VLLM=${VLLM:-""}

CLIENT_NUMS=${CLIENT_NUMS:-16}
REQUEST_NUM=${REQUEST_NUM:-10000}

LOG_DIR=logs.${DEPLOYMENT_NAME}
mkdir -p ${LOG_DIR}

RESULT_FILE=${DEPLOYMENT_NAME}_c${CLIENT_NUMS}.json
python run_benchmark_client.py -w 1 \
    -d ${DEPLOYMENT_NAME} -n ${REQUEST_NUM} -c ${CLIENT_NUMS} \
    -o ${LOG_DIR}/${RESULT_FILE} \
    ${VLLM} --stream \
    2>&1 | tee ${LOG_DIR}/bench_client_num_c${CLIENT_NUMS}.log 

This is the changed code in run_benchmark_client.py:

def _run_parallel(deployment_name, warmup, barrier, query_queue, result_queue, client_num, stream, vllm):
    pid = os.getpid()
    session_id = f"test_session_p{pid}_t{threading.get_ident()}"

    event_loop = asyncio.new_event_loop()
    asyncio.set_event_loop(event_loop)
    if not vllm:
        import mii
        client = mii.client(deployment_name)
    time.sleep(random.uniform(0, client_num) * 0.01)
    barrier.wait()

    for _ in range(warmup):
        print(f"warmup queue size: {query_queue.qsize()} ({pid})", flush=True)
        input_tokens, req_max_new_tokens = query_queue.get(timeout=1.0)
        if vllm:
            call_vllm(input_tokens, req_max_new_tokens, stream)
        else:
            call_mii(client, input_tokens, req_max_new_tokens, stream)
            
    print("pass warm up",flush=True)

    barrier.wait()

    time.sleep(random.uniform(0, client_num) * 0.01)
    try:
        while not query_queue.empty():
            print(f"queue size: {query_queue.qsize()} ({pid})", flush=True)
            input_tokens, req_max_new_tokens = query_queue.get(timeout=1.0)

            # Set max_new_tokens following normal distribution
            if vllm:
                r = call_vllm(input_tokens, req_max_new_tokens)
            else:
                r = call_mii(client, input_tokens, req_max_new_tokens, stream)
            result_queue.put(r)
    except queue.Empty:
        print(f"queue is empty ({pid})",flush=True)

    print(f"Worker ({pid}) finished. session_id: {session_id}",flush=True)



def run_client(client_num, deployment_name, num_queries, warmup, stream, vllm, use_thread=False):
    """
    Run MII client for benchmarking. The scenario is a bit complicated:
    1. The main process puts `num_queries` queries into the input queue
    2. Each client runs `warmup` iterations, taking queries from the input queue
    3. --- barrier ---
    4. The main process marks the start time
    5a. All clients send `num_queries` queries in total and put the results into the result queue
    5b. The main process takes the results from the result queue (in parallel with 5a)
    6. The main process marks the end time after receiving `num_queries` results
    """

    if use_thread:
        runnable_cls = threading.Thread
        barrier_cls = threading.Barrier
        queue_cls = queue.Queue
    else:
        runnable_cls = multiprocessing.Process
        barrier_cls = multiprocessing.Barrier
        queue_cls = multiprocessing.Queue

    barrier = barrier_cls(client_num + 1)
    query_queue = queue_cls()
    result_queue = queue_cls()

    processes = [runnable_cls(target=_run_parallel,
                              args=(deployment_name, warmup, barrier, query_queue, result_queue, client_num, stream, vllm))
                 for i in range(client_num)]
    for p in processes:
        p.start()


    tokenizer = AutoTokenizer.from_pretrained("/data/hf_models/Llama-2-70b-chat-hf")
   

    with open("mydataset.json") as f:
        request_text = json.load(f)

    num_samples = args.num_queries
    if num_samples is not None:
        request_text = request_text[0:num_samples]


    for p,pl,ol in request_text:
        query_queue.put((p, ol))
    print("---------------------------------init process---------------------------",flush=True)
    # Tokenizers must be initialized after fork.
    # So we need to fork before putting inputs to the queue.
    # We need this barrier to stop child processes from taking inputs before the main process puts them
    
    barrier.wait()

    print("---------------------------------init successfully-------------------",flush=True)
    # This barrier is to make sure that all clients have finished warmup
    barrier.wait()
    print("---------------------------------finish warm up-------------------",flush=True)

    response_details = []
    print("------------------------get the result-----------------------------------",flush=True)
    while len(response_details) < args.num_queries:
        res = result_queue.get()
        # vLLM returns concatenated tokens
        if vllm:
            all_tokens = tokenizer.tokenize(res.generated_tokens)
            res.generated_tokens = all_tokens[len(tokenizer.tokenize(res.prompt)):]
        response_details.append(res)
        print("====================",flush=True)
        print(len(response_details),flush=True)
        print("====================",flush=True)

    return response_details

Looking forward to your reply~

Traveller2001 avatar Dec 21 '23 13:12 Traveller2001

@mrwyattii Your help is very much needed. Using the persistent mode has not been very smooth: when running on my own dataset, there is always a point where the GPU is no longer utilized, yet the program never finishes and no error is reported. What do I need to do to use non-persistent mode correctly?

Traveller2001 avatar Dec 23 '23 10:12 Traveller2001

Hi @Traveller2001, sorry for the delay in response; I've been out of office for the past 2 weeks. Are you able to run the benchmarks without the changes you specify here? I will try to reproduce the results, but @tohtana may be able to provide better help, as they wrote the benchmark code.
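
In the meantime, a quick way to check that the persistent deployment itself is healthy, independent of the benchmark scripts, is something like the following (a minimal sketch; the model path and deployment name are placeholders):

import mii

# Placeholder model path and deployment name
mii.serve("/data/hf_models/Llama-2-70b-chat-hf",
          deployment_name="llama2-70b-smoke-test",
          tensor_parallel=8)

client = mii.client("llama2-70b-smoke-test")
# A single small request; if this hangs, the problem is in the deployment
# rather than in the benchmark client code
response = client.generate("DeepSpeed is", max_new_tokens=64)
print(response)

client.terminate_server()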

mrwyattii avatar Jan 03 '24 17:01 mrwyattii