Node.js-Design-Patterns-Third-Edition icon indicating copy to clipboard operation
Node.js-Design-Patterns-Third-Edition copied to clipboard

Question about 06-Coding-with-Streams Multiplexing

Open luokuning opened this issue 4 years ago • 0 comments

Hi there, I'm reading multiplexing in Chapter 6, and I ran into a problem that I can't figure out. First I copied the contents of the client.js and server.js into my local file, but instead use file read/write stream rather than stdout or stderr, and the code runs smooth without any problem. The complete code is shown below:

// client.mjs
import { connect } from 'net'
import { createReadStream } from 'fs'

function multiplexChannels (sources, destination) {
  let openChannels = sources.length
  for (let i = 0; i < sources.length; i++) {
    sources[i]
    .on('readable', function () { // ①
      let chunk
      while ((chunk = this.read()) !== null) {
        const outBuff = Buffer.alloc(1 + 4 + chunk.length) // ②
        outBuff.writeUInt8(i, 0)
        outBuff.writeUInt32BE(chunk.length, 1)
        chunk.copy(outBuff, 5)
        console.log(`Sending packet to channel: ${i}`)
        destination.write(outBuff) // ③
      }
    })
    .on('end', () => { // ④
      if (--openChannels === 0) {
        destination.end()
      }
    })
  }
}

const socket = connect(3000, '127.0.0.1', () => {
+  const sourceFiles = [
+   createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs'),
+    createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs'),
+ ]

  multiplexChannels(sourceFiles, socket)

})

// server.mjs
import { createWriteStream } from 'fs'
import { createServer } from 'net'
import { resolve } from 'path'

function demultiplexChannel (source, destinations) {
  let currentChannel = null
  let currentLength = null

  source
    .on('readable', () => { // ①
      let chunk
      if (currentChannel === null) { // ②
        chunk = source.read(1)
        currentChannel = chunk && chunk.readUInt8(0)
      }

      if (currentLength === null) { // ③
        chunk = source.read(4)
        currentLength = chunk && chunk.readUInt32BE(0)
        if (currentLength === null) {
          return null
        }
      }

      chunk = source.read(currentLength) // ④
      if (chunk === null) {
        return null
      }

      console.log(`Received packet from: ${currentChannel}`)
      destinations[currentChannel].write(chunk) // ⑤
      currentChannel = null
      currentLength = null
    })
    .on('end', () => { // ⑥
      destinations.forEach(destination => destination.end())
      console.log('Source channel closed')
    })
}

const server = createServer(socket => {
+  const destFiles = [
+   createWriteStream('/tmp/filetcp/1.txt'),
+   createWriteStream('/tmp/filetcp/2.txt'),
+  ]

  demultiplexChannel(socket, destFiles)
})
server.listen(3000, () => console.log('Server started'))

But when I add highWaterMark option to file read stream of client.mjs like below:

-const sourceFiles = [
-    createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs'),
-    createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs'),
-  ]
+ const sourceFiles = [
+    createReadStream('/Users/cuishimin/Documents/space/ndp/client.mjs', { highWaterMark: 8 }),
+    createReadStream('/Users/cuishimin/Documents/space/ndp/server.mjs', { highWaterMark: 8 }),
+  ]

and run client.mjs again, something strange happened: both client and server socket hang forever. And the content of the server-generated file is incomplete, eg. 1.txt has partial content of client.mjs in this case.

I know that hightWaterMark is the maximum number of bytes of the internal buffer, but I can't figure out this has something to do with the problem, is there something that I missing?

luokuning avatar Dec 02 '21 10:12 luokuning