bun
bun copied to clipboard
`fs.createReadStream` and `fs.createWriteStream` from `fd` behaviour differs from Node
What version of Bun is running?
1.1.0+5903a6141
What platform is your computer?
Darwin 21.6.0 arm64 arm
What steps can reproduce the bug?
// Repro: open one fd, write through an fd-backed WriteStream, then read the
// same fd back with an fd-backed ReadStream. Under Node the 'data' handler
// fires with "Hello, World!"; under Bun it never fires (see output below).
const fs = require('fs');
const path = require('path');
const os = require('os');
const tempFile = path.join(os.tmpdir(), 'temp.txt');
fs.open(tempFile, 'w+', (err, fd) => {
if (err) {
console.error('Error opening file:', err);
return;
}
// Both streams share the SAME fd; the '' path argument is ignored when
// an fd is supplied in options.
const writeStream = fs.createWriteStream('', { fd: fd });
const readStream = fs.createReadStream('', { fd: fd, start: 0 });
readStream.on('data', (chunk) => {
console.log('Got chunk', chunk.toString())
});
const data = 'Hello, World!';
writeStream.write(data, (err) => {
if (err) {
console.error('Error writing to file:', err);
writeStream.close(() => { });
return;
}
console.log('Data written to file.');
});
});
What is the expected behavior?
~/projects/bun-fd-repro
❯ node test.js
Data written to file.
Got chunk Hello, World!
What do you see instead?
~/projects/bun-fd-repro
❯ bun test.js
Data written to file.
Additional information
Originally ran into this bug bumping Bun while testing https://github.com/replit/ruspty
I suspect we are closing the file descriptor instead of cloning the file descriptor, but I'm not sure yet.
So I was messing around with the 1brc challenge (read 1 billion lines from a 13.7 GB text file and process the data into the expected output) in JS.
Because of this I was messing around with streams and therefore used fs.createReadStream([path][, options])
. The thing is, when I set the highWaterMark
option — which is supposed to set the buffer size that the callback of the stream's 'data' event should receive every time it's triggered — it doesn't take effect.
So my code is set up as follows:
// !! The complete code is at the bottom of this comment !! //
const fs = require('node:fs');
const fp = 'D:\\code\\challenges\\1brc-local\\data\\measurements.txt';
// highWaterMark requests ~7 MiB chunks; Bun reportedly delivers ~77 KB instead.
const stream = fs.createReadStream(fp, { highWaterMark: 7340032 });
stream.on("data", (chunk) => {});
The 7340032 should set the size of the expected buffer to ~7 MB. Instead I receive a buffer with a size of 78879 bytes (~77 KB).
console output with bun:
console output with node:
complete code:
const fs = require('node:fs');
const fp = 'D:\\code\\challenges\\1brc-local\\data\\measurements.txt';
// 4-component vector. In this script it doubles as a per-station
// accumulator: x = sample count, y = running sum of temp * 1000,
// z = minimum temperature, w = maximum temperature.
class Vec4 {
  /**
   * @param {number} x
   * @param {number} y
   * @param {number} z
   * @param {number} w
   */
  constructor(x, y, z, w) {
    this._x = x;
    this._y = y;
    this._z = z;
    this._w = w;
  }

  get x() {
    return this._x;
  }
  get y() {
    return this._y;
  }
  get z() {
    return this._z;
  }
  get w() {
    return this._w;
  }

  // Swizzle getters returning the selected components as an array.
  // The originals used the comma operator (e.g. `return this.x, this.y;`),
  // which evaluates both operands but yields only the LAST one, so `xy`
  // silently returned just `y`. They now return every requested component.
  get xy() {
    return [this.x, this.y];
  }
  get xz() {
    return [this.x, this.z];
  }
  get xw() {
    return [this.x, this.w];
  }
  get yz() {
    return [this.y, this.z];
  }
  get yw() {
    return [this.y, this.w];
  }
  get zw() {
    return [this.z, this.w];
  }
  get xyz() {
    return [this.x, this.y, this.z];
  }
  get xyw() {
    return [this.x, this.y, this.w];
  }
  get xzw() {
    return [this.x, this.z, this.w];
  }
  get yzw() {
    return [this.y, this.z, this.w];
  }
  get xyzw() {
    return [this.x, this.y, this.z, this.w];
  }

  set x(value) {
    this._x = value;
  }
  set y(value) {
    this._y = value;
  }
  set z(value) {
    this._z = value;
  }
  set w(value) {
    this._w = value;
  }
}
// Per-function timing stats, keyed by fn.name.
const executions = new Map();

/**
 * Invoke `fn(...args)`, log how long the call took in microseconds along
 * with the running average for that function name, and return fn's result.
 * @param {Function} fn - function to time (must be named for useful output)
 * @param {...*} args - forwarded to fn
 * @returns {*} whatever fn returns
 */
