QueryStream gets stuck permanently on lost connections
There's a problem in pg-query-stream: it fails to reject a pending iterator.next() request when the connection breaks.
Here's an example, using slightly modified code from pg-query-stream:
const pg = require('pg')
const pool = new pg.Pool(/* connection details */)
const QueryStream = require('pg-query-stream')

// pipe 1,000,000 rows to stdout without blowing up your memory usage
pool.connect(async (err, client, done) => {
  if (err) throw err;
  const query = new QueryStream('SELECT * FROM generate_series(0, $1) num', [1000000]);
  const stream = client.query(query);
  try {
    for await (const row of stream) {
      console.log(row);
    }
    console.log('End of Loop'); // we never get here on broken connections
  } catch (err) {
    // we should get here on a broken connection, but it never happens
    console.log('HANDLED:', err.message);
  }
  client.on('error', (err) => {
    // we do get here on broken connections, but it's no use for our permanently-stuck loop above
  });
});
To test it, run the code, then execute the following SQL against the same database you connect to, so that it terminates the connection (just change the database name in the query):
SELECT pg_terminate_backend(pid) FROM pg_stat_activity WHERE datname='your-database-name-here';
You will then see that the for await loop never ends and never throws an error.
This happens because, after the connection is lost, JavaScript internally makes the next-row request on the iterator:
const nextRow = await i.next(); // this promise never resolves and never rejects; it hangs permanently
Since the stream has errored, the promise returned by i.next() is supposed to reject. Instead it stays pending forever, and the loop hangs with it.
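To show what "supposed to reject" means in practice, here is a minimal sketch that uses a plain Node.js Readable as a stand-in for the query stream (it does not use pg at all). The names makeStalledStream and probeNext are hypothetical helpers invented for this illustration. A pending next() stays pending while the source is silent, and it rejects once the stream itself is destroyed with an error:

```javascript
const { Readable } = require('node:stream');

// Stand-in for a query stream whose source has gone silent:
// read() never pushes a row and never signals the end of the stream.
function makeStalledStream() {
  return new Readable({ objectMode: true, read() {} });
}

// Requests one row from the stream's async iterator and reports what
// happened to that promise within `ms` milliseconds. If `failWith` is
// given, the stream is destroyed with that error right after the request.
async function probeNext(stream, ms, failWith) {
  const iterator = stream[Symbol.asyncIterator]();
  const next = iterator.next().then(
    () => 'resolved',
    (err) => `rejected: ${err.message}`
  );
  if (failWith) stream.destroy(failWith);
  const timeout = new Promise((resolve) => setTimeout(resolve, ms, 'pending'));
  return Promise.race([next, timeout]);
}
```

Calling probeNext(makeStalledStream(), 50) should yield 'pending', which mirrors the hang described above, while passing an Error as the third argument should yield a 'rejected: ...' result. That suggests the iterator only needs the stream itself to see the error.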
At present, I have to resort to an awkward workaround via AbortController to force-reject the promise that gets stuck.
I implemented a workaround for this bug within pg-iterator, see the source code here and here. But I'm really hoping for a fix within pg-query-stream.
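For readers unfamiliar with this kind of workaround, here is a rough sketch of the general idea. It is not the pg-iterator code; nextOrAbort and abortable are hypothetical names, and it assumes a Node.js version where AbortController.abort() accepts a reason. Each next() call is raced against an abort signal, so a stuck promise can be rejected from the outside:

```javascript
// Hypothetical helper: settles with iterator.next(), or rejects as soon
// as the signal is aborted, whichever happens first.
function nextOrAbort(iterator, signal) {
  return new Promise((resolve, reject) => {
    const fail = () => reject(signal.reason ?? new Error('Aborted'));
    if (signal.aborted) {
      fail();
      return;
    }
    signal.addEventListener('abort', fail, { once: true });
    iterator
      .next()
      .then(resolve, reject)
      .finally(() => signal.removeEventListener('abort', fail));
  });
}

// Hypothetical wrapper: iterates `iterable`, but throws inside the
// consumer's for await loop once the signal is aborted.
async function* abortable(iterable, signal) {
  const iterator = iterable[Symbol.asyncIterator]();
  while (true) {
    const { value, done } = await nextOrAbort(iterator, signal);
    if (done) return;
    yield value;
  }
}

// Possible usage with the example above (assumption, not tested against pg):
//   const ac = new AbortController();
//   client.once('error', (err) => ac.abort(err));
//   for await (const row of abortable(stream, ac.signal)) { ... }
```

The stuck next() promise of the underlying stream is simply abandoned here, which is part of what makes this approach awkward compared with a fix inside the stream.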
I ran into the same problem today.
I fixed it by propagating any errors from the client to the query stream:
// Propagate any errors from the client to the query stream to ensure it gets terminated
// correctly.
client.once("error", error => {
queryStream.emit("error", error);
});
It would be great if this sort of wiring was done under the hood. Happy to raise a PR for this if the solution of propagating the error is considered viable.
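As a rough sketch of what that wiring could look like as a reusable helper, here is a version that also removes the listener once the stream closes. forwardClientErrors is a hypothetical name, not part of pg or pg-query-stream. It only assumes that the client is an EventEmitter and that the query stream is a Node.js Readable:

```javascript
// Hypothetical helper: forwards the first 'error' emitted by `client`
// to `stream`, and detaches the listener when the stream closes so that
// a long-lived client does not accumulate listeners.
function forwardClientErrors(client, stream) {
  const onError = (error) => stream.emit('error', error);
  client.once('error', onError);
  stream.once('close', () => client.removeListener('error', onError));
  return stream;
}

// Possible usage with the example from the report (assumption):
//   const stream = forwardClientErrors(client, client.query(query));
//   for await (const row of stream) { ... } // now throws on a lost connection
```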