bun icon indicating copy to clipboard operation
bun copied to clipboard

`fs.createReadStream` and `fs.createWriteStream` from `fd` behaviour differs from Node

Open jackyzha0 opened this issue 3 months ago • 2 comments

What version of Bun is running?

1.1.0+5903a6141

What platform is your computer?

Darwin 21.6.0 arm64 arm

What steps can reproduce the bug?

const fs = require('fs');
const path = require('path');
const os = require('os');

// Reproduction: a write stream and a read stream are created over the SAME
// file descriptor, then data is written and expected to come back via 'data'.
const tempFile = path.join(os.tmpdir(), 'temp.txt');
// 'w+' opens the file for reading and writing (created if missing, truncated otherwise).
fs.open(tempFile, 'w+', (err, fd) => {
  if (err) {
    console.error('Error opening file:', err);
    return;
  }

  // The path argument is an empty string because the already-open `fd` is supplied instead.
  const writeStream = fs.createWriteStream('', { fd: fd });
  // `start: 0` asks the reader to begin at the first byte of the file.
  const readStream = fs.createReadStream('', { fd: fd, start: 0 });

  // Per the report, Node prints this line while Bun never emits 'data'.
  readStream.on('data', (chunk) => {
    console.log('Got chunk', chunk.toString())
  });

  const data = 'Hello, World!';
  writeStream.write(data, (err) => {
    if (err) {
      console.error('Error writing to file:', err);
      // Close the writer on failure; the close callback is intentionally a no-op.
      writeStream.close(() => { });
      return;
    }

    console.log('Data written to file.');
  });
});

What is the expected behavior?

~/projects/bun-fd-repro
❯ node test.js
Data written to file.
Got chunk Hello, World!

What do you see instead?

~/projects/bun-fd-repro
❯ bun test.js
Data written to file.

Additional information

Originally ran into this bug bumping Bun while testing https://github.com/replit/ruspty

jackyzha0 avatar Apr 04 '24 00:04 jackyzha0

I suspect we are closing the file descriptor instead of cloning the file descriptor, but I'm not sure yet.

Jarred-Sumner avatar Apr 04 '24 06:04 Jarred-Sumner

So I was messing around with the 1brc challenge (read 1 billion lines from a 13.7 GB text file and process the data into the expected output) in JS.

Because of this I was messing around with streams and therefore used fs.createReadStream(path[, options]). The thing is, when I set the highWaterMark option, which is supposed to set the size of the Buffer that the callback of the stream's 'data' event should receive every time it's triggered, it doesn't.

So my code is set up as follows:

// !! The complete code is at the bottom of this comment !! //
const fs = require('node:fs');
const fp = 'D:\\code\\challenges\\1brc-local\\data\\measurements.txt';

// highWaterMark is requested as 7 MiB (7 * 1024 * 1024 = 7340032 bytes).
const stream = fs.createReadStream(fp, { highWaterMark: 7340032 });

// Empty handler: the listener exists only so the stream delivers 'data' chunks.
stream.on("data", (chunk) => {});

The 7340032 should set the size of the expected buffer to ~7 MB. Instead I receive a buffer with a size of 78879 bytes (~77 KB).

console output with bun: afbeelding

console output with node: afbeelding

complete code:

const fs = require('node:fs');
const fp = 'D:\\code\\challenges\\1brc-local\\data\\measurements.txt';

/**
 * Mutable four-component vector.
 *
 * Single components are exposed through the `x`, `y`, `z` and `w` accessors.
 * The multi-component ("swizzle") getters such as `xy` or `xyzw` return a new
 * array holding the requested components in the order they are named.
 */
class Vec4 {
    /**
     * @param {number} x
     * @param {number} y
     * @param {number} z
     * @param {number} w
     */
    constructor(x, y, z, w) {
        // The underscore-prefixed fields are kept as-is so that any code reading
        // them directly keeps working.
        this._x = x;
        this._y = y;
        this._z = z;
        this._w = w;
    }

    get x() {
        return this._x;
    }

    get y() {
        return this._y;
    }

    get z() {
        return this._z;
    }

    get w() {
        return this._w;
    }

    // Swizzle getters: each returns an array. A comma expression
    // (`return this.x, this.y`) would evaluate to its last operand only.

