tensorrtllm_backend
Streaming mode doesn't work
System Info
- GPUs: 2x V100
- Container: nvcr.io/nvidia/tritonserver:24.01-trtllm-python-py3
- tensorrt-llm 0.7.0
Who can help?
No response
Information
- [X] The official example scripts
- [ ] My own modified scripts
Tasks
- [ ] An officially supported task in the examples folder (such as GLUE/SQuAD, ...)
- [ ] My own task or dataset (give details below)
Reproduction
I want to deploy Qwen-14B in Triton following https://github.com/NVIDIA/TensorRT-LLM/tree/a8018c14e6a9868b507a0517550b2cc6e41bd86e/examples/qwen
1. Build the engine:
```bash
python3 build.py --hf_model_dir /root/model_repo \
    --dtype float16 \
    --remove_input_padding \
    --use_gpt_attention_plugin float16 \
    --enable_context_fmha \
    --use_gemm_plugin float16 \
    --output_dir /root/Qwen/14B/trt_engines/fp16/2-gpu \
    --world_size 2 \
    --tp_size 2
```
2. Launch tritonserver:
```bash
cd /root/Qwen/14B/trt_engines/fp16/2-gpu
cp -r ./* /tensorrtllm_backend/triton_model_repo/tensorrt_llm/1/

cd /root/
cp -r model_repo /tensorrtllm_backend/triton_model_repo/tensorrt_llm/
rm /tensorrtllm_backend/triton_model_repo/tensorrt_llm/model_repo/*.safetensors

cd /tensorrtllm_backend
python3 scripts/launch_triton_server.py --world_size=2 --model_repo=/tensorrtllm_backend/triton_model_repo
```
3. When I run the following request, it returns the whole output in a single response instead of streaming:
```bash
curl -X POST 10.110.31.16:8001/v2/models/tensorrt_llm_bls/generate_stream \
    -d '{"text_input": "<|im_start|>system\n you are a writer .<|im_end|>\n<|im_start|>user\nwho are you ?<|im_end|>\n<|im_start|>assistant\n", "max_tokens": 54, "bad_words": "\n", "stop_words": "", "end_id": [151643], "pad_id": [151643],"stream": true }'
```
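For reference, this is roughly how I would consume the stream over gRPC instead of HTTP (a minimal sketch using tritonclient; the address, tensor names, and values mirror the curl request above and the config.pbtxt below, so treat the details as assumptions rather than a verified reproduction). With decoupled streaming working, the callback should fire once per generated chunk:

```python
# Minimal sketch of a streaming gRPC client for tensorrt_llm_bls.
# Assumes tritonclient[grpc] is installed and Triton's gRPC port is 8001.
import queue
from functools import partial

import numpy as np
import tritonclient.grpc as grpcclient
from tritonclient.utils import np_to_triton_dtype


def prepare_tensor(name, value):
    # Wrap a numpy array as a Triton input tensor.
    t = grpcclient.InferInput(name, value.shape, np_to_triton_dtype(value.dtype))
    t.set_data_from_numpy(value)
    return t


def callback(result_queue, result, error):
    # Called once per streamed response; collect everything for inspection.
    result_queue.put(error if error is not None else result)


inputs = [
    prepare_tensor("text_input", np.array([["who are you ?"]], dtype=np.object_)),
    prepare_tensor("max_tokens", np.array([[54]], dtype=np.int32)),
    prepare_tensor("end_id", np.array([[151643]], dtype=np.int32)),
    prepare_tensor("pad_id", np.array([[151643]], dtype=np.int32)),
    prepare_tensor("stream", np.array([[True]], dtype=np.bool_)),
]

results = queue.Queue()
client = grpcclient.InferenceServerClient("10.110.31.16:8001")
client.start_stream(callback=partial(callback, results))
client.async_stream_infer("tensorrt_llm_bls", inputs)
client.stop_stream()  # waits for the remaining responses to arrive
client.close()

while not results.empty():
    item = results.get()
    if isinstance(item, Exception):
        raise item
    # With streaming, each response should carry one partial "text_output" chunk.
    print(item.as_numpy("text_output"))
```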
The model.py for tensorrt_llm_bls is:

```python
import json
import traceback

import numpy as np
import triton_python_backend_utils as pb_utils

class TritonPythonModel:
def initialize(self, args):
# Parse model configs
model_config = json.loads(args['model_config'])
params = model_config['parameters']
accumulate_tokens_str = ''
if 'accumulate_tokens' in params:
accumulate_tokens_str = params['accumulate_tokens']['string_value']
self.accumulate_tokens = accumulate_tokens_str.lower() in [
'true', 'yes', '1', 't'
]
self.decoupled = pb_utils.using_decoupled_model_transaction_policy(
model_config)
print(f"=============decouple:{self.decoupled}==========")
self.logger = pb_utils.Logger
self.bls_input_tensor_names = [
"text_input", "max_tokens", "bad_words", "stop_words", "end_id",
"pad_id", "top_k", "top_p", "temperature", "length_penalty",
"repetition_penalty", "min_length", "presence_penalty",
"random_seed", "return_log_probs", "beam_width", "stream",
"prompt_embedding_table", "prompt_vocab_size",
"embedding_bias_words", "embedding_bias_weights"
]
self.preproc_input_to_bls_input_map = {
"QUERY": "text_input",
"REQUEST_OUTPUT_LEN": "max_tokens",
"BAD_WORDS_DICT": "bad_words",
"STOP_WORDS_DICT": "stop_words",
"EMBEDDING_BIAS_WORDS": "embedding_bias_words",
"EMBEDDING_BIAS_WEIGHTS": "embedding_bias_weights"
}
self.preproc_output_to_trtllm_input_map = {
"INPUT_ID": "input_ids",
"REQUEST_INPUT_LEN": "input_lengths",
"REQUEST_OUTPUT_LEN": "request_output_len",
"BAD_WORDS_IDS": "bad_words_list",
"STOP_WORDS_IDS": "stop_words_list",
"EMBEDDING_BIAS": "embedding_bias",
}
self.trtllm_input_to_bls_input_map = {
"end_id": "end_id",
"pad_id": "pad_id",
"beam_width": "beam_width",
"runtime_top_k": "top_k",
"runtime_top_p": "top_p",
"len_penalty": "length_penalty",
"repetition_penalty": "repetition_penalty",
"min_length": "min_length",
"presence_penalty": "presence_penalty",
"random_seed": "random_seed",
"return_log_probs": "return_log_probs",
"streaming": "stream",
"prompt_embedding_table": "prompt_embedding_table",
"prompt_vocab_size": "prompt_vocab_size",
}
self.trtllm_output_to_postproc_input_map = {
"output_ids": "TOKENS_BATCH",
"sequence_length": "SEQUENCE_LENGTH",
"cum_log_probs": "CUM_LOG_PROBS",
"output_log_probs": "OUTPUT_LOG_PROBS",
}
self.postproc_output_to_bls_output_map = {
"OUTPUT": "text_output",
"OUT_CUM_LOG_PROBS": "cum_log_probs",
"OUT_OUTPUT_LOG_PROBS": "output_log_probs",
}
def _get_bls_input_tensors_map(self, request):
bls_input_tensors_map = {}
for input_tensor_name in self.bls_input_tensor_names:
tensor = pb_utils.get_input_tensor_by_name(request,
input_tensor_name)
if tensor != None:
bls_input_tensors_map[input_tensor_name] = tensor
return bls_input_tensors_map
def _get_preproc_input_tensors(self, bls_input_tensors_map):
preproc_input_tensors = []
for preproc_name, bls_name in self.preproc_input_to_bls_input_map.items(
):
if bls_name in bls_input_tensors_map:
tensor = bls_input_tensors_map[bls_name]
# Change the name to what the preprocessor expects
preproc_input_tensors.append(
pb_utils.Tensor(preproc_name, tensor.as_numpy()))
return preproc_input_tensors
def _get_trtllm_input_tensors(self, bls_input_tensors_map,
preproc_output_tensors):
trtllm_input_tensors = []
# Set input tensors from preprocessor outputs
for preproc_output_tensor in preproc_output_tensors:
trtllm_tensor_name = self.preproc_output_to_trtllm_input_map[
preproc_output_tensor.name()]
trtllm_input_tensors.append(
pb_utils.Tensor(trtllm_tensor_name,
preproc_output_tensor.as_numpy()))
# Set input tensors from bls inputs
for trtllm_name, bls_name in self.trtllm_input_to_bls_input_map.items(
):
if bls_name in bls_input_tensors_map:
tensor = bls_input_tensors_map[bls_name]
# Change the name to what the preprocessor expects
trtllm_input_tensors.append(
pb_utils.Tensor(trtllm_name, tensor.as_numpy()))
return trtllm_input_tensors
def _get_postproc_input_tensors(self, tokens, trtllm_output_tensors):
postproc_input_tensors = []
for trtllm_output_tensor in trtllm_output_tensors:
# If in decoupled mode, option to append new tokens to existing tokens before calling postprocessor
# This might be needed for some tokenizers
# Note that in that case, the client must overwrite previously received output text
if (self.accumulate_tokens and self.decoupled
and trtllm_output_tensor.name() == "output_ids"):
new_tokens = trtllm_output_tensor.as_numpy()
if new_tokens.ndim != 3:
raise pb_utils.TritonModelException(
"Expected output_ids tensor to have 3 dims.")
if new_tokens.shape[0] != 1:
raise pb_utils.TritonModelException(
"Expected output_ids tensor to have batch size of 1")
if new_tokens.shape[1] != 1:
raise pb_utils.TritonModelException(
"Accumulation of tokens is only implemented for beam width = 1"
)
tokens = new_tokens if (tokens is None) else np.concatenate(
(tokens, new_tokens), axis=2)
# output ids
postproc_output_ids_name = self.trtllm_output_to_postproc_input_map[
"output_ids"]
postproc_input_tensors.append(
pb_utils.Tensor(postproc_output_ids_name, tokens))
# sequence length
np_seq_len_tensor = np.array([[tokens.shape[2]]],
dtype=np.int32)
postproc_seq_len_name = self.trtllm_output_to_postproc_input_map[
"sequence_length"]
postproc_input_tensors.append(
pb_utils.Tensor(postproc_seq_len_name, np_seq_len_tensor))
# Set input tensors from trtllm outputs
for trtllm_output_tensor in trtllm_output_tensors:
# output_ids and sequence_length were handled earlier
if (self.accumulate_tokens and self.decoupled
and (trtllm_output_tensor.name() == "output_ids"
or trtllm_output_tensor.name() == "sequence_length")):
continue
postproc_tensor_name = self.trtllm_output_to_postproc_input_map[
trtllm_output_tensor.name()]
postproc_input_tensors.append(
pb_utils.Tensor(postproc_tensor_name,
trtllm_output_tensor.as_numpy()))
return tokens, postproc_input_tensors
def _get_bls_output_tensors(self, postproc_output_tensors):
bls_output_tensors = []
# Set input tensors from trtllm outputs
for postproc_output_tensor in postproc_output_tensors:
bls_tensor_name = self.postproc_output_to_bls_output_map[
postproc_output_tensor.name()]
bls_output_tensors.append(
pb_utils.Tensor(bls_tensor_name,
postproc_output_tensor.as_numpy()))
return bls_output_tensors
def execute(self, requests):
responses = []
bls_response_sender = None
for request in requests:
#Get the response sender for the BLS
if self.decoupled:
bls_response_sender = request.get_response_sender()
try:
# Get the bls input tensors
bls_input_tensors_map = self._get_bls_input_tensors_map(
request)
#Check the batch dimension
for name, tensor in bls_input_tensors_map.items():
batch_dim = tensor.as_numpy().shape[0]
print("Debug name {}, shape: {}", name, tensor.as_numpy().shape)
if batch_dim != 1:
err_str = "Inflight batching backend expects requests with batch size of 1."
self.logger.log_error(err_str)
raise pb_utils.TritonModelException(err_str)
# Create the preprocessor input tensors
preproc_input_tensors = self._get_preproc_input_tensors(
bls_input_tensors_map)
print(f"bls_input_tensors_map==={bls_input_tensors_map}====")
print(f"preproc_input_tensors:{preproc_input_tensors}=======")
preproc_request = pb_utils.InferenceRequest(
model_name="preprocessing",
inputs=preproc_input_tensors,
requested_output_names=list(
self.preproc_output_to_trtllm_input_map.keys()))
#Execute preprocessor
preproc_response = preproc_request.exec()
if preproc_response.has_error():
raise pb_utils.TritonModelException(
preproc_response.error().message())
# Create the trtllm input tensors
trtllm_input_tensors = self._get_trtllm_input_tensors(
bls_input_tensors_map, preproc_response.output_tensors())
trtllm_request = pb_utils.InferenceRequest(
model_name="tensorrt_llm",
inputs=trtllm_input_tensors,
requested_output_names=list(
self.trtllm_output_to_postproc_input_map.keys()))
#Execute trtllm
trtllm_responses = trtllm_request.exec(
decoupled=self.decoupled)
print(f"excute trtllm -get:{trtllm_responses}")
if not self.decoupled:
trtllm_responses = [trtllm_responses]
print(f"=============2=decoupled{self.decoupled}=========")
tokens = None
#Loop over the trtllm responses
for trtllm_response in trtllm_responses:
print(f"excute trtllm -every get:{trtllm_response}")
if trtllm_response.has_error():
raise pb_utils.TritonModelException(
trtllm_response.error().message())
trtllm_output_tensors = trtllm_response.output_tensors()
tokens, postproc_input_tensors = self._get_postproc_input_tensors(
tokens, trtllm_output_tensors)
print(f"tokens:==={tokens}")
postproc_request = pb_utils.InferenceRequest(
model_name="postprocessing",
inputs=postproc_input_tensors,
requested_output_names=list(
self.postproc_output_to_bls_output_map.keys()))
#Execute postprocessor
postproc_response = postproc_request.exec()
if postproc_response.has_error():
raise pb_utils.TritonModelException(
postproc_response.error().message())
# Create the BLS response
bls_output_tensors = self._get_bls_output_tensors(
postproc_response.output_tensors())
bls_response = pb_utils.InferenceResponse(
output_tensors=bls_output_tensors)
if self.decoupled:
print(f"==============3==decoupled{self.decoupled}==========")
bls_response_sender.send(bls_response)
else:
responses.append(bls_response)
# All responses have been sent, set final flag
if self.decoupled:
print(f"==============4==decoupled:{self.decoupled}==========")
print(f"==============4==:{pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL}==========")
bls_response_sender.send(
flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
except Exception:
self.logger.log_error(traceback.format_exc())
# If encountering an error, send a response with err msg
error_response = pb_utils.InferenceResponse(
output_tensors=[],
error=pb_utils.TritonError(traceback.format_exc()))
if self.decoupled:
print(f"==============5==decoupled:{self.decoupled}==========")
bls_response_sender.send(error_response)
bls_response_sender.send(
flags=pb_utils.TRITONSERVER_RESPONSE_COMPLETE_FINAL)
else:
responses.append(error_response)
if self.decoupled:
return None
else:
assert len(responses) == len(requests)
return responses
def finalize(self):
"""`finalize` is called only once when the model is being unloaded.
Implementing `finalize` function is optional. This function allows
the model to perform any necessary clean ups before exit.
"""
print('Cleaning up...')
```

The config.pbtxt for tensorrt_llm_bls is:

```
name: "tensorrt_llm_bls"
backend: "python"
max_batch_size: 4
model_transaction_policy { decoupled: true }
input [
  { name: "text_input" data_type: TYPE_STRING dims: [ -1 ] },
  { name: "max_tokens" data_type: TYPE_INT32 dims: [ -1 ] },
  { name: "bad_words" data_type: TYPE_STRING dims: [ -1 ] optional: true },
  { name: "stop_words" data_type: TYPE_STRING dims: [ -1 ] optional: true },
  { name: "end_id" data_type: TYPE_INT32 dims: [ 1 ] optional: true },
  { name: "pad_id" data_type: TYPE_INT32 dims: [ 1 ] optional: true },
  { name: "top_k" data_type: TYPE_INT32 dims: [ 1 ] optional: true },
  { name: "top_p" data_type: TYPE_FP32 dims: [ 1 ] optional: true },
  { name: "temperature" data_type: TYPE_FP32 dims: [ 1 ] optional: true },
  { name: "length_penalty" data_type: TYPE_FP32 dims: [ 1 ] optional: true },
  { name: "repetition_penalty" data_type: TYPE_FP32 dims: [ 1 ] optional: true },
  { name: "min_length" data_type: TYPE_INT32 dims: [ 1 ] optional: true },
  { name: "presence_penalty" data_type: TYPE_FP32 dims: [ 1 ] optional: true },
  { name: "random_seed" data_type: TYPE_UINT64 dims: [ 1 ] optional: true },
  { name: "return_log_probs" data_type: TYPE_BOOL dims: [ 1 ] optional: true },
  { name: "beam_width" data_type: TYPE_INT32 dims: [ 1 ] optional: true },
  { name: "stream" data_type: TYPE_BOOL dims: [ 1 ] optional: true },
  { name: "prompt_embedding_table" data_type: TYPE_FP16 dims: [ -1, -1 ] optional: true },
  { name: "prompt_vocab_size" data_type: TYPE_INT32 dims: [ 1 ] optional: true },
  { name: "embedding_bias_words" data_type: TYPE_STRING dims: [ -1 ] optional: true },
  { name: "embedding_bias_weights" data_type: TYPE_FP32 dims: [ -1 ] optional: true }
]
output [
  { name: "text_output" data_type: TYPE_STRING dims: [ -1 ] },
  { name: "cum_log_probs" data_type: TYPE_FP32 dims: [ -1 ] },
  { name: "output_log_probs" data_type: TYPE_FP32 dims: [ -1, -1 ] }
]
parameters: { key: "accumulate_tokens" value: { string_value: "true" } }
instance_group [ { count: 2 kind: KIND_CPU } ]
```
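For completeness, the configuration the server actually loaded (including whether the decoupled transaction policy was applied) can be read back from Triton's model config endpoint; a quick check, assuming the default HTTP port 8000:

```bash
# Read back the loaded tensorrt_llm_bls config and inspect the transaction policy.
curl -s localhost:8000/v2/models/tensorrt_llm_bls/config | python3 -m json.tool | grep -A 2 -i transaction
```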
Expected behavior
I expect token-by-token streaming output from the request below, but I do not get it:

```bash
curl -X POST 10.110.31.16:8001/v2/models/tensorrt_llm_bls/generate_stream \
    -d '{"text_input": "<|im_start|>system\n you are a writer .<|im_end|>\n<|im_start|>user\nwho are you<|im_end|>\n<|im_start|>assistant\n", "max_tokens": 54, "bad_words": "\n", "stop_words": "", "end_id": [151643], "pad_id": [151643],"stream": true }'
```
Actual behavior

The whole generation comes back in a single response; nothing is streamed incrementally.

Additional notes
When I add some print statements, it looks like trtllm_responses is a single, complete response rather than an iterable of partial responses.
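For context, my understanding of the two code paths in the BLS execute() above (a sketch that just restates the logic, with a hypothetical helper name) is:

```python
def run_trtllm(trtllm_request, decoupled):
    # Hypothetical helper restating the two exec() paths from model.py above;
    # trtllm_request is the pb_utils.InferenceRequest built in execute().
    if decoupled:
        # Decoupled mode: exec(decoupled=True) returns an iterator yielding one
        # InferenceResponse per chunk streamed by the tensorrt_llm model.
        for trtllm_response in trtllm_request.exec(decoupled=True):
            yield trtllm_response
    else:
        # Non-decoupled mode: exec() returns a single InferenceResponse that
        # already holds the complete output, so there is only one to handle.
        yield trtllm_request.exec()
```

So if streaming were active end to end, the loop over trtllm_responses in execute() should run once per chunk and bls_response_sender.send() should fire each time; in my prints it only ever sees one complete response.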