abbshr.github.io icon indicating copy to clipboard operation
abbshr.github.io copied to clipboard

node stream: the secret part

Open abbshr opened this issue 9 years ago • 0 comments

Unix Stream

流(stream)是Unix中的一个古老概念. 它的出现解决了I/O上的许多问题.

流是数据源的抽象, 内部有一个缓冲区, 所有输入/输出的数据都要经过这个内部缓冲区. 最简单的讲, 他的作用就是减少内存驻留数据速率控制.

要理解流的第一个作用, 需要清楚一点: 对于送来的数据, 我们有两种处理方式. 一种是直接送达目标(空目的地/socket/硬盘/显卡等外设), 另一种是经过某些处理(如解析器)之后再送达目标. 因此我们不会希望大量数据长久驻留在内存中.

回忆一下计算机网络里, 路由器的存储转发直通交换概念. 所谓存储转发, 就是每经过一跳都需要等待所有分片的数据报全部接收到(也就是需要经过重组). 而无需等待其他分片的就是采用直通交换技术了. 我认为这是对stream的轮廓最形象的刻画. 流在这里就是路由器的直通交换功能.

拿Node.js中的文件读写举例.

读取文件最简单的API是fs.readFile, 他会一次性把整个文件读入内存中. 如果文件特别大, 那么内存的开销也相当可观.

Node也提供了流式读取APIfs.createReadStream.

调用之后会返回一个可读流(readable stream)而不是数据, 你可以通过这个流一点一点抽取数据而不是把他们全部读入内存. 至于目标, 可以是网络socket, 也可以是协议解析器. 这样内存中就不会有过多数据驻留.

当你准备把文件上传到网络时, 系统调用把内存中的数据copy到网卡驱动接口函数. 而在网络状况不佳的时候, 物理链路上就会发生排队, 一旦文件系统层写入一大块数据, 可能直接填满链路缓冲区, 然后丢掉多余的数据或覆盖之前的数据.

流在这种情况下就派上用场了, 将原本一整块数据分成更小的几块, 缓存到内部缓冲区里, 再向底层写入. 因而也就有了改变传输速率的功能.

于是乎, 出现了基于流而设计的管道(pipe), 能够自动调控双方数据传输速率, 使消费速率较低的消费者不至于被过快产生的数据淹没(数据丢失).

流在Node.js中的实现

RocketEngine经过几次重构, 加深了我对流本质的理解.

先上一张图:

node_stream_digram

Node中的流分为Readable StreamWritable Stream.

可读流对象依赖于_readableState对象, 它有几个重要的属性:

_readableState: {
  highWaterMark: 16384,
  buffer: [],
  flowing: false,
  ended: false,
  endEmitted: false,
  reading: false,
  needReadable: false,
  emittedReadable: false,
  readableListening: false,
  objectMode: false
}

可读流有两种模式: flowing, non-flowing. 我在图中右下角直接标注了两种模式对应的事件和方法.
可读流默认处于non-flowing模式, 所以'data'事件默认是不会触发的, 因此即便不添加监听器也不会有数据流失.
而一旦调用了.resume(), 就开启了flowing模式, 这时如果不绑定data的回调函数,数据就跑掉. 当然, 可以在flowing模式中调用.pause(). 这并不会让流进入non-flowing模式, 他只是禁止data事件的触发, 并把数据缓存起来, 这样就不有丢失数据的现象出现了.

每个可读流都需要实现_read函数, 其作用是告知Node这个流从数据源获取数据的方式. push_read的具体动作, 其调用就在_read函数里发生, 将数据从源拉取到可读流的内部缓存中. 如果_read中没有再次调用pull, 那么流在下一次就不会调用_read了.

push()中断可读流的消费, 即reading属性从true变为false.

push(null)表示再没有数据可拉取了. 它的调用会使属性.ended变为true, 意味着这个流将关闭. 但如果此时内部缓冲区中仍有尚未消费的数据, 流将等到数据被完全消费后, 触发"end"事件, 同时endEmitted属性变为true.

对于可读流的消费者, read()是公开的API,

可写流对象依赖于_writableState对象:

_writableState: {
  highWaterMark: 16384,
  objectMode: false,
  needDrain: false,
  ending: false,
  ended: false,
  finished: false,
  writing: false,
  buffer: []
}

从图中可以看出, 可写流是可读流的"逆过程". 可写流需要实现_write函数, 把内部缓冲区的数据写入数据源. 给流的消费者暴露了write函数, 和_read是一个道理, 但有如下不同:

  • _read: 只要buffer属性缓存的数据大小严格小于highWaterMark, push的返回值就是true
  • write: 只要buffer属性里面的数据超过highWaterMark, write就返回false, 且needDrain属性变为true. 一旦buffer再次清空, 便会触发"drain"事件

highWaterMark=0也是有意义的阈值, 表示一旦有数据缓存write就返回false.

为何如此设计呢?

因为write在内部调用的是_write, 如果buffer中缓存了数据, 说明底层目标的写入速度比较慢, 返回false的作用就是告知上层数据源先别发数据了, 等到下面都写完("drain")后继续写.

比如TCP Socket中, 对write的说明是这样的:

Returns true if the entire data was flushed successfully to the kernel buffer. Returns false if all or part of the data was queued in user memory. 'drain' will be emitted when the buffer is again free.

而可读流就是需要把底层数据缓存到buffer中, 以便消费者消费, 但为了提醒流不要占用过多内存, push才会在缓冲大小达到highWaterMark时返回false.

'readable'事件何时触发?

这是个很有意思的问题. Node在V0.10中增加"readable"事件, 为流提供了non-flowing模式. 但并没有明确表示'readable'何时触发, 不过多数人应该清楚在readable事件回调里一定能得到期望的东西.

substack在他的"stream-handbook"里提到:

process.stdin.on('readable', function () { process.stdin.read(3); process.stdin.read(0) });

这就说明read(0)一定会再次触发标准输入流的readable事件码? 除此之外, push, _read, read都和"readable"有联系.

这个问题如何回答? 源码中清清楚楚解释了一切, SO

just go and read the fu*king source code :)

abbshr avatar Dec 29 '14 15:12 abbshr