    /** @returns {[number, number]} */
    get xy() {
        return [this.x, this.y];
    }

    /** @returns {[number, number]} */
    get xz() {
        return [this.x, this.z];
    }

    /** @returns {[number, number]} */
    get xw() {
        return [this.x, this.w];
    }

    /** @returns {[number, number]} */
    get yz() {
        return [this.y, this.z];
    }

    /** @returns {[number, number]} */
    get yw() {
        return [this.y, this.w];
    }

    /** @returns {[number, number]} */
    get zw() {
        return [this.z, this.w];
    }

    /** @returns {[number, number, number]} */
    get xyz() {
        return [this.x, this.y, this.z];
    }

    /** @returns {[number, number, number]} */
    get xyw() {
        return [this.x, this.y, this.w];
    }

    /** @returns {[number, number, number]} */
    get xzw() {
        return [this.x, this.z, this.w];
    }

    /** @returns {[number, number, number]} */
    get yzw() {
        return [this.y, this.z, this.w];
    }

    /** @returns {[number, number, number, number]} */
    get xyzw() {
        return [this.x, this.y, this.z, this.w];
    }

    set x(value) {
        this._x = value;
    }

    set y(value) {
        this._y = value;
    }

    set z(value) {
        this._z = value;
    }

    set w(value) {
        this._w = value;
    }
}

/** Timing statistics per function name: `{ count, total }`, with `total` in microseconds. */
const executions = new Map();

/**
 * Calls `fn(...args)`, logs how long that call took together with the running
 * average for functions of the same name, and returns whatever `fn` returned.
 *
 * @param {Function} fn - function to time; its `name` is the statistics key
 * @param {...*} args - arguments forwarded to `fn`
 * @returns {*} the return value of `fn`
 */
function clock(fn, ...args) {
    const label = fn.name;
    let stats = executions.get(label);
    if (stats === undefined) {
        stats = { count: 0, total: 0 };
        executions.set(label, stats);
    }

    const startedAt = performance.now();
    const result = fn(...args);
    const finishedAt = performance.now();

    // performance.now() is in milliseconds; scale to whole microseconds.
    const elapsed = Math.floor((finishedAt - startedAt) * 1000);
    stats.count += 1;
    stats.total += elapsed;

    const average = Math.floor(stats.total / stats.count);
    console.log(`${fn.name}() | executed in ${elapsed}μs | average: ${average}μs`);
    return result;
}

/** Aggregated readings, keyed by station name. */
const stations = new Map();

/**
 * Registers a station from its first reading.
 * The stored Vec4 is built as (1, temp * 1000, temp, temp).
 *
 * @param {string} name - station name
 * @param {number} temp - first temperature reading
 */
function create(name, temp) {
    const firstReading = new Vec4(1, temp * 1000, temp, temp);
    stations.set(name, firstReading);
}

/**
 * Folds one more reading into an already registered station:
 * `x` is incremented, `y` grows by `temp * 1000`, and `z` / `w` are lowered /
 * raised when `temp` falls outside the current [z, w] range.
 *
 * @param {string} name - station name; must already be present in `stations`
 * @param {number} temp - temperature reading
 * @returns {Promise<void>}
 */
async function update(name, temp) {
    const entry = stations.get(name);
    entry.x += 1;
    entry.y += temp * 1000;
    if (temp < entry.z) {
        entry.z = temp;
    } else if (temp > entry.w) {
        entry.w = temp;
    }
}

// Reference timestamp for the elapsed-time figures printed later in the script.
let start = performance.now();

// highWaterMark is requested as 7 MiB (7 * 1024 * 1024 = 7340032 bytes).
const stream = fs.createReadStream(fp, { highWaterMark: 7340032 });

/** Every chunk handed to `enqueue`, in insertion order. */
const store = [];

/**
 * Appends a chunk to `store`.
 *
 * @param {Buffer} val - chunk to remember
 * @returns {Promise<number>} index at which the chunk was stored
 */
async function enqueue(val) {
    store.push(val);
    return store.length - 1;
}

/** Index in `store` of the next chunk to hand out. */
let last = 0;