function clock(fn, ...args) {
  let meta = executions.get(fn.name);
  if (meta === undefined) {
    meta = { count: 0, total: 0 };
    executions.set(fn.name, meta);
  }
  const began = performance.now();
  const result = fn(...args);
  // performance.now() is in ms; scale to whole microseconds.
  const micros = Math.floor((performance.now() - began) * 1000);
  meta.count += 1;
  meta.total += micros;
  console.log(`${fn.name}() | executed in ${micros}μs | average: ${Math.floor(meta.total / meta.count)}μs`);
  return result;
}
// name -> Vec4 accumulator (x = count, y = sum of temp*1000, z = min, w = max).
const stations = new Map();

/**
 * Seed the accumulator for a station's first reading.
 * @param {string} name
 * @param {number} temp
 */
function create(name, temp) {
  const seeded = new Vec4(1, temp * 1000, temp, temp);
  stations.set(name, seeded);
}

/**
 * Fold one more reading into an existing station's accumulator.
 * @param {string} name - must already exist in `stations`
 * @param {number} temp
 */
async function update(name, temp) {
  const station = stations.get(name);
  station.x += 1;
  station.y += temp * 1000;
  // A new minimum can never also be a new maximum, so bail out early.
  if (temp < station.z) {
    station.z = temp;
    return;
  }
  if (temp > station.w) {
    station.w = temp;
  }
}
// Wall-clock start of the run; read by report() and the 'end' handler below.
let start = performance.now();
// highWaterMark requests ~7 MiB chunks — the setting this issue is about.
const stream = fs.createReadStream(fp, { highWaterMark: 7340032 });
// FIFO backlog of raw stream chunks awaiting processing.
const store = [];

/**
 * Append a chunk to the backlog.
 * @param {Buffer} val
 * @returns {Promise<number>} index at which the chunk was stored
 */
async function enqueue(val) {
  return store.push(val) - 1;
}

// Index of the next chunk to hand out.
let last = 0;

/**
 * Take the oldest unprocessed chunk (the chunk itself stays in `store`).
 * @returns {[Buffer, number]} the chunk and the index it occupies
 */
function dequeue() {
  const chunk = store[last];
  return [chunk, last++];
}
// Running byte count of chunks received so far (updated in the 'data' handler).
let totalBytes = 0;
// Hard-coded total input size used only for the progress percentage in
// report() — presumably the byte length of measurements.txt; TODO confirm.
let requiredBytes = 14_795_138_127;
/**
 * Log per-chunk timing plus overall progress against `requiredBytes`.
 * @param {Promise<number>|number} id - (promised) chunk index from enqueue()
 * @param {number} ps - performance.now() timestamp taken when processing began
 */
