
[Bug]

Open · abxis opened this issue 5 months ago · 0 comments

Checklist

  • [ ] 1. I have searched for related issues but did not get the help I expected
  • [ ] 2. The bug has not been fixed in the latest version
  • [ ] 3. Please note that if a bug report lacks the corresponding environment information and a minimal reproducible example, it will be difficult for us to reproduce and locate the problem, lowering the chance of getting feedback
  • [ ] 4. If you are raising a question rather than a bug, please start a discussion at https://github.com/kvcache-ai/ktransformers/discussions. Otherwise this issue will be closed
  • [ ] 5. To make community communication easier, I will use Chinese/English, or attach a Chinese/English translation if using another language. Non-Chinese/English content without a translation may be closed

Problem Description

I started the model service with:

```bash
python ktransformers/server/main.py \
  --port 10002 \
  --architectures Qwen3MoeForCausalLM \
  --model_path /root/model/qwen3/Qwen3-235B-A22B \
  --gguf_path /root/model/qwen3/Qwen3-235B-A22B-GGUF-Q4/Q4_K_M \
  --optimize_config_path ktransformers/optimize/optimize_rules/Qwen3Moe-serve.yaml \
  --max_new_tokens 1024 \
  --cache_lens 32768 \
  --chunk_size 256 \
  --max_batch_size 4 \
  --backend_type balance_serve \
  --force_think  # useful for R1
```

After the model service was up, I created the following function in OpenWebUI:

```python
import os
import json
import requests
import time
from pydantic import BaseModel, Field
from typing import List, Union, Iterator, Dict, Any

# Set DEBUG to True to enable detailed logging
DEBUG = True


class Pipe:
    class Valves(BaseModel):
        openai_API_KEY: str = Field(default="none")  # Optional API key if needed
        DEFAULT_MODEL: str = Field(default="DeepSeek-R1")  # Default model identifier

    def __init__(self):
        self.id = "DeepSeek-R1"
        self.type = "manifold"
        self.name = "DeepSeek-R1"
        self.valves = self.Valves(
            **{
                "openai_API_KEY": os.getenv("openai_API_KEY", "none"),
                "DEFAULT_MODEL": os.getenv("openai_DEFAULT_MODEL", "DeepSeek-R1"),
            }
        )
        # Self-hosted FastAPI server details
        self.api_url = (
            "http://172.17.0.1:10002/v1/chat/completions"  # Make sure the port matches your service
        )
        self.headers = {"Content-Type": "application/json"}

    def get_openai_models(self):
        """Return available models - for openai we'll return a fixed list"""
        return [{"id": "DeepSeek-R1", "name": "DeepSeek-R1"}]

    def pipes(self) -> List[dict]:
        return self.get_openai_models()

    def pipe(self, body: dict) -> Union[str, Iterator[str]]:
        request_id = f"req-{time.time_ns()}"  # Unique request ID
        if DEBUG:
            print(f"\n=== Starting Request {request_id} ===")
            print(f"Request Body: {json.dumps(body, indent=2)[:1000]}...")  # Truncate long content

        try:
            # Use default model ID since openai has a single endpoint
            model_id = self.valves.DEFAULT_MODEL

            messages = []

            # Correctly preserve the multimodal message structure
            for message in body["messages"]:
                new_message = {"role": message["role"]}

                if isinstance(message.get("content"), list):
                    # Keep the multimodal structure
                    new_message["content"] = []
                    for content in message["content"]:
                        if content["type"] == "text":
                            new_message["content"].append(
                                {"type": "text", "text": content["text"]}
                            )
                        elif content["type"] == "image_url":
                            # Pass the image URL through directly
                            new_message["content"].append(
                                {
                                    "type": "image_url",
                                    "image_url": {"url": content["image_url"]["url"]},
                                }
                            )
                else:
                    # Handle plain text messages
                    new_message["content"] = message["content"]

                messages.append(new_message)

            if DEBUG:
                print(f"Prepared messages for {request_id}:")
                print(json.dumps(messages, indent=2, ensure_ascii=False))

            # Prepare the API call parameters
            payload = {
                "model": model_id,
                "messages": messages,
                "temperature": body.get("temperature", 0.7),
                "top_p": body.get("top_p", 0.9),
                "max_tokens": body.get("max_tokens", 8192),
                "stream": body.get("stream", False),  # Defaults to non-streaming
            }

            # Add stop sequences if provided
            if body.get("stop"):
                payload["stop"] = body["stop"]

            # Sending request to local FastAPI server
            if payload.get("stream", False):
                # Streaming response with proper termination
                def stream_generator():
                    start_time = time.time()
                    try:
                        response = requests.post(
                            self.api_url,
                            json=payload,
                            headers=self.headers,
                            stream=True,
                        )
                        response.raise_for_status()  # Raise on HTTP errors

                        found_done = False
                        for line in response.iter_lines():
                            if line:
                                decoded_line = line.decode("utf-8")

                                # Key fix: detect the stream termination marker
                                if "[DONE]" in decoded_line:
                                    if DEBUG:
                                        print(
                                            f"!!! RECEIVED DONE SIGNAL in {request_id} !!!"
                                        )
                                    found_done = True
                                    break

                                if DEBUG:
                                    print(
                                        f"Stream response for {request_id}: {decoded_line[:200]}..."
                                    )

                                # Check for the OpenAI-style finish marker
                                if '"finish_reason":"stop"' in decoded_line:
                                    if DEBUG:
                                        print(
                                            f"Detected finish_reason:stop in {request_id}"
                                        )
                                    yield decoded_line
                                    found_done = True
                                    break

                                yield decoded_line

                        # Make sure the stream terminates properly
                        if not found_done:
                            if DEBUG:
                                print(
                                    f"Warning: No DONE signal received for {request_id}"
                                )
                            yield "data: [DONE]\n\n"

                    except Exception as e:
                        error_msg = f"Error during streaming: {str(e)}"
                        if DEBUG:
                            print(f"Stream error in {request_id}: {error_msg}")
                        # Emit an OpenAI-format error response
                        yield json.dumps(
                            {"error": {"message": error_msg, "type": "api_error"}}
                        )
                    finally:
                        duration = time.time() - start_time
                        if DEBUG:
                            print(
                                f"=== Stream closed for {request_id} after {duration:.2f}s ==="
                            )

                return stream_generator()
            else:
                # Regular response
                start_time = time.time()
                response = requests.post(
                    self.api_url, json=payload, headers=self.headers
                )
                response.raise_for_status()

                response_json = response.json()
                if DEBUG:
                    print(f"Full response for {request_id}:")
                    print(
                        json.dumps(response_json, indent=2, ensure_ascii=False)[:1000]
                        + "..."
                    )
                    print(
                        f"Request {request_id} completed in {time.time()-start_time:.2f}s"
                    )

                return response_json

        except Exception as e:
            error_msg = f"Error in pipe method: {str(e)}"
            if DEBUG:
                print(f"Error in {request_id}: {error_msg}")
            # Return an error response
            return {"error": {"message": error_msg, "type": "api_error"}}
        finally:
            if DEBUG:
                print(f"=== Completed Request {request_id} ===\n")

    def health_check(self) -> bool:
        """Check if the openai API (local FastAPI service) is accessible"""
        try:
            # Use a lightweight HEAD request to avoid triggering actual generation
            response = requests.head(self.api_url, timeout=2)
            if DEBUG and response.status_code != 200:
                print(f"Health check failed with status: {response.status_code}")
            return response.status_code == 200
        except Exception as e:
            if DEBUG:
                print(f"Health check failed: {str(e)}")
            return False
```

After a conversation, the question had already been fully answered, but the backend then received a second request, causing inference to keep running, as shown in the image:

Image

Is this a problem with the function, or with some other OpenWebUI setting?
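One likely source of the "second request" is OpenWebUI itself: after a chat turn it can fire background completions (for example, chat title and tag generation), which arrive at the backend as fresh /v1/chat/completions calls. Below is a minimal sketch of a guard for the pipe method above, assuming your OpenWebUI version injects the optional __task__ argument into pipe functions; the task names used are illustrative and not verified against any particular OpenWebUI release:

```python
def pipe(self, body: dict, __task__: str = None) -> Union[str, Iterator[str]]:
    # Assumption: OpenWebUI sets __task__ for internal follow-up jobs such as
    # title or tag generation. Answering those locally keeps them from ever
    # reaching the heavyweight ktransformers backend.
    if __task__ in ("title_generation", "tags_generation"):
        return "New Chat"  # cheap placeholder instead of a full completion
    # ...otherwise fall through to the original request handling shown above...
```

Alternatively, if these background jobs are the source, disabling automatic title/tag generation in OpenWebUI's interface settings should stop the extra request without changing the function.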

Steps to Reproduce

```bash
python ktransformers/server/main.py \
  --port 10002 \
  --architectures Qwen3MoeForCausalLM \
  --model_path /root/model/qwen3/Qwen3-235B-A22B \
  --gguf_path /root/model/qwen3/Qwen3-235B-A22B-GGUF-Q4/Q4_K_M \
  --optimize_config_path ktransformers/optimize/optimize_rules/Qwen3Moe-serve.yaml \
  --max_new_tokens 1024 \
  --cache_lens 32768 \
  --chunk_size 256 \
  --max_batch_size 4 \
  --backend_type balance_serve \
  --force_think  # useful for R1
```

Tested with the Qwen3-235B-A22B-GGUF-Q4/Q4_K_M and deepseek-r1-0528-q4-k_m models.
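To tell whether the duplicate request originates from OpenWebUI or from the server itself, it can help to bypass OpenWebUI and stream from the endpoint directly, then check that the backend logs exactly one request and the stream closes after a single [DONE]. A minimal probe, assuming the server launched above is listening on port 10002 (the model id is a placeholder):

```python
import requests

payload = {
    "model": "DeepSeek-R1",  # placeholder: any id your server accepts
    "messages": [{"role": "user", "content": "Hello"}],
    "stream": True,
    "max_tokens": 64,
}
with requests.post(
    "http://127.0.0.1:10002/v1/chat/completions", json=payload, stream=True
) as resp:
    resp.raise_for_status()
    for line in resp.iter_lines():
        if line:
            print(line.decode("utf-8"))
# If only one request shows up in the server logs and the stream ends with
# "data: [DONE]", the extra request in the screenshot is coming from OpenWebUI.
```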

Environment Information

Ubuntu 20.04, 4x A100 GPUs

abxis · Jul 22 '25 14:07