midway
midway copied to clipboard
功能请求:希望标准项目可以实现流式响应
描述
我们希望在标准项目中实现流式响应的功能,以便满足特定应用场景的需求。流式响应允许服务器持续向客户端发送数据,而不需要客户端不断地进行轮询请求,这在某些实时通信和事件推送应用中非常有用。
背景
最近,我们的项目需要与大型语言模型进行交互,以获取实时生成的文本数据。这种交互需要使用 Server-Sent Events (SSE) 协议,其中 content-type 设置为 text/event-stream。在我们的项目中,我们选择使用 Koa 来实现 SSE,具体实现如下:
// 发送消息
const sendMessage = async (stream) => {
const data = [
'现在科学技术的发展速度叫人惊叹',
'同样在数码相机的技术创新上',
'随着数码相机越来越普及',
'数码相机现已成为大家生活中不可缺少的电子产品',
'而正是因为这样,技术的创新也显得尤为重要',
];
// 循环上面数组: 推送数据、休眠 2 秒
for (const value of data) {
stream.write(`data: ${value}\n\n`); // 写入数据(推送数据)
await new Promise((resolve) => setTimeout(resolve, 2000));
}
// 结束流
stream.end();
};
router.get('/demo', async (ctx) => {
// 1. 设置响应头
ctx.set({
'Connection': 'keep-alive',
'Cache-Control': 'no-cache',
'Content-Type': 'text/event-stream', // 表示返回数据是个 stream
});
// 2. 创建流、并作为接口数据进行返回
const stream = new PassThrough();
ctx.body = stream;
ctx.status = 200;
// 3. 推送流数据
sendMessage(stream, ctx);
});
请求
为了实现流式响应的功能,我们建议在标准项目中提供相应的支持和工具,以便开发人员可以更轻松地创建类似的 SSE 端点。这将使标准项目更加灵活,并能够满足更多实时应用的需求。
预期行为
我们期望标准项目中的流式响应功能包括以下要点:
- 能够设置响应头,包括必要的 SSE 头部信息(如 Connection、Cache-Control、Content-Type)。
- 能够创建一个可写流对象,作为响应的主体。
- 能够轻松地向流中写入数据,以实现数据的持续推送。
- 能够在需要时结束流,以通知客户端数据传输的结束。
这些功能将使开发人员能够更方便地实现 SSE 以及其他流式响应协议,从而增强项目的实时通信能力。
相关文档
(以上部分内容由AI协助生成)
直接引用 koa-sse 即可。
在Next.js中可以直接将ReadableStream
作为一个响应,我想在Midway中实现这样的功能好像不行
Next.js
// app/api/sse/route.ts
export async function GET() {
const encoder = new TextEncoder()
const stream = new ReadableStream({
async start(controller) {
const data = [
'渐进式设计',
'提供从基础到入门再到企业级的升级方案',
'解决应用维护与拓展性难题',
]
while (data.length) {
const chunk = `data: ${JSON.stringify({ content: data.shift() })}\n\n`
console.log(chunk)
controller.enqueue(encoder.encode(chunk))
await new Promise(resolve => setTimeout(resolve, 100))
}
controller.close()
},
})
return new Response(stream)
}
Response Raw
data: {"content":"渐进式设计"}
data: {"content":"提供从基础到入门再到企业级的升级方案"}
data: {"content":"解决应用维护与拓展性难题"}
Midway
// src/controller/sse.controller.ts
import { Controller, Get, Inject } from '@midwayjs/core'
import type { Context } from '@midwayjs/koa'
@Controller('/api')
export class APIController {
@Inject()
ctx: Context
@Get('/sse')
async createChatCompletion() {
const encoder = new TextEncoder()
const stream = new ReadableStream({
async start(controller) {
const data = [
'渐进式设计',
'提供从基础到入门再到企业级的升级方案',
'解决应用维护与拓展性难题',
]
while (data.length) {
const chunk = `data: ${JSON.stringify({ content: data.shift() })}\n\n`
console.log(chunk)
controller.enqueue(encoder.encode(chunk))
await new Promise(resolve => setTimeout(resolve, 100))
}
controller.close()
},
})
return stream
}
}
Response Raw
{}
Logs
[16:19:48] Node.js server restarted in 1468 ms
data: {"content":"渐进式设计"}
2023-09-25 16:20:02,889 INFO 3312 [-/::1/-/214ms GET /api/sse] Report in "src/middleware/report.middleware.ts", rt = 4ms
data: {"content":"提供从基础到入门再到企业级的升级方案"}
data: {"content":"解决应用维护与拓展性难题"}
我们用的 Readable 在 midway 中实现流式响应
const read = new Readable(); // node 内部 stream 模块
read._read = () => {};
read.push(streamData.content); // 异常调用返回一次数据
read.push(null); // 结束的时候 push 一个 null
return read; // 以 read 对象作为响应
我们用的 Readable 在 midway 中实现流式响应
const read = new Readable(); // node 内部 stream 模块 read._read = () => {}; read.push(streamData.content); // 异常调用返回一次数据 read.push(null); // 结束的时候 push 一个 null return read; // 以 read 对象作为响应
大兄弟,贴个完整代码参考下
标准项目一直都是可以流式响应的。
ctx.res.send(xxx) / ctx.res.end() 即可。
这个要具体看sse这个模块里面的逻辑了,你这么引用,所有的请求是都会过它的。
有完整案例看看吗,大佬们
+1,有无完整例子可以参考看看
我刚好写了一个。。。
@Get('/')
async home(): Promise<any> {
this.ctx.status = 200;
this.ctx.set('Transfer-Encoding', 'chunked');
for (let i = 0; i < 100; i++) {
await sleep(100);
this.ctx.res.write('abc'.repeat(100));
}
this.ctx.res.end();
}
这个也可以https://github.com/JarvisPrestidge/koa-event-stream.git
有一个方法可以实现。 我在尝试中安装了 koa-sse-stream 因为它不是标准的 midwayjs 组件,所以我在configuration.ts中直接配置无效 @Configuration({ imports: [ koa ]})
如果用 this.app.use(sse({ maxClients: 5000, pingInterval: 30000 })); 确实会生效,效果非常完美,但是我其他的路由都被影响了。
我绕了一圈找现成可用的方式,最后还是回到这个有过一点成果的尝试上。
这个插件封装的很简洁,示例写的其实很完整
参考插件代码,我实现了我要的效果
首先把sse.js 拿出来,当作一个服务文件
可以看到我几乎是复制过来的
然后写一个StreamMiddleware中间件 只有以/stream开头的路由会执行这个中间件
import { Middleware, IMiddleware } from '@midwayjs/core';
import { NextFunction, Context } from '@midwayjs/koa';
import { SSETransform } from '../service/sse.service';
// const Stream = require('stream');
const DEFAULT_OPTS = {
maxClients: 10000,
pingInterval: 60000,
closeEvent: 'close',
};
@Middleware()
export class StreamMiddleware implements IMiddleware<Context, NextFunction> {
resolve() {
return async (ctx: Context, next: NextFunction) => {
// 控制器前执行的逻辑
// const startTime = Date.now();
// 执行下一个 Web 中间件,最后执行到控制器
// 这里可以拿到下一个中间件或者控制器的返回值
if (ctx.res.headersSent) {
if (!(ctx.sse instanceof SSETransform)) {
console.error(
'SSE response header has been send, Unable to create the sse response'
);
}
return await next();
}
const sse = new SSETransform(ctx, DEFAULT_OPTS);
sse.on('close', () => {
console.log('close');
});
ctx.sse = sse;
await next();
if (ctx.sse) {
if (!ctx.body) {
ctx.body = ctx.sse;
} else {
if (!ctx.sse.ended) {
ctx.sse.send(ctx.body);
}
ctx.body = sse;
}
}
};
}
match(ctx) {
return ctx.path.indexOf('/stream/') !== -1;
}
}
然后在configuration.ts文件中应用中间件 import { StreamMiddleware } from './middleware/stream.middleware'; this.app.useMiddleware([StreamMiddleware, ReportMiddleware]);
最后
以上就是我实现的全部流程。
===============
捂脸 不用以上这么麻烦,我没仔细看api 只在路由上加载中间件就可以了
我是这样可以实现
@Get('/index')
async index(): Promise
这是来自QQ邮箱的自动回复邮件。您好,您的邮件已经收到,我会在一周内尽快给您回复!
我在用langgraph,一开始参照上面各位大佬的方案,直接写一个中间件调用 https://github.dev/JarvisPrestidge/koa-event-stream/ 的sse.ts 但是有问题,graph.streamEvents
接收的参数结束完了才会统一发event,流不起来。最后结合上面各位大佬的方法和上面的repo综合了一下,搞定了,希望能给后面遇到坑的小伙伴们参考。
stream.middlewate.ts
import { IMiddleware } from '@midwayjs/core';
import { Middleware } from '@midwayjs/decorator';
import { Context, NextFunction } from '@midwayjs/koa';
class SSE {
ctx: Context
constructor(ctx: Context) {
ctx.status = 200;
ctx.set('Content-Type', 'text/event-stream');
ctx.set('Cache-Control', 'no-cache');
ctx.set('Connection', 'keep-alive');
ctx.res.writeHead(200, {
'Content-Type': 'text/event-stream',
'Cache-Control': 'no-cache',
'Connection': 'keep-alive',
'Transfer-Encoding': 'chunked'
});
ctx.res.flushHeaders();
this.ctx = ctx;
}
send(data: any) {
// string
if (typeof data === "string") {
this.push(`data: ${data}\n\n`);
}
// object data
if (data.id) {
this.push(`id: ${data.id}\n`);
}
if (data.event) {
this.push(`event: ${data.event}\n`);
}
const text = typeof data.data === "object"
? JSON.stringify(data.data)
: data.data;
this.push(`data: ${text}\n\n`);
}
push(data: any) {
this.ctx.res.write(data);
this.ctx.res.flushHeaders();
}
close() {
this.ctx.res.end();
}
}
@Middleware()
export class StreamMiddleware implements IMiddleware<Context, NextFunction> {
resolve() {
return async (ctx: Context, next: NextFunction) => {
if (ctx.res.headersSent) {
if (!ctx.sse) {
console.error('[sse]: response headers already sent, unable to create sse stream');
}
return await next();
}
const sse = new SSE(ctx);
ctx.sse = sse;
await next();
if (!ctx.body) {
ctx.body = ctx.sse;
} else {
ctx.sse.send(ctx.body);
ctx.body = sse;
}
};
}
}
controller.ts
这里结合了一些我们业务的,按需把需要的东西挑出来用吧。
import { StreamMiddleware } from './middleware/stream.middleware';
import { HumanMessage } from "@langchain/core/messages";
import { Body, Controller, Inject, Post } from '@midwayjs/decorator';
import { ApiBody, ApiOperation, ApiTags } from '@midwayjs/swagger';
import 'dotenv/config';
import { IChatData } from './interface';
import { initGraph } from './langgraph/graph';
import { LangChainService } from './service';
@ApiTags(['langchain'])
@Controller('/api/ai/langchain', { middleware: [StreamMiddleware] })
export class LangChainServiceController {
@Inject()
service: LangChainService;
@Post('/chat')
@ApiOperation({
summary: '智能对话',
})
@ApiBody({})
async chat(@Body() data: IChatData) {
const graph = await initGraph();
const streamResults = graph.streamEvents(
{
messages: [
new HumanMessage({
content: '你是谁,你能干什么',
}),
],
},
{
recursionLimit: 10,
version: 'v2'
},
);
try {
for await (const output of streamResults) {
console.log("-----\n");
const result = JSON.stringify(output);
console.log(result);
this.ctx.sse.send(output)
console.log("-----\n");
}
console.log("----close----" + new Date().toLocaleTimeString());
this.ctx.sse.close(); // 结束连接
} catch (error) {
console.error('Stream processing error:', error);
this.ctx.sse.close(); // 出错时关闭SSE连接
}
}
}
按照 #3982 的设计,可能是
async chat(@Body() data: IChatData) {
const graph = await initGraph();
const sse = new ServerResponse().sse();
const streamResults = graph.streamEvents(
{
messages: [
new HumanMessage({
content: '你是谁,你能干什么',
}),
],
},
{
recursionLimit: 10,
version: 'v2'
},
);
try {
for await (const output of streamResults) {
sse.send(output)
}
sse.sendEnd();
} catch (error) {
console.error('Stream processing error:', error);
sse.sendEnd();
}
return sse;
}
@czy88840616 收到,期待pr合并到主版本后使用官方方案,谢谢大佬!