blog
blog copied to clipboard
记一次解决 ERR_STREAM_WRITE_AFTER_END 问题的过程
今天用 Node.js 写了一段代码,但是却遇到了一个问题。
这段代码本身很简单,就是从网络上下载一个 zip 文件,然后读取 zip 中一个文件的内容,代码一共就 20 行:
const fetch = require('node-fetch')
const unzipper = require('unzipper')
;(async () => {
const response = await fetch('http://a.domain/some.zip')
if (response.ok) {
response.body.pipe(unzipper.Parse()).on('entry', (entry) => {
if (entry.path === '我想读取的文件.txt') {
let content = ''
entry.on('data', chunk => {
content += chunk
})
entry.on('end', () => {
console.log(content) // 打印文件内容
})
} else {
entry.autodrain()
}
})
}
})()
这段代码乍看之下没有什么问题,但一旦运行,就会报 ERR_STREAM_WRITE_AFTER_END
的错误,但即使报错了,文件的内容也能正常显示出来;我又多运行了几次,发现这个错误也不是每次都会报出来。
我的脑海里一下子有了很多问号:
- 为什么会报这个错?
- 为什么报错了也不影响我读取到文件的内容?
- 为什么这个错误没有每次都报?
- 怎么解决这个问题?
在翻了一圈源码之后,我总算弄明白了这些问题,于是写了这篇文章记录一下过程与心得。
代码里的两个流(Stream)
这段代码里有两个流:
-
response.body
:这是 node-fetch 这个库下载网络上的压缩包产生的流 -
unzipper.Parse()
:这是 unzipper 这个库用于解析压缩包产生的流
这两个流使用 pipe()
方法串在了一起,简单点说就是 node-fetch 一边下载压缩包,一边把压缩包的内容传给 unzipper 进行解析,而报错肯定是在这两个流里产生的。
我们先来看看 response.body
是什么流。
查看 node-fetch 的源码之后,我发现了这段代码:
// 来源:https://github.com/node-fetch/node-fetch/blob/v2.6.0/src/index.js#L184-L209
import Stream from 'stream'
import Response from './response'
const PassThrough = Stream.PassThrough
// 省略 ...
req.on('response', res => {
// 省略 ...
let body = res.pipe(new PassThrough())
// 省略 ...
const response = new Response(body, response_options);
// 省略 ...
})
// 省略 ...
在这段代码中,res
是 http.IncomingMessage 的实例,它是一个可读流(Readable Streams);PassThrough
是转换流(Transform Stream)的一个简单实现,而转换流又是继承自双工流(Duplex Stream)的……
作为基本没怎么深入研究过 Node.js 的前端工程师,第一次看到这些名词的时候也让我一头雾水,于是我读了一遍 Stream API 文档,对这些名词有了比较基本的理解。如果你已经对这些流比较熟悉了,也可以跳过下面这一小节。
Node.js 里的四种流
Node.js 里有两种最基本的流:可读流和可写流(Writable Streams)。我们可以用一个复制文件的代码来解释这两种流的作用:
const fs = require('fs')
function copyFile(path, newPath) {
const readable = fs.createReadStream(path)
const writable =fs.createWriteStream(newPath)
readable.pipe(writable)
}
上面这段代码实现了复制文件的功能,这里可能有人会问,用流的方式实现太绕了,我们可以直接用 readFile / writeFile:
const fs = require('fs')
function copyFile(path, newPath) {
// 为方便演示,这里用了 Sync
const fileBuffer = fs.readFileSync(path)
fs.writeFileSync(newPath, fileBuffer)
}
但这种实现方式有一个问题:内存很容易就会被撑满。readFile 会先把整个文件加载进内存,如果文件体积稍微大一点、或者同时读取的文件数量多一点,内存很快就会被耗尽;
而在使用流的方式中,我们创建了一个文件的可读流 readable
,这个流不会像 readFile 那样将文件整个加载进内存当中,而是会对文件进行多次读取,每次只读取一小部分,等这一部分被 writable
消费之后,再读下一部分。这样的话,内存占用就会小很多。
在对可读流和可写流有了基本的了解之后,双工流和转换流就比较好理解了。
双工流是既可以读、也可以写的流,在代码实现上,由于 JavaScript 不能同时继承两个类,所以它继承自可写流,但同时也实现了可读流的方法、属性和事件,这些细节可以从源码里看到。
而转换流是继承自双工流的,所以它也是既可读、也可写的,它比双工流特殊的地方在于,它输出的数据是跟输入的数据有关联的。举个例子,前端很常用的压缩 CSS 代码的工具就可以理解为一个转换流:它们接收 CSS 代码,然后输出压缩过后的 CSS。
PassThrough 是继承自转换流的一个简单实现,它的核心实现代码就这几行:
// 源码:https://github.com/nodejs/node/blob/v12.18.1/lib/_stream_passthrough.js
const Transform = require('_stream_transform');
// 继承转换流的属性和方法
ObjectSetPrototypeOf(PassThrough.prototype, Transform.prototype);
ObjectSetPrototypeOf(PassThrough, Transform);
function PassThrough(options) {
if (!(this instanceof PassThrough))
return new PassThrough(options);
Transform.call(this, options);
}
// 将输入的数据原封不动的传输下去
PassThrough.prototype._transform = function(chunk, encoding, cb) {
cb(null, chunk);
};
回到 response.body
之前我们提到,response.body
是基于 http.IncomingMessage 这个可读流创建的一个 PassThrough 流,PassThrough 只会把数据原封不动的传输下去,所以我们可以简单的将 response.body
当成一个可读流。
为什么这里要用 PassThrough 包装一层? 我查了一下,大致原因是在将可读流 pipe 到可写流中时,如果想要做一些额外的事件监听,可以用 PassThrough 来实现这个用于监听的流,省去了自己实现的一遍的麻烦,举个例子:
const { PassThrough } = require('stream')
const fs = require('fs')
const passThroughStream = new PassThrough()
// 先将数据输入的 PassThrough
fs.createReadStream('some.zip').pipe(passThroughStream)
// 然后将数据输入到 writable
passThroughStream.pipe(process.stdout)
// 在 PassThrough 上做一些额外的事件监听
passThroughStream.on('data', console.log)
搞清楚了 response.body
,再来看看 unzipper.Parse()
。
从源码里可以看到,unzipper.Parse
继承自 PullStream,而 PullSteam 继承自双工流。
至此我们可以得出结论:response.body
本质上是一个可读流,而 unzipper.Parse()
本质上是一个双工流。
问题出在哪个流上面?
在代码抛出 ERR_STREAM_WRITE_AFTER_END
这个错误的时候,我们很容易就可以找到抛错的代码是在可写流的源码里:
function writeAfterEnd(stream, cb) {
const er = new ERR_STREAM_WRITE_AFTER_END();
errorOrDestroy(stream, er);
process.nextTick(cb, er);
}
在我们这两个流中,只有 unzpiier.Parse()
是可写的,所以问题就出在它上面。至此我们可以初步得出一个结论:**在 response.body
往 unzipper.Parse()
传输数据的过程中,unzipper.Parse()
提前关闭了自己的可写状态,导致 Node.js 抛出了这个错误。**而查阅 Stream API 文档可以发现,如果要关闭可写流的可写状态,需要调用 writable.end()
方法,而在 unzipper.Parse()
的代码里,只有一个地方调用了 end
方法:
// 源码:https://github.com/ZJONSSON/node-unzipper/blob/v0.10.0/lib/parse.js#L258
Parse.prototype._readEndOfCentralDirectoryRecord = function() {
var self = this;
self.pull(18).then(function(data) {
var vars = binary.parse(data)
.word16lu('diskNumber')
.word16lu('diskStart')
.word16lu('numberOfRecordsOnDisk')
.word16lu('numberOfRecords')
.word32lu('sizeOfCentralDirectory')
.word32lu('offsetToStartOfCentralDirectory')
.word16lu('commentLength')
.vars;
self.pull(vars.commentLength).then(function(comment) {
comment = comment.toString('utf8');
self.end(); // <- 这里
self.push(null);
});
});
};
而 _readEndOfCentralDirectoryRecord
这个方法只出现在了 _readRecord
方法里:
// 源码:https://github.com/ZJONSSON/node-unzipper/blob/v0.10.0/lib/parse.js#L38-L67
Parse.prototype._readRecord = function () {
var self = this;
return self.pull(4).then(function(data) {
if (data.length === 0)
return;
var signature = data.readUInt32LE(0);
if (signature === 0x34327243) {
return self._readCrxHeader();
}
if (signature === 0x04034b50) {
return self._readFile();
}
else if (signature === 0x02014b50) {
self.__ended = true;
return self._readCentralDirectoryFileHeader();
}
else if (signature === 0x06054b50) {
return self._readEndOfCentralDirectoryRecord(); // <- 这里
}
else if (self.__ended) {
return self.pull(endDirectorySignature).then(function() {
return self._readEndOfCentralDirectoryRecord(); // <- 和这里
});
}
else
self.emit('error', new Error('invalid signature: 0x' + signature.toString(16)));
});
};
从代码上看,_readRecord
方法会检查 zip 文件的一些“分隔符”,比如 signature === 0x06054b50
可能就说明已经解析到 zip 文件的末尾了,它就会调用 _readEndOfCentralDirectoryRecord
方法关闭自己的写入流,阻止数据继续输入进来。但在这之后,response.body
还是在给它输入数据,于是就报出了文章开头的错误。
但如果这种情况成立,那说明 zip 文件的“末尾”并不是 response.body
这个可读流的末尾,也就是说,zip 文件都已经解析完了,response.body
还是在继续传输数据?
为了验证这个猜想,我把代码改造了一下:
const fetch = require('node-fetch')
const unzipper = require('unzipper')
;(async () => {
const response = await fetch('http://a.domain/some.zip')
console.log('响应体的大小:', response.headers.get('content-length'))
if (response.ok) {
const duplex = response.body.pipe(unzipper.Parse()).on('entry', (entry) => {
if (entry.path === '我想读取的文件.txt') {
let content = ''
entry.on('data', chunk => {
content += chunk
})
entry.on('end', () => {
console.log(content) // 打印文件内容
})
} else {
entry.autodrain()
}
})
let len = 0
const originWrite = duplex.write
duplex.write = function(buffer) {
console.log('接收到了' + buffer.length + ';目前一共接收到了:', len += buffer.length)
originWrite.apply(this, arguments)
}
const originEnd = duplex.end
duplex.end = function() {
console.log('调用了 end')
originEnd.apply(this, arguments)
}
}
})()
再运行一次,控制台输出如下:
$ node index
响应体的大小:8192
接收到了 1195;目前一共接收到了:1195
接收到了4320;目前一共接收到了: 5515
这里是文件的内容
调用了 end
接收到了1440;目前一共接收到了: 6955
Error [ERR_STREAM_WRITE_AFTER_END]: write after end
从控制台的消息来看,当我们接收到 5515 这个长度的时候,duplex 就调用了 end,提前关闭了自己的可写状态,但是响应体其实一共有 8192,所以 response.body
在这之后仍然在继续推送数据,于是产生了错误。
那么为什么偶尔又不会报错?
我又运行了几次,得到了没有报错时的控制台输出:
$ node index
响应体的大小: 8192
接收到了2635;目前一共接收到了: 2635
接收到了5557;目前一共接收到了: 8192
调用了 end
这里是文件内容
调用了 end <- 这里的 end 是 response.body 传输完数据后,unzipper.Parse() 自动触发的
这下我们就有最终结论了:在这个长度为 8192 的压缩包响应体 response.body
中,实际上在 5515 的时候,unzipper.Parse()
对压缩包的解析就结束了,所以 response.bdoy
后续给 unzipper.Parse()
输入数据就会报错;但如果 response.body
在最后一次输入数据时正好包含了 5515 之后的全部数据,那么响应体的数据输入和压缩包的解析同时结束了,所以就不会报错了。
怎么解决这个问题?
问题的根源是找到了,针对性的解决办法也有很多种,比如给上面的 write
方法做个改造:
const originWrite = duplex.write
duplex.write = function() {
if (!this.writableEnded) originWrite.apply(this, arguments)
}
但这样的方法不够优雅。我也想过给 unzipper 提个 PR,把这段判断直接写进它 PullStream 的实现里,但转念一想,给不可写的流输入数据的预期行为就是应该报错的,单纯的绕开这个错误会相当于在代码里埋雷。
之后,我找到了另一个解析 zip 文件的库 yauzl,它没有提供 unzipper 这种 Streaming API,并且做出了解释,大致意思是说,判断一个 zip 文件是否解析完了的“分隔符”是在 zip 文件的末尾,所以即使用了 Stream,还是需要先把 zip 文件全部加载进内存之后才能开始解析。用 Stream API 还会带来其他的一些问题,我这里就不赘述了。
所以,我最终的解决方案是:用 yauzl 替代 unzipper。
总结
通过排查这个问题,我对 Node.js 的 Stream 有了更加实际的了解,而不是仅仅停留在阅读一遍文档之后的似懂非懂之中。我同时也了解到,Stream API 并非解决所有问题的万能药,还是要看具体的场景。
一直在流中遨游,从未上岸