/**
 * Hands out the oldest chunk that has not been dequeued yet.
 *
 * @returns {[Buffer | undefined, number]} `[chunk, index]`, where `index` is the
 *   position of `chunk` inside `store`. When nothing is waiting, `chunk` is
 *   `undefined` and the cursor is left where it is, so a chunk queued later is
 *   not skipped.
 */
function dequeue() {
    const index = last;
    if (index >= store.length) {
        return [undefined, index];
    }
    last = index + 1;
    return [store[index], index];
}

// Bytes received so far versus the hard-coded size of the input file.
let totalBytes = 0;
let requiredBytes = 14_795_138_127;

/**
 * Logs a one-line progress summary for a processed chunk.
 *
 * @param {number | Promise<number>} id - chunk index (awaited before logging)
 * @param {number} ps - `performance.now()` value taken when processing of the chunk began
 * @returns {Promise<void>}
 */
async function report(id, ps) {
    const chunkIndex = await id;
    const now = performance.now();
    const elapsedSeconds = Math.floor((now - start) / 1000);
    const percentDone = Math.floor((totalBytes / requiredBytes) * 100);
    const bytesLeft = requiredBytes - totalBytes;
    console.log(
        chunkIndex, 'processed chunk took:', Math.floor(now - ps),
        'ms; total:', Math.floor(elapsedSeconds / 60), 'm', elapsedSeconds % 60,
        's; progress:', percentDone, '% (', totalBytes, '/', requiredBytes,
        ' | left:', bytesLeft, ')',
    );
}

/** Bytes of an unfinished line carried over from the previously processed chunk. */
let pendingTail = Buffer.alloc(0);

/**
 * Parses the next queued chunk as newline-separated `name;temperature` records
 * and folds every complete record into `stations`.
 *
 * Chunk boundaries fall at arbitrary byte offsets, so the bytes after the last
 * newline of a chunk are kept in `pendingTail` and prepended to the next chunk.
 * No record is dropped or split, including names whose UTF-8 encoding straddles
 * two chunks. Bytes still pending after the final chunk (input that does not
 * end with a newline) are not flushed by this function.
 *
 * NOTE: inside this module the declaration shadows Node's global `process`.
 *
 * @param {number | Promise<number>} id - chunk index, forwarded to `report`
 * @returns {Promise<void>}
 */
async function process(id) {
    const ps = performance.now();
    const [chunk, index] = dequeue();
    if (chunk === undefined) {
        // Nothing queued: there is nothing to parse or report.
        return;
    }
    // Drop the queue's reference so already-processed chunks can be collected
    // instead of the whole input accumulating in memory.
    store[index] = undefined;

    const data = pendingTail.length === 0 ? chunk : Buffer.concat([pendingTail, chunk]);
    const lastNewline = data.lastIndexOf(10);
    // Copy the tail: a subarray view would keep the entire chunk alive.
    pendingTail = Buffer.from(data.subarray(lastNewline + 1));

    let lineStart = 0;
    while (lineStart <= lastNewline) {
        const lineEnd = data.indexOf(10, lineStart);
        // Blank lines carry no record.
        if (lineEnd > lineStart) {
            const separator = data.indexOf(59, lineStart);
            const hasTemp = separator !== -1 && separator < lineEnd;
            // Decode whole byte ranges so multi-byte UTF-8 names stay intact.
            const name = data.toString('utf8', lineStart, hasTemp ? separator : lineEnd);
            // A record without ';' has an empty temperature, i.e. Number('') === 0.
            const temp = hasTemp ? Number(data.toString('utf8', separator + 1, lineEnd)) : 0;
            if (!stations.has(name)) {
                create(name, temp);
            } else {
                update(name, temp);
            }
        }
        lineStart = lineEnd + 1;
    }
    report(id, ps);
}


// For every chunk: account for its size, queue it, then parse it.
stream.on("data", function onChunk(chunk) {
    totalBytes = totalBytes + chunk.length;
    process(enqueue(chunk));
});

