[pg-query-stream] Allow 'destroy' option for QueryStream
Since I'm using QueryStream for a one-off task, I'd like to wrap it with a function that manages the connection for me.
export function streamAllRows(
config: ClientConfig,
query: string,
): stream.Readable;
However, client code needs to wait not just for the QueryStream to close, but also for the managed connection and client to be fully closed and ended.
My solution was to use the destroy argument to stream.PassThrough:
export async function streamAllRows(
config: ClientConfig,
query: string,
): Promise<stream.Readable> {
const client = new pg.Client(config);
await client.connect();
return client.query(new QueryStream(query))
.pipe(new stream.PassThrough({
objectMode: true,
destroy: (e: Error | null, callback) => {
void client.end().then(() => { callback(); }).catch(callback);
},
}));
}
This works well enough, but creates an extra object and seems unnecessarily circuitous. It would be more expressive to accept the construct and destroy arguments and forward them to super().
It can’t just forward those things to super(), because the passed destroy would override the class’s definition of _destroy. I don’t think an option that overrides when the stream ends is a good API in general; a separate stream for that makes sense if you really want that behaviour, and a separate promise might be even better, e.g. something like this:
import {finished} from 'node:stream/promises';
export async function streamAllRows(
config: ClientConfig,
query: string,
): Promise<{stream: stream.Readable, ended: Promise<void>}> {
const client = new pg.Client(config);
await client.connect();
const stream = client.query(new QueryStream(query));
return {
stream,
ended:
finished(stream)
.catch(_err => {})
.then(() => client.end()),
};
}
(Remember to handle the error event on the client.)
What's the matter with:
public _destroy(_err: Error, cb: Function) {
this.cursor.close((err?: Error) => {
this.config.destroy(err || _err, cb)
})
}