pro-chat
pro-chat copied to clipboard
🧐[问题] 接收时chunk 出现粘包或半包
🧐 问题描述
接收时chunk出现粘包或半包怎么处理
粘包:chunk data: {"event":"result","content":"要准确理解"} data: {"event":"result","content":"这个问题"} 半包:chunk data: {"event":"result
💻 示例代码
const theme = useTheme();
return (
<div style={{ background: theme.colorBgLayout }}>
<ProChat
request={async (messages: any) => {
// 正常业务中如下:
const response = await fetch('url', {
method: 'POST',
headers: {
'Content-Type': 'application/json;charset=UTF-8',
},
body: JSON.stringify({
query: messages[messages.length-1].content,
stream: true,
}),
});
// 确保服务器响应是成功的
if (!response.ok || !response.body) {
throw new Error(`HTTP error! status: ${response.status}`);
}
// 获取 reader
const reader = response.body.getReader();
const decoder = new TextDecoder('utf-8');
const encoder = new TextEncoder();
const readableStream = new ReadableStream({
async start(controller) {
function push() {
reader
.read()
.then(({ done, value }) => {
if (done) {
controller.close();
return;
}
const chunk = decoder.decode(value, { stream: true });
console.log('chunk', chunk);
const chunkSplit = chunk.split('\n\n');
for (let i = 0; i < chunkSplit.length; i++) {
const chunk = chunkSplit[i];
if(chunk){
const message = chunk.replace('data: ', '');
const parsed = JSON.parse(message);
console.log(parsed)
const event = parsed.event
if(event==='done'){
controller.close();
return;
}
const content = parsed.content
console.log(content)
controller.enqueue(encoder.encode(content));
}
}
push();
})
.catch((err) => {
console.error('读取流中的数据时发生错误', err);
controller.error(err);
});
}
push();
},
});
return new Response(readableStream);
}}
/>
</div>
);
};
🚑 其他信息
一般是让后台处理的,python 的话应该可以用 socket 解决?
一般是让后台处理的,python 的话应该可以用 socket 解决?
看着有些解决方案是:消息头中包含消息体的长度信息,接收方(前端)根据消息头中的长度信息来正确解析出消息体的内容。这样来解决粘包和流式半包的问题。这样是不是前端要用一个buffer缓冲流先存储一下,看接收完了再输出。
一般是让后台处理的,python 的话应该可以用 socket 解决?
看着有些解决方案是:消息头中包含消息体的长度信息,接收方(前端)根据消息头中的长度信息来正确解析出消息体的内容。这样来解决粘包和流式半包的问题。这样是不是前端要用一个buffer缓冲流先存储一下,看接收完了再输出。
逻辑上是可以的,但是比较麻烦,到时候判定规则或者前后拼接的逻辑得自己写好就也可以
先关闭了,看起来已经又一些解法了
用Buffer解决的粘包,我这边可以正常处理: const readableStream = new ReadableStream({ async start(controller) { let buffer = ''; // 缓冲区用于存储未处理完的消息
async function push() {
try {
// 拉取响应流中的标志位done与二进制的数据Value
const { done, value } = await reader.read();
console.log("VALUE",value);
if (done) {
if (buffer.length > 0) { // 处理剩余的缓冲区内容
handleChunk(buffer, controller, encoder);
}
controller.close(); // 关闭流
return;
}
buffer += decoder.decode(value, { stream: true });
let boundary = buffer.indexOf('\n'); // 查找缓冲区中的换行符
//循环查找换查找缓冲区的换行符
while (boundary !== -1) {
// 提取一份完整的消息
const completeMessage = buffer.slice(0, boundary).trim();
buffer = buffer.slice(boundary + 1); // 更新缓冲区
// 处理以'data: '开头的消息
if (completeMessage.startsWith('data: ')) {
let completeMessageTemp = completeMessage.replace('data: ', '').trim();
if (completeMessageTemp === '[DONE]') {
controller.close();
return;
}
console.log('This is Message', completeMessageTemp);
handleChunk(completeMessageTemp, controller, encoder);
}
boundary = buffer.indexOf('\n'); // 查找下一个换行符
}
push();
} catch (err) {
console.error('读取流中的数据时发生错误', err);
controller.error(err);
}
}
push();
}
});
// 处理消息块
function handleChunk(chunk: string, controller: ReadableStreamDefaultController<any>, encoder: TextEncoder) {
try {
if (!chunk) {
throw new Error("No valid JSON found in the message");
}
const parsed = JSON.parse(chunk);
if (parsed.choices && parsed.choices[0] && parsed.choices[0].delta) {
controller.enqueue(encoder.encode(parsed.choices[0].delta.content));
}
} catch (err) {
console.error('处理chunk时发生错误', err);
controller.error(err);
}
}