
Detect stream timeout / too slow processing

Open mStirner opened this issue 3 years ago • 0 comments

Details

I need to detect when a "middleware stream" processes the chunks/data too slowly (in other words, when it has timed out).

This is the output to be generated when the middleware stream is fast enough (everything ok):

[input] Hello - 1661930141105
[dispatcher][input] received <Buffer 48 65 6c 6c 6f 20 2d 20 31 36 36 31 39 33 30 31 34 31 31 30 35>
[middleware] Do something with data <Buffer 48 65 6c 6c 6f 20 2d 20 31 36 36 31 39 33 30 31 34 31 31 30 35>
[middleware] Timeout after 3s, continue
[dispatcher][output] received <Buffer 48 65 6c 6c 6f 20 2d 20 31 36 36 31 39 33 30 31 34 31 31 30 35>
[output] 3009 <Buffer 48 65 6c 6c 6f 20 2d 20 31 36 36 31 39 33 30 31 34 31 31 30 35>

But this is the output I get:

[input] Hello - 1661930767218
[dispatcher][input] received <Buffer 48 65 6c 6c 6f 20 2d 20 31 36 36 31 39 33 30 37 36 37 32 31 38>
[middleware] Do something with data <Buffer 48 65 6c 6c 6f 20 2d 20 31 36 36 31 39 33 30 37 36 37 32 31 38>
Timedout!
[output] 5009 <Buffer 48 65 6c 6c 6f 20 2d 20 31 36 36 31 39 33 30 37 36 37 32 31 38>

[middleware] Timeout after 5.5s, continue
[dispatcher][output] received <Buffer 48 65 6c 6c 6f 20 2d 20 31 36 36 31 39 33 30 37 36 37 32 31 38>

I expected the output order to be exactly the same as in the output above, where everything was ok. Does output.push(...) from the "input stream" not push the data into the "input queue"?

I'm not that familiar with streams, and they are sometimes confusing, so I apologize if this question is stupid.

Is it safe to handle it like this?
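
On the output.push(...) question above: as far as the Node.js stream API goes, push() on a Transform appends to its readable side directly, so the stream's own transform() function is skipped. That would explain why "[dispatcher][output] received" is missing before "[output] 5009". A minimal sketch to check this in isolation (the `seen` array is only there for illustration and is not part of the original code):

```javascript
const { Transform } = require("stream");

// Records every chunk that actually passes through transform().
const seen = [];

const output = new Transform({
    transform(chunk, encoding, cb) {
        seen.push(chunk.toString());
        cb(null, chunk);
    }
});

// push() appends to the readable side directly; transform() is not called.
output.push("pushed");

// write() enters the writable side, so transform() runs for this chunk.
output.write("written");

// Only the written chunk went through transform().
console.log(seen);
```

Both chunks still end up on the readable side of the stream; only the written one is seen by transform().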

Node.js version

Not applicable.

Example code

Timeout detection:

const { Transform } = require("stream");

let input = null;
let output = null;
let middleware = null;

let timeout = null;
let timedout = false;

input = new Transform({
    transform(chunk, encoding, cb) {

        console.log("[dispatcher][input] received", chunk);

        timeout = setTimeout(() => {

            console.log("Timedout!");

            timedout = true;
            output.push(chunk);

        }, 5000);

        cb(null, chunk);

    }
});

output = new Transform({
    transform(chunk, encoding, cb) {

        console.log("[dispatcher][output] received", chunk);

        if (!timedout) {
            clearTimeout(timeout);
            cb(null, chunk);
        }

    }
});

middleware = new Transform({
    transform(chunk, encoding, cb) {

        console.log("[middleware] Do something with data", chunk);

        // fake some time consumption
        setTimeout(() => {

            console.log("[middleware] Timeout after 5.5s, continue");
            cb(null, chunk);

        }, 5500);

    }
});

input.pipe(middleware).pipe(output);

Data Handling


const loop = () => {

    let start = Date.now();

    output.once("data", (chunk) => {

        console.log(`[output] ${Date.now() - start}`, chunk);
        console.log();

        setTimeout(loop, 1000);

    });

    let msg = `Hello - ${Date.now()}`

    console.log(`[input] ${msg}`)
    input.write(msg);

};

loop();

Change the setTimeout delay in the middleware stream to a value above or below 5000 to reproduce the two outputs above: > 5000 = timeout, < 5000 = everything ok.

(Put both snippets in one file and execute it; it's ready to use.)
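
One thing to watch in the snippet above: when timedout is true, the output stream's transform() never calls cb, so that stream stops accepting further chunks, and timedout is never reset. Below is a hedged sketch of one possible way around this, not a definitive implementation. The helper name `createTimedPipeline` and its `ms` parameter are hypothetical, and it assumes only one chunk is in flight at a time (as in the loop above), because the timer and the flag are shared.

```javascript
const { Transform } = require("stream");

// Hypothetical helper: pipes input -> middleware -> output and emits the
// original chunk on output if the middleware takes longer than `ms`.
// Assumption: only one chunk is in flight at a time.
function createTimedPipeline(middleware, ms) {

    let timer = null;
    let timedOut = false;

    const output = new Transform({
        transform(chunk, encoding, cb) {

            clearTimeout(timer);

            if (timedOut) {
                // Late result: the fallback chunk was already emitted.
                // Drop it, but still call cb so the stream keeps working.
                timedOut = false;
                return cb();
            }

            cb(null, chunk);

        }
    });

    const input = new Transform({
        transform(chunk, encoding, cb) {

            timedOut = false;

            timer = setTimeout(() => {
                timedOut = true;
                // push() bypasses output's transform(), see the note above.
                output.push(chunk);
            }, ms);

            cb(null, chunk);

        }
    });

    input.pipe(middleware).pipe(output);

    return { input, output };

}
```

Usage would look like `const { input, output } = createTimedPipeline(middleware, 5000);`, followed by the same loop as in the "Data Handling" snippet.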

Operating system

Ubuntu 18.04 LTS x64

Scope

Code

Module and version

Not applicable.

mStirner · Aug 31 '22 07:08