Error: Stream.pipe is not a function
I tried to follow along with the example provided, but when I run this code I get an error: stream.pipe is not a function. I'm new to pg and Node.js, so I'm not exactly sure where/how to fix this issue. This is the code I've been trying to run. Side note: will running pg-query-stream allow me to use the results from the query in other JS functions? Thanks, any help/advice is much appreciated.
const {Pool} = require('pg')
const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')
const pool = new Pool({})
const q = 'SELECT * FROM public."HexLogRecord"'
pool.connect()
var stream = pool.query(new QueryStream(q))
stream.pipe(JSONStream.stringify()).pipe(process.stdout)
I'm also trying to figure this out, but here's the quick solution:
Use Client instead of Pool.
const {Client} = require('pg')
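For example, a minimal sketch of the Client-based version (connection settings assumed to come from environment variables or a config object you pass in):

const {Client} = require('pg')
const QueryStream = require('pg-query-stream')
const JSONStream = require('JSONStream')
const client = new Client()

async function run () {
  await client.connect()
  const stream = client.query(new QueryStream('SELECT * FROM public."HexLogRecord"'))
  stream.pipe(JSONStream.stringify()).pipe(process.stdout)
  stream.on('end', () => client.end()) // close the connection once the stream finishes
}

run()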
You can use the pool, but you need to keep the connection checked out yourself, as the pool thinks the entire command is complete when the original query function returns. So use client = await pool.connect() and then release the connection after you finish streaming the results.
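If you want to keep using the pool, that looks roughly like this (a sketch reusing the same requires as above; the important part is that release() only happens once the stream has finished):

async function streamFromPool (pool) {
  const client = await pool.connect()
  const stream = client.query(new QueryStream('SELECT * FROM public."HexLogRecord"'))
  stream.on('end', () => client.release())
  stream.on('error', (err) => client.release(err)) // passing the error discards the broken connection
  stream.pipe(JSONStream.stringify()).pipe(process.stdout)
}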
@sehrope what is the purpose of using the pool in that case if you need to manage your connections by yourself?
It ensures that all parts of your application use the same connection configuration and respect the same max pool size. If you need to change any of those things it only needs to be done in one place and all parts of your application will reflect those changes.
To ensure that the connection is not shared and is properly released back to the pool, you can wrap the boilerplate for getting/releasing a connection further with something like the doWithClient(...) here: https://github.com/sehrope/node-pg-query-exec/blob/b434eec639c559d498488108fa6c6c3c720a2b4d/src/index.ts#L105-L129
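For illustration, a wrapper of that shape might look like this (hypothetical name, a sketch rather than the exact code at that link):

async function withClient (pool, fn) {
  const client = await pool.connect();
  try {
    // the caller gets a checked-out client and never deals with config or release
    return await fn(client);
  } finally {
    client.release();
  }
}

// usage: const result = await withClient(pool, (client) => client.query('SELECT 1'));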
I still don't understand. Why wouldn't I just use a regular connection to do that? How do I benefit from the pool then?
It is a regular connection. But instead of doing:
const client = new pg.Client(/* config goes here */);
await client.connect();
try {
  // do something with client
} catch (err) {
  // handle error
} finally {
  // a standalone Client is closed with end(); release() only exists on clients checked out from a pool
  await client.end();
}
... you use the pool.connect() function so that the specifics of what configuration to use (e.g. host, port, user, password...) are not littered throughout your application. Ditto for respecting maximum connection counts. If every part of your application is creating its own connections then they could concurrently create many connections. By having all the creation happen via a pool, it gives you a control point to set max limits.
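The pool-based version of the same boilerplate is nearly identical, except the configuration lives in one place (sketch):

const pool = new pg.Pool(/* config goes here, once, for the whole application */);
const client = await pool.connect();
try {
  // do something with client
} catch (err) {
  // handle error
} finally {
  client.release(); // hands the connection back to the pool instead of closing it
}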
So the sole benefit is setting a max limit, but it doesn't fully manage the connection for you? Why does
const client = new pg.Pool();
client.query();
work without .connect(), but blow up with streams?
FYI, that last example of yours is creating a new pool, not a new client.
The pool.query(...) internally uses pool.connect(...) to check out a connection, execute that one query, wait for it to finish, and then returns the connection to the pool for reuse later: https://github.com/brianc/node-postgres/blob/392a7f4a66d111cc4e9fd14253f09215441eed98/packages/pg-pool/index.js#L343-L394
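Conceptually it is doing something like this (a simplified sketch, not the actual implementation linked above):

async function poolQuery (pool, text, values) {
  const client = await pool.connect();        // check a connection out of the pool
  try {
    return await client.query(text, values);  // run the single query to completion
  } finally {
    client.release();                          // return the connection for later reuse
  }
}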
That's fine for most DB interactions that run a command and immediately get the entire result.
That does not work when you need to use the same client for multiple commands back to back. The QueryStream in the original issue in this thread creates a Cursor internally, and the interactions with that cursor require reusing the same connection because a cursor is scoped to a single backend connection. That's why it must remain checked out of the pool for the entirety of the QueryStream and using pool.query(...) is not an option.
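You can see the same constraint with pg-cursor directly: each read() is another round trip on the same backend connection, so the client has to stay checked out until the cursor is closed. A sketch (recent pg-cursor versions return promises from read()/close(); older ones take callbacks):

const Cursor = require('pg-cursor');

const client = await pool.connect();
const cursor = client.query(new Cursor('SELECT * FROM sometable'));
const firstBatch = await cursor.read(100);  // fetch 100 rows over this connection
const secondBatch = await cursor.read(100); // the next 100 must come from the same connection
await cursor.close();
client.release();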
Is there a reason why pool.query couldn't handle the query stream case? I tinkered a bit and found myself basically marrying query stream and connection handling in a way that I think should be handled by the library under the hood, but is now done in user space instead.
Right now it looks something like this, which easily makes the code pretty cluttered and rigid:
const client = await pool.connect();
const queryStream = client.query(new QueryStream('SELECT * FROM sometable'));
const results = queryStream.pipe(parser);
results.on('data', handleData);
results.on('end', () => {
  client.release();
});
// errors are not forwarded across pipe(), so listen on the query stream itself
queryStream.on('error', () => {
  client.release();
});
whereas it could just as well look like this:
const queryStream = await pool
  .query(new QueryStream('SELECT * FROM sometable'))
  .pipe(parser);
queryStream.on('data', handleData);
try {} finally {} does not work with query streams since the stream might still be running after the awaited promise has resolved.
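Though it can work if the awaited promise only resolves once the stream has actually finished, e.g. with the promise version of stream.pipeline (Node 15+). A sketch, assuming the same pool/parser/handleData as above:

const { pipeline } = require('stream/promises');

const client = await pool.connect();
try {
  const queryStream = client.query(new QueryStream('SELECT * FROM sometable'));
  parser.on('data', handleData);
  await pipeline(queryStream, parser); // resolves only after the stream ends, rejects on error
} finally {
  client.release();
}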