// Once the whole file has been read: print the per-station summary
// (`name=z/mean/w`), the raw station map, and the total run time.
stream.on("end", () => {
    const result = [...stations]
        .map(([name, vec]) => [name, `${name}=${vec.z}/${Math.floor(vec.y / vec.x) / 1000}/${vec.w}`])
        // Compare the full names; comparing only the first character would leave
        // stations that share an initial in arbitrary relative order.
        .sort(([a], [b]) => (a < b ? -1 : a > b ? 1 : 0));
    console.log(result);
    console.log(stations);
    const total = Math.floor((performance.now() - start) / 1000);
    const minutes = Math.floor(total / 60);
    const seconds = total % 60;
    console.log('total time:', minutes, 'm', seconds, 's');
});

5m1Ly avatar Apr 04 '24 21:04 5m1Ly

Confirming this continues to be a bug in Bun v1.1.4.

❯ bun a.js
Data written to file.

❯ node a.js
Data written to file.
Got chunk Hello, World!

This initially looks like an event loop ref/unref bug, but if we add a setTimeout we can see that the "Got chunk" callback is still never emitted.

Debug logs:

[SYS] openat(-2, /private/tmp/bunfig.toml) = -1
[SYS] openat(-2, a.js) = 3
[SYS] fstat(3[/private/tmp/a.js]) = 0
[SYS] close(3[/private/tmp/a.js])
[SYS] close(3[/])
[SYS] close(4[/private])
[SYS] close(5[/private/tmp])
[SYS] openat(-2, /private/tmp/a.js) = 4
[fs] openat([invalid_fd], /private/tmp/a.js) = 4[/private/tmp/a.js]
[SYS] close(4[/private/tmp/a.js])
[Loop] ref
[Loop] ref
[SYS] openat(-2, /var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt) = 4
[Loop] sub 2 - 1 = 1
[alloc] new() = src.bun.js.webcore.streams.ReadableStreamSource(src.bun.js.webcore.streams.FileReader,"File",(function 'onStart'),(function 'onPull'),(function 'onCancel'),(function 'deinit'),(function 'setRefOrUnref'),(function 'drain'))@11bc04170
[alloc] new() = src.bun.js.webcore.streams.FileSink@11bd04090
[SYS] dup(4) = 7
[SYS] fstat(7[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt]) = 0
[SYS] write(7[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt], 13) = 13 (0.040ms)
[FileSink] onWrite(13, src.io.PipeWriter.WriteStatus.drained)
Data written to file.
[SYS] dup(4) = 8
[SYS] fstat(8[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt]) = 0
[SYS] read(8[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt], 262144) = 0 (0.013ms)
[alloc] new() = posix_event_loop.Closer@11be04090
[FileReader] onReaderDone()
[SYS] close(8[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt])
[alloc] destroy(posix_event_loop.Closer@11be04090)
[FileReader] onPull(65536) = 0
[Loop] ref
[SYS] close(4[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt])
[Loop] sub 2 - 1 = 1
[EventLoop] tick 6.958us
[alloc] destroy(src.bun.js.webcore.streams.ReadableStreamSource(src.bun.js.webcore.streams.FileReader,"File",(function 'onStart'),(function 'onPull'),(function 'onCancel'),(function 'deinit'),(function 'setRefOrUnref'),(function 'drain'))@11bc04170)
[alloc] new() = posix_event_loop.Closer@11c004080
[SYS] close(7[/private/var/folders/wj/x9081kld0873cywddf9k8nfm0000gn/T/temp.txt])
[alloc] destroy(posix_event_loop.Closer@11c004080)

Jarred-Sumner avatar Apr 19 '24 19:04 Jarred-Sumner

❯ node ./index.js
[ 'readStream', 'resume' ]
[ 'writeStream', Symbol(kConstruct) ]
[ 'readStream', Symbol(kConstruct) ]
Data written to file.
[ 'readStream', 'data' ]
Got chunk Hello, World!
[ 'readStream', 'readable' ]
[ 'readStream', 'end' ]
[ 'readStream', 'close' ]
❯ bun-debug index.js
Data written to file.
[ "readStream", "open" ]
[ "readStream", "ready" ]
[ "writeStream", "open" ]
[ "writeStream", "ready" ]
[ "readStream", "#kConstruct" ]
[ "writeStream", "#kConstruct" ]
[ "readStream", "end" ]
[ "readStream", "close" ]

the main issue is the lack of the 'data' event, but the rest need work too.

nektro avatar Apr 23 '24 04:04 nektro