[Bug] update_weights_from_tensor raises EOFError when TP>1
Checklist
- [x] 1. I have searched related issues but cannot get the expected help.
- [ ] 2. The bug has not been fixed in the latest version.
- [x] 3. Please note that if the bug-related issue you submitted lacks corresponding environment info and a minimal reproducible demo, it will be challenging for us to reproduce and resolve the issue, reducing the likelihood of receiving feedback.
- [x] 4. If the issue you raised is not a bug but a question, please raise a discussion at https://github.com/sgl-project/sglang/discussions/new/choose Otherwise, it will be closed.
- [x] 5. Please use English, otherwise it will be closed.
Describe the bug
An EOFError was raised when using update_weights_from_tensor at TP>4; it seems the data is deserialized before the full payload is received.
Python error trace info:
Traceback (most recent call last):
File "/usr/lib64/python3.9/multiprocessing/resource_sharer.py", line 143, in _serve
send, close = self._cache.pop(key)
KeyError: 1
[2025-02-20 15:22:31 TP1] Scheduler hit an exception: Traceback (most recent call last):
File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/scheduler.py", line 1796, in run_scheduler_process
scheduler.event_loop_overlap()
File "/usr/local/lib64/python3.9/site-packages/torch/utils/_contextlib.py", line 116, in decorate_context
return func(*args, **kwargs)
File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/scheduler.py", line 494, in event_loop_overlap
self.process_input_requests(recv_reqs)
File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/scheduler.py", line 580, in process_input_requests
output = self._request_dispatcher(recv_req)
File "/usr/local/lib/python3.9/site-packages/sglang/utils.py", line 374, in __call__
return fn(obj)
File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/scheduler.py", line 1670, in update_weights_from_tensor
success, message = self.tp_worker.update_weights_from_tensor(recv_req)
File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/tp_worker_overlap_thread.py", line 226, in update_weights_from_tensor
success, message = self.worker.update_weights_from_tensor(recv_req)
File "/usr/local/lib/python3.9/site-packages/sglang/srt/managers/tp_worker.py", line 208, in update_weights_from_tensor
MultiprocessingSerializer.deserialize(recv_req.serialized_named_tensors)
File "/usr/local/lib/python3.9/site-packages/sglang/srt/utils.py", line 1280, in deserialize
return ForkingPickler.loads(data)
File "/usr/local/lib64/python3.9/site-packages/torch/multiprocessing/reductions.py", line 541, in rebuild_storage_fd
fd = df.detach()
File "/usr/lib64/python3.9/multiprocessing/resource_sharer.py", line 58, in detach
return reduction.recv_handle(conn)
File "/usr/lib64/python3.9/multiprocessing/reduction.py", line 189, in recv_handle
return recvfds(s, 1)[0]
File "/usr/lib64/python3.9/multiprocessing/reduction.py", line 159, in recvfds
raise EOFError
EOFError
Reproduction
import os
import glob

import torch

import sglang as sgl
from sglang.srt.server_args import ServerArgs
from sglang.srt.model_loader.weight_utils import (
    filter_duplicate_safetensors_files,
    safetensors_weights_iterator,
)


def load_hf_weights(hf_folder):
    # Iterate over the HF safetensors checkpoint, yielding (name, tensor) pairs.
    pattern = "*.safetensors"
    hf_weights_files = glob.glob(os.path.join(hf_folder, pattern))
    index_file = "model.safetensors.index.json"
    hf_weights_files = filter_duplicate_safetensors_files(hf_weights_files, hf_folder, index_file)
    weights_iterator = safetensors_weights_iterator(hf_weights_files)
    for name, param in weights_iterator:
        yield name, param


chief_ip = '127.0.0.1'
host = '0.0.0.0'
port = 29000
model_name = 'Qwen2.5-7B-Instruct'
model_path = './Qwen2.5-7B-Instruct'
tp_size = 4
# Request limits (example values).
max_batch_size = 32
max_input_len = 4096
max_output_len = 1024

server_args = ServerArgs(
    model_path=model_path,
    dtype='bfloat16',
    tp_size=tp_size,
    mem_fraction_static=0.9,
    # request
    max_running_requests=max_batch_size,
    max_prefill_tokens=max_input_len,
    context_length=max_input_len + max_output_len,
    # serving
    host=host,
    port=int(port),
    device='cuda',
    served_model_name=model_name,
    log_level='info',
    trust_remote_code=True,
    log_requests=True,
    enable_metrics=True,
    show_time_cost=True,
    # Multi-node distributed serving
    dist_init_addr=f"{chief_ip}:{port}",
    nnodes=1,
    node_rank=0,
)

llm = sgl.Engine(server_args=server_args)
# Push each checkpoint tensor into the running engine.
for name, param in load_hf_weights(model_path):
    llm.update_weights_from_tensor([(name, param)])
Environment
Version: latest v0.4.2, built from source
GPU: NVIDIA H20 x4
@jhinpan could you take a look and explain how to use update_weights_from_tensor? Share a doc with him and add documentation to test_update_weights_from_tensor.py, like what we do in test_update_weights_from_distributed.py.
An EOFError was raised when using update_weights_from_tensor at TP>4; it seems the data is deserialized before the full payload is received.
tp_size = 4
So are you trying to use it at TP = 4, TP > 1, or TP > 4? Please clarify it, thanks!
The same error is raised at TP=2.
I found the error can be fixed by replacing ForkingPickler with dill in MultiprocessingSerializer:
import dill


class MultiprocessingSerializer:
    @staticmethod
    def serialize(obj):
        # dill serializes the tensor data by value instead of sharing file descriptors.
        return dill.dumps(obj)

    @staticmethod
    def deserialize(data):
        return dill.loads(data)
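For context, a minimal round trip with this dill-based serializer could look like the sketch below (the tensor, name, and tp_size are illustrative, not the actual sglang call sites). Assuming dill really copies the storage by value, the same payload can be deserialized once per TP rank without hitting the shared-file-descriptor path:

import torch

# Serialize once on the sender side.
payload = MultiprocessingSerializer.serialize([("layer.weight", torch.zeros(8))])

# Each TP worker can deserialize the same bytes independently.
tp_size = 2  # illustrative
for tp_rank in range(tp_size):
    named_tensors = MultiprocessingSerializer.deserialize(payload)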
That's awesome! Would you mind raising a PR about fixing this issue?
Well, I'm not sure about that. Although this change can fix the problem, it will reduce the weight transmission speed at TP=1. I think using dill is not a good idea.
Yep. Could Tom (@fzyzcjy) take a look, plz?
The speedup was originally implemented in https://github.com/sgl-project/sglang/pull/2695, where ForkingPickler is utilized to speed things up by avoiding memory copies. If we use dill (assuming it really serializes the underlying bytes - not checked, is it true?), ~~it seems Verl's weight update (via EngineFragment) may be slow.~~ EDIT: Oops, I was thinking about the old multi-process sglang modification for verl. For the new (current) one there is no worry about this. But people using Engine.update_weights_from_tensor may still be slow.
Quick guess (not tested): it is possible that, when tp>1, the tensor is serialized once (into bytes) but deserialized tp_size times, which violates what torch expects when it registers its reductions with ForkingPickler.
If so, one quick fix may be to make UpdateWeightsFromTensorReqInput hold data of type List[bytes], where the list has length tp_size. When building that object, we serialize the same tensor tp_size times. When deserializing in the different tp workers, we pick the tp_rank-th bytes and deserialize them. Then each serialized payload is deserialized exactly once.
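A rough sketch of that List[bytes] idea (build_req is a hypothetical helper and the dataclass is simplified; the real sglang request and call sites differ):

import dataclasses
from typing import List

from sglang.srt.utils import MultiprocessingSerializer


@dataclasses.dataclass
class UpdateWeightsFromTensorReqInput:
    # One serialized copy of the named tensors per TP rank.
    serialized_named_tensors: List[bytes]


def build_req(named_tensors, tp_size):
    # Serialize the same tensors tp_size times so that each payload
    # is deserialized exactly once, which is what ForkingPickler expects.
    return UpdateWeightsFromTensorReqInput(
        serialized_named_tensors=[
            MultiprocessingSerializer.serialize(named_tensors) for _ in range(tp_size)
        ]
    )


# In each TP worker, pick the payload for this rank:
# named_tensors = MultiprocessingSerializer.deserialize(
#     recv_req.serialized_named_tensors[tp_rank]
# )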
~~Another workaround, if we do not care about performance when using Engine (and only care about it when using EngineFragment), may be to pick the serialization protocol according to the scenario: when using Engine with tp_size>1, use dill (and accept the slowdown); otherwise, use ForkingPickler.~~ (EDIT: see the update above.)
Oh, I think I found the real reason: my torch.Tensor was placed on CPU instead of CUDA. The error is no longer raised after param.to(device=f"cuda:{torch.cuda.current_device()}").
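In other words, the reproduction above works once each checkpoint tensor is moved to the current CUDA device before calling update_weights_from_tensor. A minimal sketch of the adjusted loop, reusing load_hf_weights, model_path, and llm from the reproduction:

import torch

device = f"cuda:{torch.cuda.current_device()}"
for name, param in load_hf_weights(model_path):
    # Move the safetensors weight (loaded on CPU) onto the GPU first, so the
    # serialized payload carries CUDA IPC handles rather than CPU shared-memory
    # file descriptors (the path that raised EOFError in the traceback above).
    llm.update_weights_from_tensor([(name, param.to(device=device))])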