thread-stream
thread-stream copied to clipboard
Simpler algorithm
We couldn't quite get thread-stream running well in our services, so we re-implemented something similar. We used a simpler concurrent queue implementation that might be useful for thread-stream as well.
Basically, we do it like this:
// Per-item size budget in bytes (8 KiB); send() reserves this much room before writing.
const MAX_LEN = 8 * 1024
// Wrap threshold: both producer and consumer reset their cursor to 0 once it moves past this offset.
const BUF_END = 256 * MAX_LEN
// Total allocation: BUF_END plus a MAX_LEN-sized tail. Presumably the tail lets an item that
// starts just before BUF_END finish contiguously before the cursor wraps — confirm item size limit.
const BUF_LEN = BUF_END + MAX_LEN
// Element indices (not byte offsets) into the Int32Array `state` below.
const WRITE_INDEX = 4 // slot holding the producer's write cursor
const READ_INDEX = 8 // slot holding the consumer's read cursor
// Shared memory holding the item bytes.
const sharedBuffer = new SharedArrayBuffer(BUF_LEN)
// Shared memory holding the cursors (128 bytes = 32 Int32 slots; only two are used in the code shown here).
const sharedState = new SharedArrayBuffer(128)
const state = new Int32Array(sharedState)
// Node Buffer view over sharedBuffer (no copy), used for string encode/decode and indexOf.
const buffer = Buffer.from(sharedBuffer)
// Byte 31 is the item delimiter the consumer splits on; seed one at offset 0.
buffer[0] = 31 // Item header
// Publish the initial write cursor just past the seeded delimiter and wake any waiter on it.
Atomics.store(state, WRITE_INDEX, 1)
Atomics.notify(state, WRITE_INDEX)
// producer
// True while a drain() call is in flight; send() reads this to decide its return value.
let draining = false

/**
 * Waits (without blocking the thread) until the consumer's read cursor no
 * longer satisfies the back-pressure condition relative to the write cursor
 * sampled on entry, then clears `draining` and emits 'drain'.
 *
 * @returns {Promise<void>} resolves after 'drain' has been emitted
 */
async function drain () {
  draining = true
  // The write cursor is sampled once; only the read cursor is re-polled.
  const write = Atomics.load(state, WRITE_INDEX)
  for (
    let read = Atomics.load(state, READ_INDEX);
    write <= read && write + MAX_LEN > read;
    read = Atomics.load(state, READ_INDEX)
  ) {
    // waitAsync reports async=false when the slot already differs from `read`.
    const pending = Atomics.waitAsync(state, READ_INDEX, read)
    if (pending.async) {
      await pending.value
    }
  }
  draining = false
  emit('drain')
}
send(data) {
const len = MAX_LEN // or Buffer.byteLength(name)
let read = Atomics.load(this._state, READ_INDEX)
while (write < read && write + len > read) {
Atomics.wait(this._state, READ_INDEX, read)
read = Atomics.load(state, READ_INDEX)
}
write += this._buffer.write(name, this._write)
buffer[write++] = 31
if (write > BUF_END) {
write = 0
}
Atomics.store(state, WRITE_INDEX, write)
Atomics.notify(state, WRITE_INDEX)
const needDrain = write + MAX_LEN >= BUF_END
if (needDrain && !draining) {
draining = true
drain()
}
return !draining
}
// consumer
/**
 * Consumer: async generator yielding every item the producer publishes, in order.
 *
 * Each pass decodes all complete items between the read cursor and the
 * published write cursor, publishes the new read cursor (waking a producer
 * blocked on it), and then yields the decoded strings.
 *
 * @yields {string} one UTF-8 decoded item (delimiter byte 31 stripped)
 * @throws {Error} if no delimiter is found after the read cursor
 */
async function * receive () {
  // Start from the published read cursor; the original used `read` without
  // ever declaring or initialising it.
  let read = Atomics.load(state, READ_INDEX)
  while (true) {
    let write = Atomics.load(state, WRITE_INDEX)
    // Mirror the producer's wrap rule: a cursor past BUF_END restarts at 0.
    if (read > BUF_END) {
      read = 0
    }
    // Empty queue: park until the producer moves the write cursor.
    while (read === write) {
      const { async, value } = Atomics.waitAsync(state, WRITE_INDEX, write)
      if (async) {
        await value
      }
      write = Atomics.load(state, WRITE_INDEX)
    }
    // If the producer has wrapped, drain every item that STARTS at or before
    // BUF_END. The producer only wraps once its cursor exceeds BUF_END, so an
    // item may begin exactly at BUF_END; stopping at `read < BUF_END` would
    // skip it and spin forever with read === BUF_END.
    const limit = write < read ? BUF_END + 1 : write
    const arr = []
    while (read < limit) {
      const idx = buffer.indexOf(31, read)
      if (idx === -1) {
        // Without this, read would silently reset to 0 and re-read stale data.
        throw new Error('receive: item delimiter not found after read cursor')
      }
      arr.push(buffer.toString('utf-8', read, idx))
      read = idx + 1
    }
    Atomics.store(state, READ_INDEX, read)
    Atomics.notify(state, READ_INDEX)
    yield* arr
  }
}