blog-frontend
blog-frontend copied to clipboard
使用SSE制作一个代码在线运行工具
使用Server Send Events制作一个代码在线运行工具
¿
最近尝试制作了一个能够在线运行代码的工具: Code-Runner, 踩坑不少, 做一个总结
涉及到的技术/模块如下:
-
服务端推送技术
Server Sent Events
, 下文简称SSE
-
Node.js
模块:-
child_process
模块 -
stream
模块
-
-
Docker
几条简单的命令-
docker run
-
docker pull
-
docker kill
-
-
Koa2
: 顺便用用, 不是核心
参考
1. SSE 用作服务端推送
严格地说,HTTP 协议无法做到服务器主动推送信息。但是,有一种变通方法, 就是服务器向客户端声明,接下来要发送的是流信息(streaming)。这时, 客户端 不会关闭连接, 而是会一直等着服务器发过来的新数据流。
1.1 SSE 和 WebSocket 对比
相对于WebSocket
, 它有如下的特点:
-
SSE
使用HTTP
协议;WebSocket
是一个独立的协议 -
SSE
使用简单, 轻量级;WebSocket
你肯定要引入socket.io
,ws
之类的库 -
SSE
默认支持断线重连;WebSocket
需要自己实现 -
SSE
一般用来传输文本;WebSocket
默认支持传送二进制数据
如果仅仅需要服务端推送这个功能的话, 使用SSE
的开发成本是最低的, 兼容性如下
1.2 SSE 最简单接入
假设接口地址为/sse
, 服务端代码(以Koa2
为例)为:
const { PassThrough } = require('stream')
router.get('/sse', ctx => {
const stream = new PassThrough()
setInterval(() => { stream.write(`: \n\n`) }, 5000)
ctx.set({ 'Content-Type':'text/event-stream' })
ctx.body = stream
})
客户端
const eventSource = new EventSource('/sse')
仅仅 10 行代码, 就完成了前后端的SSE
连接, 效果如下
1.3 SSE 事件流格式
事件流仅仅是一个简单的文本流数据,文本应该使用
UTF-8
格式的编码,每条消息后面都有一由一个空行作为分隔符 以冒号,以冒号开头的行为注释行,会被忽略。
注释行可以用来防止连接超时,服务器可以定期发送一条消息注释行,以保持连接不断
这样看不够直接,代码表述如下:
// 发送注释行
stream.write(`: \n\n`)
// 发送自定义事件test, 数据为字符串: this is a test message
stream.write(`event: test\ndata: this is a test message\n\n`)
// 发送自定义事件test1, 数据为一个对象: { msg: 'this is a test message' }
stream.write(`event: test1\ndata: ${JSON.stringify({ msg: 'this is a test message' })}\n\n`)
客户端监听自定义事件:
const eventSource = new EventSource('/sse')
eventSource.addEventListener('test', event => {
console.log(event.data) // this is a test message
})
没错, 就是这么简单
2. Code-Runner 具体实现
2.1 整体流程
- 客户端发出
GET /sse HTTP/1.1
请求
- 监听自定义事件
sse-connect
: 获得一个身份标识id - 监听自定义事件
sse-message
: 进度消息的推送, 例如镜像拉取、代码执行开始、代码执行结束 - 监听自定义事件
sse-result
: 代码执行结果的推送
- 用户提交代码
POST /runner HTTP/1.1
- 拉取镜像, 例如:
docker pull node:latest
- 将用户提交代码写入文件, 例如:
/code/main-1.js
- 启动容器, 例如:
docker run --rm --name runner-1 -v /code:/code node:latest node /code/main-1.js
- 根据身份标识id, 将结果写入对应的流
- 关闭容器
2.2 SSE的封装
封装目标:
-
可以根据身份标识
id
获得对应的流 -
对发送自定义事件的封装
-
对保持连接不断的封装
-
维护一个实例表, 便于向对应的流推送消息
const { PassThrough } = require('stream')
const instanceMap = new Map()
let uid = 0
/**
* Server Sent Events封装
*/
module.exports = class SSE {
/**
* 构造函数中初始化转换流、身份标识、执行初始化方法
*/
constructor(options = {}) {
this.stream = new PassThrough()
this.uid = ++uid
this.intervalTime = options.intervalTime || 5000
this._init()
}
/**
* 根据uid获取SSE实例
*/
static getInstance(uid) {
return instanceMap.get(+uid)
}
/**
* 根据uid发送自定义事件
*/
static writeStream(uid, event, data) {
const instance = this.getInstance(uid)
if (instance) instance.writeStream(event, data)
}
/**
* 初始化函数中记录当前实例, 并保持长连接
*/
_init() {
instanceMap.set(this.uid, this)
this._writeKeepAliveStream()
const timer = setInterval(() => { this._writeKeepAliveStream() }, this.intervalTime)
this.stream.on('close', () => {
clearInterval(timer)
instanceMap.delete(this.uid)
})
}
/**
* 通过发送注释消息保持长连接
*/
_writeKeepAliveStream() {
this.stream.write(': \n\n')
}
/**
* 发送自定义事件
*/
writeStream(event, data) {
const payload = typeof data === 'string' ? data : JSON.stringify(data)
this.stream.write(`event: ${event}\ndata: ${payload}\n\n`)
}
}
封装后, /sse
接口代码简化为:
router.get('/sse', ctx => {
ctx.set({
'Content-Type':'text/event-stream',
'Cache-Control':'no-cache',
'Connection': 'keep-alive'
})
const sse = new SSE()
sse.writeStream('sse-connect', sse.uid)
ctx.body = sse.stream
})
2.3 限制容器的使用时长
执行用户代码, 需要限制容器的使用时长, 虽然一直有Issue: 给docker run命令增加timeout选项,
但是最佳的停止容器运行的方式还是docker stop / docker kill
docker stop
: 用docker stop命令来停掉容器的时候,docker默认会允许容器中的应用程序有10秒的时间用以终止运行, 如果等待时间达到设定的超时时间,或者默认的10秒,会继续发送SIGKILL的系统信号强行kill掉进程
docker kill
: 默认情况下,docker kill命令不会给容器中的应用程序有任何gracefully shutdown的机会。 它会直接发出SIGKILL的系统信号,以强行终止容器中程序的运行
因此, 此处采用docker kill
更符合需求, 停止容器的代码如下:
/**
* 停止Docker容器
* @description
* exec方法中的timeout选项在执行“docker run”命令时无效, 因此采用“docker kill”命令来限制容器使用时长
* 通过docker kill, childProcess exitCode值为137
* @param {string} containerName 容器名称
* @param {number} timeout 限制使用时长
* @return {number} timer
*/
function stopDocker(containerName, timeout) {
return setTimeout(async () => {
try {
await exec(`docker kill ${containerName}`)
} catch (e) { }
}, timeout)
}
2.4 流的方式获得输出
child_process.exec
方法执行命令返回的结果是buffer/string
, 此处我们需要使用流的方式, 就要使用child_process.spawn
方法
容器启动的部分代码如下:
/**
* 启动Docker容器并使用流模式获取输出
* @param {object} dockerOptions 启动docker的配置
*/
function startDockerBySpawn(dockerOptions) {
// 获得容器名, 镜像名, 执行命令, 挂载卷
const { containerName, imageName, execCommand, volume } = dockerOptions
const commandStr = `docker run --rm --memory=50m --name ${containerName} -v ${volume} ${imageName} ${execCommand}`
const [command, ...args] = commandStr.split(' ')
// 启动容器
const childProcess = spawn(command, args)
//...
}
容器启动后, 可以获得两个流:
-
childProcess.stdout
-
childProcess.stderr
我们需要把两个流的数据组合起来, 然后将其转换为SSE
数据格式, 最终写入到目标流targetStream
, 代码如下:
const t = new SSETransform()
const transferStation = new PassThrough()
childProcess.stdout.pipe(transferStation)
childProcess.stderr.pipe(transferStation)
transferStation.pipe(t).pipe(targetStream, { end: false })
自定义的转换流如下:
const { Transform } = require('stream')
/**
* 自定义转换流
* @description
* 将child_process.stdout/stderr的可写流转换为EventStream的格式
*/
module.exports = class SSETransform extends Transform {
constructor(eventName) {
super()
this.eventName = eventName || 'sse-result'
}
_transform(chunk, encoding, callback) {
callback(null, `event: ${this.eventName}\ndata: ${JSON.stringify({ result: chunk.toString('utf8') })}\n\n`)
}
}
一次代码执行流程获得的数据如下图所示
至此, 就完成了一个代码在线运行工具的开发