
[Bug] update_weights_from_tensor raises EOFError when TP>1

Open · thisjiang opened this issue 10 months ago · 2 comments

Checklist

  • [x] 1. I have searched related issues but cannot get the expected help.
  • [ ] 2. The bug has not been fixed in the latest version.
  • [x] 3. Please note that if the bug-related issue you submitted lacks corresponding environment info and a minimal reproducible demo, it will be challenging for us to reproduce and resolve the issue, reducing the likelihood of receiving feedback.
  • [x] 4. If the issue you raised is not a bug but a question, please raise a discussion at https://github.com/sgl-project/sglang/discussions/new/choose Otherwise, it will be closed.
  • [x] 5. Please use English, otherwise it will be closed.

Describe the bug

An EOFError is raised when using update_weights_from_tensor at TP > 4; it seems the data is deserialized before the full payload is received.

Python traceback:

Traceback (most recent call last):                                                                                                                        
  File "/usr/lib64/python3.9/multiprocessing/resource_sharer.py", line 143, in _serve                                                                     
    send, close = self._cache.pop(key)                                                                                                                    
KeyError: 1                                                                                                                                               
[2025-02-20 15:22:31 TP1] Scheduler hit an exception: Traceback (most recent call last):                                                                  
  File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/scheduler.py", line 1796, in run_scheduler_process                                     
    scheduler.event_loop_overlap()                                                                                                                        
  File "/usr/local/lib64/python3.9/site-packages/torch/utils/_contextlib.py", line 116, in decorate_context                                               
    return func(*args, **kwargs)                                                                                                                          
  File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/scheduler.py", line 494, in event_loop_overlap                                         
    self.process_input_requests(recv_reqs)                                                                                                                
  File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/scheduler.py", line 580, in process_input_requests                                     
    output = self._request_dispatcher(recv_req)                                                                                                           
  File "/usr/local/lib/python3.9/site-packages/sglang/utils.py", line 374, in __call__                                                                    
    return fn(obj)                                                                                                                                        
  File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/scheduler.py", line 1670, in update_weights_from_tensor                                
    success, message = self.tp_worker.update_weights_from_tensor(recv_req)                                                                                
  File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/tp_worker_overlap_thread.py", line 226, in update_weights_from_tensor                  
    success, message = self.worker.update_weights_from_tensor(recv_req)                                                                                   
  File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/tp_worker.py", line 208, in update_weights_from_tensor                                 
    MultiprocessingSerializer.deserialize(recv_req.serialized_named_tensors)                                                                              
  File "/usr/local/lib/python3.9/site-packages/sglang/srt/utils.py", line 1280, in deserialize                                                            
    return ForkingPickler.loads(data)                                                                                                                     
  File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/reductions.py", line 541, in rebuild_storage_fd                                    
    fd = df.detach()                                                                                                                                      
  File "/usr/lib64/python3.9/multiprocessing/resource_sharer.py", line 58, in detach                                                                      
    return reduction.recv_handle(conn)                                                                                                                    
  File "/usr/lib64/python3.9/multiprocessing/reduction.py", line 189, in recv_handle                                                                      
    return recvfds(s, 1)[0]                                                                                                                               
  File "/usr/lib64/python3.9/multiprocessing/reduction.py", line 159, in recvfds                                                                          
    raise EOFError                                                                                                                                        
EOFError

Reproduction

import os
import argparse
import math
import glob

import torch

from sglang.srt.server_args import ServerArgs
import sglang as sgl
from sglang.srt.model_loader.weight_utils import filter_duplicate_safetensors_files, safetensors_weights_iterator


def load_hf_weights(hf_folder):
    pattern = "*.safetensors"
    hf_weights_files = glob.glob(os.path.join(hf_folder, pattern))
    index_file = "model.safetensors.index.json"
    hf_weights_files = filter_duplicate_safetensors_files(hf_weights_files, hf_folder, index_file)
    weights_iterator = safetensors_weights_iterator(hf_weights_files)

    for name, param in weights_iterator:
        yield name, param


chief_ip='127.0.0.1'
host = '0.0.0.0'
port = 29000

model_name = 'Qwen2.5-7B-Instruct'
model_path='./Qwen2.5-7B-Instruct'
tp_size = 4

# request limits (example values; not given in the original report)
max_batch_size = 8
max_input_len = 4096
max_output_len = 1024

server_args = ServerArgs(
    model_path=model_path,
    dtype='bfloat16',
    tp_size=tp_size,
    mem_fraction_static=0.9,
    # request
    max_running_requests=max_batch_size,
    max_prefill_tokens=max_input_len,
    context_length=max_input_len+max_output_len,
    # serving
    host=host,
    port=int(port),
    device='cuda',
    served_model_name=model_name,
    log_level='info',
    trust_remote_code=True,
    log_requests=True,
    enable_metrics=True,
    show_time_cost=True,
    # Multi-node distributed serving
    dist_init_addr=f"{chief_ip}:{port}",
    nnodes=1,
    node_rank=0,
)
llm = sgl.Engine(server_args=server_args)

for name, param in load_hf_weights(model_path):
    llm.update_weights_from_tensor([(name, param)])

Environment

Version: latest v0.4.2, built from source
GPU: NVIDIA H20 x4

thisjiang avatar Feb 20 '25 07:02 thisjiang

@jhinpan could you take a look and explain to him how to use update_weights_from_tensor? Give him a doc, and add documentation to test_update_weights_from_tensor.py like what we do in test_update_weights_from_distributed.py.

zhaochenyang20 avatar Feb 20 '25 19:02 zhaochenyang20

An EOFError is raised when using update_weights_from_tensor at TP > 4; it seems the data is deserialized before the full payload is received.

tp_size = 4

So are you trying to use it at TP = 4, TP > 1, or TP > 4? Please clarify, thanks!

jhinpan avatar Feb 20 '25 19:02 jhinpan

An EOFError is raised when using update_weights_from_tensor at TP > 4; it seems the data is deserialized before the full payload is received.

tp_size = 4

So are you trying to use it at TP = 4, TP > 1, or TP > 4? Please clarify, thanks!

The same error is raised at TP=2.

I found the error can be fixed by replacing ForkingPickler with dill in MultiprocessingSerializer:

import dill

class MultiprocessingSerializer:
    @staticmethod
    def serialize(obj):
        return dill.dumps(obj)

    @staticmethod
    def deserialize(data):
        return dill.loads(data)

thisjiang avatar Feb 24 '25 03:02 thisjiang

That's awesome! Would you mind raising a PR about fixing this issue?

jhinpan avatar Feb 24 '25 04:02 jhinpan

That's awesome! Would you mind raising a PR about fixing this issue?

Well, I'm not sure about that. Although this change fixes the problem, it will reduce the weight transmission speed at TP=1. I don't think using dill is a good idea.

thisjiang avatar Feb 24 '25 07:02 thisjiang

Yep. Could Tom @fzyzcjy take a look, please?

zhaochenyang20 avatar Feb 24 '25 10:02 zhaochenyang20

The speedup was originally implemented in https://github.com/sgl-project/sglang/pull/2695, where ForkingPickler is used to speed things up by avoiding memory copies. If we use dill (assuming it really serializes the underlying bytes; not checked, is that true?), ~~it seems Verl's weight update (via EngineFragment) may be slow.~~ EDIT: Oops, I was thinking of the old multi-process sglang modification for verl. For the new (current) one there is no worry about this. But people using Engine.update_weights_from_tensors may still be slow.

Quick guess (not tested): it is possible that, when tp > 1, the tensor is serialized once (into bytes) but deserialized tp_size times. This violates what torch expects when it registers its reduction functions with ForkingPickler.

If so, one quick fix may be to make UpdateWeightsFromTensorReqInput carry data of type List[bytes], where the list has length tp_size. When building that object, we serialize the same tensor tp_size times; when deserializing in the different TP workers, each worker picks the tp_rank-th bytes and deserializes it. Then each serialized payload is deserialized exactly once.
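
A rough sketch of that idea (the trimmed-down UpdateWeightsFromTensorReqInput and the build_request/handle_request helpers below are hypothetical illustrations, not sglang's actual API; MultiprocessingSerializer is assumed to be the ForkingPickler-based class from sglang.srt.utils seen in the traceback):

from dataclasses import dataclass
from typing import List, Tuple

import torch

from sglang.srt.utils import MultiprocessingSerializer


@dataclass
class UpdateWeightsFromTensorReqInput:
    # One serialized payload per TP worker instead of a single shared blob.
    serialized_named_tensors: List[bytes]


def build_request(named_tensors: List[Tuple[str, torch.Tensor]], tp_size: int):
    # Serialize the same tensors tp_size times so that each rank receives
    # its own payload.
    return UpdateWeightsFromTensorReqInput(
        serialized_named_tensors=[
            MultiprocessingSerializer.serialize(named_tensors)
            for _ in range(tp_size)
        ]
    )


def handle_request(recv_req: UpdateWeightsFromTensorReqInput, tp_rank: int):
    # Each TP worker deserializes only the payload built for it, so every
    # serialized blob is consumed exactly once.
    return MultiprocessingSerializer.deserialize(
        recv_req.serialized_named_tensors[tp_rank]
    )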

~~Another workaround, if we do not care about performance when using Engine (and only care about it when using EngineFragment), may be to pick the serialization protocol according to the scenario: when using Engine with tp_size > 1, use dill (and be slow); otherwise, use ForkingPickler.~~ (EDIT: update given above)

fzyzcjy avatar Feb 24 '25 10:02 fzyzcjy

Oh, I think I found the real reason: my torch.Tensor was placed on CPU instead of CUDA. The error is no longer raised after param.to(device=f"cuda:{torch.cuda.current_device()}").
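
For reference, a minimal sketch of that fix applied to the reproduction script above (assuming the same llm engine and load_hf_weights iterator):

import torch

# Move each parameter onto the current CUDA device before handing it to the
# engine, so CUDA tensors (not CPU tensors) are serialized across processes.
device = f"cuda:{torch.cuda.current_device()}"
for name, param in load_hf_weights(model_path):
    llm.update_weights_from_tensor([(name, param.to(device=device))])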

thisjiang avatar Feb 24 '25 12:02 thisjiang