抛弃难用的stream
什么是 stream
简单来说,将一个资源分成一个个小块(chunk)传输,而不是一次性传输所有数据,这种技术就可以被称为 stream。 数据源被称为 readable stream,接受数据方被称为 writeable stream,而在两者之间可以对数据做一些处理的中间步骤被称为 transform stream。 Stream 的优点就是不会对内存产生压力,并且可以让数据尽可能快地到达,而不必等待所有数据加载到内存。
难用的 stream
对我来说 stream 可能是 nodejs 中最难以掌握的部分,每次使用到 stream 时,都需要翻开文档查 api。另外,也很容易忘记处理错误,如当要从 readable stream pipe 到 writeable stream 时,要同时处理两个 stream 的错误,但往往会遗漏。
让我们看个例子,从一个 url 下载图片到本地
function download(url: string, filePath: string): Promise<void> {
return new Promise((resolve, reject) => {
const file = createWriteStream(filePath);
file.on('finish', resolve);
http.get(url, response => {
response.pipe(file);
});
});
}
Emm,好像忘了处理错误,让我们来加上:
function download(url: string, filePath: string): Promise<void> {
return new Promise((resolve, reject) => {
const file = createWriteStream(filePath);
const request = network.get(url, response => {
if (response.statusCode === 200) {
response.pipe(file);
} else {
file.close();
unlinkSync(filePath);
reject(`Server responded with ${response.statusCode}: ${response.statusMessage}`);
}
});
request.on('error', err => {
file.close();
unlinkSync(filePath);
reject(err.message);
});
file.on('finish', resolve);
file.on('error', (err: FileErr) => {
file.close();
if (err.code === 'EEXIST') {
reject('File already exists');
} else {
unlinkSync(filePath);
reject(err.message);
}
});
});
}
这下看起来不错了,已经补上了所有错误处理的逻辑。但是一个如此简单的需求我们却写了这么多代码,说明 stream 太难用了,需要我们手动处理太多逻辑。
那么,「难用」体现在哪呢? 第一,pipe 方法倾向于让用户使用链式调用,非常容易忘记写错误处理逻辑; 第二,http.get 回调参数 和 createWriteStream 都基于 stream,需要事无巨细地处理各种事件; 第三,逻辑被分割在各个回调里,无法清晰地阅读及调试。
新的工具
node 基础库里的 http 模块非常底层,所以 node 社区为 http client 出了一个又一个轮子,如axios、request、urllib等等。官方显然也是看到了这个问题,于是写了一个轮子Undici,同样基于 stream,但封装了很多细节,支持 promise,未来有望合并到 Node 基础库内。贴一段示例代码
import { request } from 'undici'
const {
statusCode,
headers,
body
} = await request('http://localhost:3000/foo')
console.log('response received', statusCode)
console.log('headers', headers)
for await (const data of body) {
console.log('data', data)
}
可以看到 api 设计已经和流行的 http client 比较接近了。
Pipe 方法也有了一个替代方案,在 Node 10 内新增了一个 pipeline 方法,注意是pipeline不是pipe。再贴一段官方示例:
const { pipeline } = require('stream');
const fs = require('fs');
const zlib = require('zlib');
pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz'),
(err) => {
if (err) {
console.error('Pipeline failed.', err);
} else {
console.log('Pipeline succeeded.');
}
}
);
抛弃了链式写法,更加清爽,强烈建议换用。并且还有 promise 版本:
const { pipeline } = require('stream/promises');
async function run() {
await pipeline(
fs.createReadStream('archive.tar'),
zlib.createGzip(),
fs.createWriteStream('archive.tar.gz')
);
console.log('Pipeline succeeded.');
}
run().catch(console.error);
重构
让我们使用上面的工具重构最初的需求
async function download(url: string, filePath: string): Promise<void> {
try {
const { body } = await request(url);
await pipeline(
body,
createWriteStream(filePath),
);
} catch (e) {
console.log('下载失败', url);
unlinkSync(filePath);
}
}
这个版本的代码,清晰易调试,不会忘记处理错误,你有什么理由不用呢~
总结
又做了一回标题党,我们其实并不需要抛弃 stream,反而应该大力拥抱,WHATWG已经制定了web streams 标准,node 也实现了这套标准,stream 也早已脱离 node,来到了浏览器。通过高层级封装的 api,我们可以更加方便地使用,或许2022年就将是 the year of web streams :)