async function report(id, ps) {
  const chunkIndex = await id;
  const now = performance.now();
  // Whole seconds elapsed since the script started, split into m/s.
  const elapsedSeconds = Math.floor((now - start) / 1000);
  const minutes = Math.floor(elapsedSeconds / 60);
  const seconds = elapsedSeconds % 60;
  console.log(chunkIndex, 'processed chunk took:', Math.floor(now - ps), 'ms; total:', minutes, 'm', seconds, 's; progress:', Math.floor((totalBytes / requiredBytes) * 100), '% (', totalBytes, '/', requiredBytes, ' | left:', requiredBytes - totalBytes, ')');
}
// Consume one queued chunk: trim to line boundaries, then parse
// "name;temperature\n" records into the stations map.
// NOTE(review): this declaration shadows Node's global `process` object.
async function process(id) {
const ps = performance.now();
let [chunk, next] = dequeue();
let buff;
// Drop the (possibly partial) first line: keep everything after the first
// '\n' (byte 10). If the chunk contains no newline, buff stays undefined.
for (let i = 0; i < chunk.length; i++) {
if (chunk[i] === 10) {
buff = chunk.slice(i + 1, chunk.length);
break;
}
}
// FIXME(review): `i > store[next].length` is false at i = 0, so this loop
// never executes. It appears intended to append the leading partial line of
// the FOLLOWING chunk (store[next] is the chunk just dequeued, not the next
// one), and `buff +=` on a Buffer would coerce buff to a string anyway.
// As written, the trailing partial line of every chunk is silently dropped.
for (let i = 0; i > store[next].length; i++) {
if (store[next][i] === 10) {
buff += store[next].slice(0, i - 1);
break;
}
}
let setTemp = false;
let name = '';
let temp = '';
// Byte-wise scan of buff: ';' (59) switches from name to temperature,
// '\n' (10) terminates a record; anything else accumulates characters.
for (let i = 0; i < buff.length; i++) {
const byte = buff[i];
switch (byte) {
case 10:
const n = name;
const t = Number(temp);
if (!stations.has(name)) {
create(n, t);
} else {
// NOTE(review): update() is async but its promise is not awaited.
update(n, t);
}
setTemp = false;
name = '';
temp = '';
break;
case 59:
setTemp = true
break;
default:
const char = String.fromCharCode(byte);
if (setTemp) {
temp += char;
} else {
name += char;
}
break;
}
}
report(id, ps);
}
// Queue each incoming chunk and kick off processing for it.
stream.on("data", (chunk) => {
totalBytes += chunk.length;
// enqueue() is async, so `id` is a Promise<number>; report() awaits it.
const id = enqueue(chunk);
process(id);
});
// Once the stream is drained, render "name=min/mean/max" per station,
// sorted by station name, then report total wall-clock time.
stream.on("end", () => {
  const result = [...stations]
    // vec.y holds the sum of temp*1000 and vec.x the sample count, so
    // Math.floor(sum/count) / 1000 yields the mean to three decimals.
    .map(([name, vec]) => ([name, `${name}=${vec.z}/${Math.floor(vec.y / vec.x) / 1000}/${vec.w}`]))
    // Compare the full names — the previous comparator used charCodeAt(0),
    // which left names sharing a first letter in insertion order.
    .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
  console.log(result);
  console.log(stations);
  const total = Math.floor((performance.now() - start) / 1000);
  const minutes = Math.floor(total / 60);
  const seconds = total % 60;
  console.log('total time:', minutes, 'm', seconds, 's');
})
Confirming this continues to be a bug in Bun v1.1.4.
❯ bun a.js
Data written to file.
❯ node a.js
Data written to file.
Got chunk Hello, World!
This initially looks like an event loop ref/unref bug, but if we add a setTimeout
we can see that the "Got chunk"
callback is still never emitted.
Debug logs:
[SYS] openat(-2, /private/tmp/bunfig.toml) = -1
[SYS] openat(-2, a.js) = 3
[SYS] fstat(3[/private/tmp/a.js]) = 0
[SYS] close(3[/private/tmp/a.js])
[SYS] close(3[/])
[SYS] close(4[/private])
[SYS] close(5[/private/tmp])
[SYS] openat(-2, /private/tmp/a.js) = 4
[fs] openat([invalid_fd], /private/tmp/a.js) = 4[/private/tmp/a.js]
[SYS] close(4[/private/tmp/a.js])
[Loop] ref
[Loop] ref
[SYS] openat(-2, /var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt) = 4
[Loop] sub 2 - 1 = 1
[alloc] new() = src.bun.js.webcore.streams.ReadableStreamSource(src.bun.js.webcore.streams.FileReader,"File",(function 'onStart'),(function 'onPull'),(function 'onCancel'),(function 'deinit'),(function 'setRefOrUnref'),(function 'drain'))@11bc04170
[alloc] new() = src.bun.js.webcore.streams.FileSink@11bd04090
[SYS] dup(4) = 7
[SYS] fstat(7[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt]) = 0
[SYS] write(7[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt], 13) = 13 (0.040ms)
[FileSink] onWrite(13, src.io.PipeWriter.WriteStatus.drained)
Data written to file.
[SYS] dup(4) = 8
[SYS] fstat(8[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt]) = 0
[SYS] read(8[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt], 262144) = 0 (0.013ms)
[alloc] new() = posix_event_loop.Closer@11be04090
[FileReader] onReaderDone()
[SYS] close(8[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt])
[alloc] destroy(posix_event_loop.Closer@11be04090)
[FileReader] onPull(65536) = 0
[Loop] ref
[SYS] close(4[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt])
[Loop] sub 2 - 1 = 1
[EventLoop] tick 6.958us
[alloc] destroy(src.bun.js.webcore.streams.ReadableStreamSource(src.bun.js.webcore.streams.FileReader,"File",(function 'onStart'),(function 'onPull'),(function 'onCancel'),(function 'deinit'),(function 'setRefOrUnref'),(function 'drain'))@11bc04170)
[alloc] new() = posix_event_loop.Closer@11c004080
[SYS] close(7[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt])
[alloc] destroy(posix_event_loop.Closer@11c004080)
❯ node ./index.js
[ 'readStream', 'resume' ]
[ 'writeStream', Symbol(kConstruct) ]
[ 'readStream', Symbol(kConstruct) ]
Data written to file.
[ 'readStream', 'data' ]
Got chunk Hello, World!
[ 'readStream', 'readable' ]
[ 'readStream', 'end' ]
[ 'readStream', 'close' ]
❯ bun-debug index.js
Data written to file.
[ "readStream", "open" ]
[ "readStream", "ready" ]
[ "writeStream", "open" ]
[ "writeStream", "ready" ]
[ "readStream", "#kConstruct" ]
[ "writeStream", "#kConstruct" ]
[ "readStream", "end" ]
[ "readStream", "close" ]
the main issue is the lack of the 'data'
event, but the rest need work too.