
[Bug] Asking for help with the "Compile with `TORCH_USE_CUDA_DSA` to enable device-side assertions" error

Open · YHCyhc20010606 opened this issue 1 month ago • 2 comments

Checklist

  • [x] 1. I have searched for related issues but did not get the help I expected
  • [x] 2. The problem has not been fixed in the latest version
  • [x] 3. I understand that if a bug report lacks the corresponding environment information and a minimal reproducible example, it will be difficult to reproduce and locate the problem, reducing the chance of receiving feedback
  • [x] 4. If this is a question rather than a bug, it should be raised in the discussions area https://github.com/kvcache-ai/ktransformers/discussions; otherwise the issue will be closed
  • [x] 5. To make community discussion easier, I will use Chinese/English or attach a Chinese/English translation (if another language is used). Non-Chinese/English content without a translation may be closed

Problem description

RuntimeError: CUDA error: device-side assert triggered
CUDA kernel errors might be asynchronously reported at some other API call, so the stacktrace below might be incorrect.
For debugging consider passing CUDA_LAUNCH_BLOCKING=1
Compile with TORCH_USE_CUDA_DSA to enable device-side assertions.
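As the message itself notes, the stack trace is only reliable when kernel launches are synchronous. A minimal sketch of forcing that (added for illustration, not part of the script below; the variable has to be set before CUDA is initialized):

# Minimal debugging sketch (illustrative, not part of the script below):
# synchronous launches make the traceback point at the op that actually
# triggered the device-side assert.
import os

os.environ["CUDA_LAUNCH_BLOCKING"] = "1"  # must be set before CUDA is initialized

import torch  # imported only after the environment variable is in place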

Steps to reproduce

I modified local_chat.py to read a JSONL file and run batch processing. After roughly 10 to 20 prompts, this error appears. How can I solve it?

Environment

Single A100 GPU, Ubuntu 22.04, CPU: Intel(R) Xeon(R) Gold 6336Y @ 2.40GHz

YHCyhc20010606 · Nov 07 '25 08:11

My local_chat.py code is as follows:

"""
Description : Batch processing version of local_chat
Author      : Boxin Zhang, Azure-Tang (Modified for batch processing)
Version     : 0.1.0
Copyright (c) 2024 by KVCache.AI, All Rights Reserved.
"""

import os
import platform
import sys
import argparse

project_dir = os.path.dirname(os.path.dirname(__file__))
sys.path.insert(0, project_dir)
import torch
import logging
from transformers import (
    AutoTokenizer,
    AutoConfig,
    AutoModelForCausalLM,
    GenerationConfig,
    TextStreamer,
)
import json
import fire
from ktransformers.operators.experts import KExpertsCache
from ktransformers.optimize.optimize import optimize_and_load_gguf
from ktransformers.models.modeling_deepseek import DeepseekV2ForCausalLM
from ktransformers.models.modeling_qwen2_moe import Qwen2MoeForCausalLM
from ktransformers.models.modeling_deepseek_v3 import DeepseekV3ForCausalLM
from ktransformers.models.modeling_llama import LlamaForCausalLM
from ktransformers.models.modeling_mixtral import MixtralForCausalLM
from ktransformers.util.utils import prefill_and_generate, get_compute_capability
from ktransformers.server.config.config import Config
from ktransformers.operators.flashinfer_wrapper import flashinfer_enabled
from ktransformers.util.vendors import device_manager, get_device, to_device, GPUVendor

custom_models = {
    "DeepseekV2ForCausalLM": DeepseekV2ForCausalLM,
    "DeepseekV3ForCausalLM": DeepseekV3ForCausalLM,
    "Qwen2MoeForCausalLM": Qwen2MoeForCausalLM,
    "LlamaForCausalLM": LlamaForCausalLM,
    "MixtralForCausalLM": MixtralForCausalLM,
}

ktransformer_rules_dir = (
    os.path.dirname(os.path.abspath(__file__)) + "/optimize/optimize_rules/"
)
default_optimize_rules = {
    "DeepseekV2ForCausalLM": ktransformer_rules_dir + "DeepSeek-V2-Chat.yaml",
    "DeepseekV3ForCausalLM": ktransformer_rules_dir + "DeepSeek-V3-Chat.yaml",
    "Qwen2MoeForCausalLM": ktransformer_rules_dir + "Qwen2-57B-A14B-Instruct.yaml",
    "LlamaForCausalLM": ktransformer_rules_dir + "Internlm2_5-7b-Chat-1m.yaml",
    "MixtralForCausalLM": ktransformer_rules_dir + "Mixtral.yaml",
}

def batch_process(
    model_path: str | None = None,
    optimize_config_path: str = None,
    gguf_path: str | None = None,
    max_new_tokens: int = 1000,
    cpu_infer: int = Config().cpu_infer,
    use_cuda_graph: bool = False,
    input_file: str | None = None,
    output_file: str | None = None,
    save_interval: int = 20,
    mode: str = "normal",
    force_think: bool = False,
    chunk_prefill_size: int = 8192,
    device: str = "cuda",
    load_size: int = None,
    prefetch_size: int = 0,
):
    """
    Batch process prompts from a JSONL file and save results to another JSONL file.

    Input JSONL format:  {"prompt": "your prompt text", "id": "optional_id"}
    Output JSONL format: {"prompt": "original prompt", "response": "generated response", "id": "optional_id"}
    """

# Validate batch processing arguments
if input_file is None:
    raise ValueError("--input_file is required for batch processing")
if output_file is None:
    raise ValueError("--output_file is required for batch processing")
if not os.path.exists(input_file):
    raise FileNotFoundError(f"Input file not found: {input_file}")

torch.set_grad_enabled(False)
Config().cpu_infer = cpu_infer

# Load tokenizer and config
print(f"Loading tokenizer and config from {model_path}")
tokenizer = AutoTokenizer.from_pretrained(model_path, trust_remote_code=True)
config = AutoConfig.from_pretrained(model_path, trust_remote_code=True)

if mode == 'long_context':
    assert config.architectures[0] == "LlamaForCausalLM", "only LlamaForCausalLM support long_context mode"
    torch.set_default_dtype(torch.float16)
else:
    torch.set_default_dtype(config.torch_dtype)

# Initialize model
print("Initializing model...")
with torch.device("meta"):
    if config.architectures[0] in custom_models:
        print("using custom modeling_xxx.py.")
        if "Qwen2Moe" in config.architectures[0]:
            config._attn_implementation = "flash_attention_2"
        if "Llama" in config.architectures[0]:
            config._attn_implementation = "eager"
        if "Mixtral" in config.architectures[0]:
            config._attn_implementation = "flash_attention_2"

        model = custom_models[config.architectures[0]](config)
    else:
        model = AutoModelForCausalLM.from_config(
            config, trust_remote_code=True, attn_implementation="flash_attention_2"
        )

# Get optimize config path
if optimize_config_path is None:
    if config.architectures[0] in default_optimize_rules:
        print("using default_optimize_rule for", config.architectures[0])
        optimize_config_path = default_optimize_rules[config.architectures[0]]
        print(f'{optimize_config_path=}')
    else:
        raise ValueError("optimize_config_path is required for this model architecture")

if gguf_path is None:
    raise ValueError("gguf_path is required")

if load_size is None:
    load_size = 1
    print(f"Using default load_size: {load_size}")

load_size = [load_size] * config.num_hidden_layers
config.load_size = load_size
config.prefetch_size = prefetch_size

print("Loading GGUF model...")
optimize_and_load_gguf(model, optimize_config_path, gguf_path, config)

try:
    model.generation_config = GenerationConfig.from_pretrained(model_path)
except Exception as e:
    print(f"generation config can't auto create, make default. Message: {e}")
    gen_config = GenerationConfig(
        temperature=0.6,
        top_p=0.95,
        do_sample=True
    )
    model.generation_config = gen_config

if model.generation_config.pad_token_id is None:
    model.generation_config.pad_token_id = model.generation_config.eos_token_id
model.eval()

logging.basicConfig(level=logging.INFO)
system = platform.system()

# Load existing results if output file exists (for resume)
processed_ids = set()
results = []
if os.path.exists(output_file):
    print(f"Found existing output file, loading processed results...")
    with open(output_file, 'r', encoding='utf-8') as f:
        for line in f:
            if line.strip():
                result = json.loads(line)
                results.append(result)
                if 'id' in result:
                    processed_ids.add(result['id'])
    print(f"Loaded {len(results)} existing results")

# Load input prompts
print(f"Loading prompts from {input_file}")
prompts = []
with open(input_file, 'r', encoding='utf-8') as f:
    for idx, line in enumerate(f):
        if line.strip():
            data = json.loads(line)
            # Skip if already processed
            item_id = data.get('id', idx)
            if item_id not in processed_ids:
                prompts.append({
                    'prompt': data.get('prompt', data.get('text', '')),
                    'id': item_id,
                    'original_data': data
                })

print(f"Total prompts to process: {len(prompts)}")

# Check if flashinfer should be used
use_flashinfer = (
    system != "Windows" and 
    (config.architectures[0] == "DeepseekV2ForCausalLM" or 
     config.architectures[0] == "DeepseekV3ForCausalLM") and 
    flashinfer_enabled and 
    get_compute_capability() >= 8 and 
    device_manager.gpu_vendor == GPUVendor.NVIDIA
)

# Process prompts
for idx, item in enumerate(prompts):
    try:
        prompt_text = item['prompt']
        prompt_id = item['id']
        
        print(f"\n{'='*60}")
        print(f"Processing [{idx+1}/{len(prompts)}] - ID: {prompt_id}")
        print(f"Prompt: {prompt_text[:100]}..." if len(prompt_text) > 100 else f"Prompt: {prompt_text}")
        
        # Prepare input
        messages = [{"role": "user", "content": prompt_text}]
        input_tensor = tokenizer.apply_chat_template(
            messages, add_generation_prompt=True, return_tensors="pt"
        )
        
        if force_think:
            token_thinks = torch.tensor(
                [tokenizer.encode("<think>\\n", add_special_tokens=False)], 
                device=input_tensor.device
            )
            input_tensor = torch.cat([input_tensor, token_thinks], dim=1)
        
        if mode == 'long_context':
            assert Config().long_context_config['max_seq_len'] > input_tensor.shape[1] + max_new_tokens, \
                "please change max_seq_len in ~/.ktransformers/config.yaml"

        # Generate response
        if use_flashinfer:
            generated = prefill_and_generate(
                model, tokenizer, input_tensor.cuda(), max_new_tokens, use_cuda_graph, 
                mode=mode, force_think=force_think, chunk_prefill_size=chunk_prefill_size,
                use_flashinfer_mla=True, 
                num_heads=config.num_attention_heads, 
                head_dim_ckv=config.kv_lora_rank, 
                head_dim_kpe=config.qk_rope_head_dim, 
                q_head_dim=config.qk_rope_head_dim + config.qk_nope_head_dim
            )
        else:
            generated = prefill_and_generate(
                model, tokenizer, input_tensor.cuda(), max_new_tokens, use_cuda_graph, 
                mode=mode, force_think=force_think, chunk_prefill_size=chunk_prefill_size
            )
        
        # The generated output is already printed by prefill_and_generate
        # We need to capture it. Let's decode the output
        output_ids = generated[0] if isinstance(generated, tuple) else generated
        response_text = tokenizer.decode(output_ids[input_tensor.shape[1]:], skip_special_tokens=True)
        
        # Save result
        result = {
            'id': prompt_id,
            'prompt': prompt_text,
            'response': response_text
        }
        # Include any additional fields from original data
        for key, value in item['original_data'].items():
            if key not in result:
                result[key] = value
        
        results.append(result)
        
        print(f"\nResponse: {response_text[:200]}..." if len(response_text) > 200 else f"\nResponse: {response_text}")
        
        # Save periodically
        if (idx + 1) % save_interval == 0:
            print(f"\n{'='*60}")
            print(f"Saving results at checkpoint {idx+1}/{len(prompts)}")
            with open(output_file, 'w', encoding='utf-8') as f:
                for r in results:
                    f.write(json.dumps(r, ensure_ascii=False) + '\n')
            print(f"Saved {len(results)} results to {output_file}")
    
    except Exception as e:
        print(f"\nError processing prompt ID {prompt_id}: {str(e)}")
        import traceback
        traceback.print_exc()
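        # Added note (not in the original script): once a device-side assert
        # has fired, the CUDA context is generally left unusable, so the
        # remaining prompts in this loop will most likely keep failing until
        # the process is restarted; the periodic saves above allow resuming.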
        # Save error result
        results.append({
            'id': prompt_id,
            'prompt': prompt_text,
            'response': None,
            'error': str(e)
        })
        continue

# Final save
print(f"\n{'='*60}")
print("Processing complete! Saving final results...")
with open(output_file, 'w', encoding='utf-8') as f:
    for r in results:
        f.write(json.dumps(r, ensure_ascii=False) + '\n')

print(f"All results saved to {output_file}")
print(f"Total processed: {len(results)}")

if __name__ == "__main__":
    parser = argparse.ArgumentParser(description="Batch process prompts using KTransformers")
    parser.add_argument("--model_path", type=str, required=True, help="Path to the model")
    parser.add_argument("--optimize_config_path", type=str, default=None, help="Path to optimization config YAML")
    parser.add_argument("--gguf_path", type=str, required=True, help="Path to GGUF file")
    parser.add_argument("--max_new_tokens", type=int, default=1000, help="Maximum number of tokens to generate")
    parser.add_argument("--cpu_infer", type=int, default=Config().cpu_infer, help="CPU inference setting")
    parser.add_argument("--use_cuda_graph", action="store_true", default=False, help="Use CUDA graph")
    parser.add_argument("--input_file", type=str, required=True, help="Input JSONL file with prompts")
    parser.add_argument("--output_file", type=str, required=True, help="Output JSONL file for results")
    parser.add_argument("--save_interval", type=int, default=20, help="Save results every N prompts")
    parser.add_argument("--mode", type=str, default="normal", help="Generation mode")
    parser.add_argument("--force_think", action="store_true", default=False, help="Force thinking mode")
    parser.add_argument("--chunk_prefill_size", type=int, default=8192, help="Chunk prefill size")
    parser.add_argument("--device", type=str, default="cuda", help="Device to use")
    parser.add_argument("--load_size", type=int, default=None, help="Load size")
    parser.add_argument("--prefetch_size", type=int, default=0, help="Prefetch size")

    args = parser.parse_args()

    batch_process(
        model_path=args.model_path,
        optimize_config_path=args.optimize_config_path,
        gguf_path=args.gguf_path,
        max_new_tokens=args.max_new_tokens,
        cpu_infer=args.cpu_infer,
        use_cuda_graph=args.use_cuda_graph,
        input_file=args.input_file,
        output_file=args.output_file,
        save_interval=args.save_interval,
        mode=args.mode,
        force_think=args.force_think,
        chunk_prefill_size=args.chunk_prefill_size,
        device=args.device,
        load_size=args.load_size,
        prefetch_size=args.prefetch_size,
    )
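
For reference, a hypothetical way to prepare an input file and launch the script (file names and paths below are placeholders, not taken from this issue):

import json

# Build a tiny sample input file in the JSONL format expected by batch_process.
samples = [
    {"id": 1, "prompt": "Explain what a KV cache is."},
    {"id": 2, "prompt": "Summarize the CUDA programming model."},
]
with open("prompts.jsonl", "w", encoding="utf-8") as f:
    for s in samples:
        f.write(json.dumps(s, ensure_ascii=False) + "\n")

# Then, for example:
#   python local_chat_batch.py --model_path <hf_model_dir> --gguf_path <gguf_dir> \
#       --input_file prompts.jsonl --output_file results.jsonl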

YHCyhc20010606 · Nov 07 '25 08:11

Try lowering the batch size; it's a VRAM (GPU memory) issue.
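
Since the script above already processes prompts one at a time, the closest equivalents to a smaller batch are shorter generations and smaller prefill chunks; a hypothetical lower-memory invocation (all paths and file names are placeholders):

# Hypothetical lower-memory run of the batch script posted above
# (all paths and file names are placeholders, not from this issue).
import subprocess

subprocess.run([
    "python", "local_chat_batch.py",
    "--model_path", "/path/to/model",
    "--gguf_path", "/path/to/gguf",
    "--input_file", "prompts.jsonl",
    "--output_file", "results.jsonl",
    "--max_new_tokens", "512",        # shorter generations -> smaller KV cache
    "--chunk_prefill_size", "4096",   # smaller prefill chunks -> lower peak VRAM
], check=True)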

TriDefender · Nov 13 '25 08:11