thread-stream icon indicating copy to clipboard operation
thread-stream copied to clipboard

Simpler algorithm

Open ronag opened this issue 2 years ago • 9 comments

We couldn't quite get thread-stream running well in our services, so we re-implemented something similar. We used a simpler concurrent queue implementation that might be useful for thread-stream as well.

Basically, we do it like this:

// Buffer sizing. BUF_LEN extends MAX_LEN bytes past BUF_END; the producer
// below only wraps its cursor once it has moved past BUF_END, so the tail
// presumably serves as slack for an item that straddles BUF_END.
const MAX_LEN = 8192 // 8 KiB
const BUF_END = MAX_LEN * 256
const BUF_LEN = MAX_LEN + BUF_END

// Element indices into the Int32Array `state` (not byte offsets).
const WRITE_INDEX = 4
const READ_INDEX = 8

// Backing stores: a small block for the two cursors and the data ring itself.
const sharedState = new SharedArrayBuffer(128)
const sharedBuffer = new SharedArrayBuffer(BUF_LEN)

// Views over the shared memory (no copies are made).
const buffer = Buffer.from(sharedBuffer)
const state = new Int32Array(sharedState)

// Publish one empty item: the terminator byte 31 at offset 0, then move the
// write cursor past it and wake anything waiting on WRITE_INDEX.
buffer.writeUInt8(31, 0) // Item header
Atomics.store(state, WRITE_INDEX, 1)
Atomics.notify(state, WRITE_INDEX)

// producer

// True while a drain cycle is in flight; send() reads it to decide its
// return value and to avoid starting a second drain().
let draining = false

/**
 * Waits (without blocking the thread) until the consumer's read cursor is no
 * longer within MAX_LEN bytes ahead of the write cursor, then clears
 * `draining` and emits 'drain'.
 *
 * @returns {Promise<void>} settles after 'drain' has been emitted.
 */
async function drain () {
  draining = true

  // The write cursor is only loaded once here; the loop re-reads READ_INDEX.
  const write = Atomics.load(state, WRITE_INDEX)
  let read = Atomics.load(state, READ_INDEX)

  // Strict `<` on purpose (same test as send()): write === read means the
  // consumer has caught up and is itself parked on WRITE_INDEX, so waiting on
  // READ_INDEX in that state would never be woken and 'drain' would never fire.
  while (write < read && write + MAX_LEN > read) {
    const { async, value } = Atomics.waitAsync(state, READ_INDEX, read)
    if (async) {
      // Only await when the wait is really pending; otherwise the cursor has
      // already changed (or timed out) and we can re-check immediately.
      await value
    }
    read = Atomics.load(state, READ_INDEX)
  }

  draining = false
  emit('drain')
}

send(data) {
  const len = MAX_LEN // or Buffer.byteLength(name)

  let read = Atomics.load(this._state, READ_INDEX)

  while (write < read && write + len > read) {
    Atomics.wait(this._state, READ_INDEX, read)
    read = Atomics.load(state, READ_INDEX)
  }

  write += this._buffer.write(name, this._write)
  buffer[write++] = 31

  if (write > BUF_END) {
   write = 0
  }

  Atomics.store(state, WRITE_INDEX, write)
  Atomics.notify(state, WRITE_INDEX)

  const needDrain = write + MAX_LEN >= BUF_END

  if (needDrain && !draining) {
    draining = true
    drain()
  }

  return !draining
}

// consumer
/**
 * Consumer side: async generator that yields each item the producer has
 * published, decoded as a UTF-8 string (terminator byte 31 excluded).
 *
 * Each pass reads every item between the read cursor and the published write
 * cursor, publishes the new read cursor, wakes any waiter on READ_INDEX and
 * only then yields the batch.
 *
 * @yields {string} one decoded item at a time.
 */
async function * receive () {
  // Local read cursor, resumed from the last published position.
  let read = Atomics.load(state, READ_INDEX)

  while (true) {
    let write = Atomics.load(state, WRITE_INDEX)

    // Mirror of the producer's wrap rule: past BUF_END means start over at 0.
    if (read > BUF_END) {
      read = 0
    }

    // Nothing new: park on WRITE_INDEX without blocking the event loop.
    while (read === write) {
      const { async, value } = Atomics.waitAsync(state, WRITE_INDEX, write)
      if (async) {
        await value
      }
      write = Atomics.load(state, WRITE_INDEX)
    }

    if (write < read) {
      // The producer has wrapped. It only wraps once its cursor is strictly
      // past BUF_END, so an item may start exactly AT BUF_END; the bound must
      // therefore include BUF_END or that item is never consumed and this
      // loop spins without ever awaiting.
      write = BUF_END + 1
    }

    const arr = []
    while (read < write) {
      const idx = buffer.indexOf(31, read)
      arr.push(buffer.toString('utf-8', read, idx))
      read = idx + 1
    }

    // Release the consumed region to the producer before handing items out.
    Atomics.store(state, READ_INDEX, read)
    Atomics.notify(state, READ_INDEX)

    yield* arr
  }
}

ronag avatar Jan 17 '22 12:01 ronag