[Bug]
Checklist
- [ ] 1. I have searched for related issues but did not get the help I expected
- [ ] 2. The bug has not been fixed in the latest version
- [ ] 3. Please note that if a bug report is missing the corresponding environment information and a minimal reproducible example, it will be hard for us to reproduce and locate the problem, which lowers the chance of getting feedback
- [ ] 4. If this is a question rather than a bug, please start a discussion at https://github.com/kvcache-ai/ktransformers/discussions. Otherwise this issue will be closed
- [ ] 5. To make community communication easier, I will use Chinese/English or attach a Chinese/English translation (if using another language). Non-Chinese/English content without a translation may be closed
Problem description
I start the model service with:

```bash
python ktransformers/server/main.py \
  --port 10002 \
  --architectures Qwen3MoeForCausalLM \
  --model_path /root/model/qwen3/Qwen3-235B-A22B \
  --gguf_path /root/model/qwen3/Qwen3-235B-A22B-GGUF-Q4/Q4_K_M \
  --optimize_config_path ktransformers/optimize/optimize_rules/Qwen3Moe-serve.yaml \
  --max_new_tokens 1024 \
  --cache_lens 32768 \
  --chunk_size 256 \
  --max_batch_size 4 \
  --backend_type balance_serve \
  --force_think  # useful for R1
```
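Before involving OpenWebUI, the server itself can be checked with one direct call: a single POST should show up as exactly one inference task in the backend log. A minimal sketch (the prompt and timeout are placeholders; the endpoint assumes the port above and the standard OpenAI-compatible response shape):

```python
import requests

# One request should equal one inference task on the backend (hypothetical prompt).
resp = requests.post(
    "http://172.17.0.1:10002/v1/chat/completions",
    json={
        "model": "DeepSeek-R1",
        "messages": [{"role": "user", "content": "hello"}],
        "stream": False,
    },
    timeout=600,
)
print(resp.json()["choices"][0]["message"]["content"])
```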
After the model service starts, I create the following function in OpenWebUI:

```python
import os
import json
import requests
import time
from pydantic import BaseModel, Field
from typing import List, Union, Iterator, Dict, Any
# Set DEBUG to True to enable detailed logging
DEBUG = True


class Pipe:
    class Valves(BaseModel):
        openai_API_KEY: str = Field(default="none")  # Optional API key if needed
        DEFAULT_MODEL: str = Field(default="DeepSeek-R1")  # Default model identifier

    def __init__(self):
        self.id = "DeepSeek-R1"
        self.type = "manifold"
        self.name = "DeepSeek-R1"
        self.valves = self.Valves(
            **{
                "openai_API_KEY": os.getenv("openai_API_KEY", "none"),
                "DEFAULT_MODEL": os.getenv("openai_DEFAULT_MODEL", "DeepSeek-R1"),
            }
        )
        # Self-hosted FastAPI server details
        self.api_url = (
            "http://172.17.0.1:10002/v1/chat/completions"  # make sure the port matches your service
        )
        self.headers = {"Content-Type": "application/json"}

    def get_openai_models(self):
        """Return available models - for openai we'll return a fixed list"""
        return [{"id": "DeepSeek-R1", "name": "DeepSeek-R1"}]

    def pipes(self) -> List[dict]:
        return self.get_openai_models()

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        request_id = f"req-{time.time_ns()}"  # unique request ID
        if DEBUG:
            print(f"\n=== Starting Request {request_id} ===")
            print(f"Request Body: {json.dumps(body, indent=2)[:1000]}...")  # truncate long content
        try:
            # Use default model ID since openai has a single endpoint
            model_id = self.valves.DEFAULT_MODEL
            messages = []
            # Preserve the multimodal message structure correctly
            for message in body["messages"]:
                new_message = {"role": message["role"]}
                if isinstance(message.get("content"), list):
                    # keep the multimodal structure
                    new_message["content"] = []
                    for content in message["content"]:
                        if content["type"] == "text":
                            new_message["content"].append(
                                {"type": "text", "text": content["text"]}
                            )
                        elif content["type"] == "image_url":
                            # pass the image URL through directly
                            new_message["content"].append(
                                {
                                    "type": "image_url",
                                    "image_url": {"url": content["image_url"]["url"]},
                                }
                            )
                else:
                    # handle plain text messages
                    new_message["content"] = message["content"]
                messages.append(new_message)
            if DEBUG:
                print(f"Prepared messages for {request_id}:")
                print(json.dumps(messages, indent=2, ensure_ascii=False))
            # Prepare the API call parameters
            payload = {
                "model": model_id,
                "messages": messages,
                "temperature": body.get("temperature", 0.7),
                "top_p": body.get("top_p", 0.9),
                "max_tokens": body.get("max_tokens", 8192),
                "stream": body.get("stream", False),  # non-streaming by default
            }
            # Add stop sequences if provided
            if body.get("stop"):
                payload["stop"] = body["stop"]
            # Send the request to the local FastAPI server
            if payload.get("stream", False):
                # Streaming response with proper termination
                def stream_generator():
                    start_time = time.time()
                    try:
                        response = requests.post(
                            self.api_url,
                            json=payload,
                            headers=self.headers,
                            stream=True,
                        )
                        response.raise_for_status()  # raise on HTTP errors
                        found_done = False
                        for line in response.iter_lines():
                            if line:
                                decoded_line = line.decode("utf-8")
                                # key fix: detect the stream termination marker
                                if "[DONE]" in decoded_line:
                                    if DEBUG:
                                        print(
                                            f"!!! RECEIVED DONE SIGNAL in {request_id} !!!"
                                        )
                                    found_done = True
                                    break
                                if DEBUG:
                                    print(
                                        f"Stream response for {request_id}: {decoded_line[:200]}..."
                                    )
                                # check for the OpenAI-style finish marker
                                if '"finish_reason":"stop"' in decoded_line:
                                    if DEBUG:
                                        print(
                                            f"Detected finish_reason:stop in {request_id}"
                                        )
                                    yield decoded_line
                                    found_done = True
                                    break
                                yield decoded_line
                        # make sure the client always receives a final [DONE]
                        if not found_done:
                            if DEBUG:
                                print(
                                    f"Warning: No DONE signal received for {request_id}"
                                )
                        yield "data: [DONE]\n\n"
                    except Exception as e:
                        error_msg = f"Error during streaming: {str(e)}"
                        if DEBUG:
                            print(f"Stream error in {request_id}: {error_msg}")
                        # emit an OpenAI-format error response
                        yield json.dumps(
                            {"error": {"message": error_msg, "type": "api_error"}}
                        )
                    finally:
                        duration = time.time() - start_time
                        if DEBUG:
                            print(
                                f"=== Stream closed for {request_id} after {duration:.2f}s ==="
                            )

                return stream_generator()
            else:
                # Regular (non-streaming) response
                start_time = time.time()
                response = requests.post(
                    self.api_url, json=payload, headers=self.headers
                )
                response.raise_for_status()
                response_json = response.json()
                if DEBUG:
                    print(f"Full response for {request_id}:")
                    print(
                        json.dumps(response_json, indent=2, ensure_ascii=False)[:1000]
                        + "..."
                    )
                    print(
                        f"Request {request_id} completed in {time.time()-start_time:.2f}s"
                    )
                return response_json
        except Exception as e:
            error_msg = f"Error in pipe method: {str(e)}"
            if DEBUG:
                print(f"Error in {request_id}: {error_msg}")
            # return an error response
            return {"error": {"message": error_msg, "type": "api_error"}}
        finally:
            if DEBUG:
                print(f"=== Completed Request {request_id} ===\n")

    def health_check(self) -> bool:
        """Check if the openai API (local FastAPI service) is accessible"""
        try:
            # use a lightweight HEAD request so no actual generation is triggered
            response = requests.head(self.api_url, timeout=2)
            if DEBUG and response.status_code != 200:
                print(f"Health check failed with status: {response.status_code}")
            return response.status_code == 200
        except Exception as e:
            if DEBUG:
                print(f"Health check failed: {str(e)}")
            return False
```

After a round of conversation, the answer has already completed, but the backend then receives a second request, which keeps inference running, as shown in the screenshot. Is this a problem with the function, or with some other OpenWebUI setting?
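To tell whether the second request comes from this function or from OpenWebUI itself, one option is to dump every body the pipe receives: OpenWebUI can issue follow-up model calls after a chat (for example automatic title/tag generation), and those would show up as a second body with a different prompt. A minimal logging sketch (the helper name and file path are hypothetical):

```python
import json
import time

# Hypothetical debugging helper: append every incoming request body to a file
# so the origin of the second request can be inspected after one chat round.
def log_request_body(body: dict, path: str = "/tmp/pipe_requests.log") -> None:
    with open(path, "a", encoding="utf-8") as f:
        f.write(f"--- {time.strftime('%Y-%m-%d %H:%M:%S')} ---\n")
        f.write(json.dumps(body, ensure_ascii=False, indent=2) + "\n")
```

Calling `log_request_body(body)` at the top of `Pipe.pipe()` would show whether the second entry contains a title/tags-style prompt, i.e. whether the extra inference is triggered by an OpenWebUI setting rather than by this function.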
Reproduction steps

```bash
python ktransformers/server/main.py \
  --port 10002 \
  --architectures Qwen3MoeForCausalLM \
  --model_path /root/model/qwen3/Qwen3-235B-A22B \
  --gguf_path /root/model/qwen3/Qwen3-235B-A22B-GGUF-Q4/Q4_K_M \
  --optimize_config_path ktransformers/optimize/optimize_rules/Qwen3Moe-serve.yaml \
  --max_new_tokens 1024 \
  --cache_lens 32768 \
  --chunk_size 256 \
  --max_batch_size 4 \
  --backend_type balance_serve \
  --force_think  # useful for R1
```

Tested with the Qwen3-235B-A22B-GGUF-Q4/Q4_K_M and deepseek-r1-0528-q4-k_m models.
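The termination behaviour can also be reproduced without OpenWebUI by streaming one request directly and watching for the `data: [DONE]` sentinel that the function above relies on (a sketch under the same endpoint assumption, with a hypothetical prompt):

```python
import requests

# Stream one completion and print each SSE line; after "data: [DONE]" the
# backend should go idle, with no further decode steps for this request.
with requests.post(
    "http://172.17.0.1:10002/v1/chat/completions",
    json={
        "model": "DeepSeek-R1",
        "messages": [{"role": "user", "content": "hello"}],
        "stream": True,
    },
    stream=True,
    timeout=600,
) as resp:
    for raw in resp.iter_lines():
        if raw:
            line = raw.decode("utf-8")
            print(line[:120])
            if line.strip() == "data: [DONE]":
                break
```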
Environment information
Ubuntu 20.04, 4x A100 GPUs