node-pg-cursor
Getting Next Results Programmatically
Hey Brian,
I tweeted you about trying to get next results and you asked me to gist and open an issue here. I already solved my problem in code, but I wasn't able to use pg-cursor to do it.
Basically I had this:
async function test() {
  console.log("THIS IS THE TEST METHOD");
  const client = await pool.connect();
  const renditionsCursor = client.query(new Cursor("select * from object_stacks"));
  renditionsCursor.read(10, (err, rows) => {
    if (err) {
      throw err;
    }
    for (const row of rows) {
      console.log(`Checking: ${row.object_name}`);
    }
    renditionsCursor.read(10, (err, rows) => {
      for (const row of rows) {
        console.log(`Checking: ${row.object_name}`);
      }
    });
  });
}
And I was trying to figure out a way to programmatically issue another renditionsCursor.read() and drain the cursor. The scenario is that I have about 1.5 million rows to process, and I thought pg-cursor might be able to help. It did seem like it could do the job, but through my own fault I was unable to accomplish the goal with pg-cursor. I tried several tactics, but I was never able to write one block of renditionsCursor.read() and then iterate it several times to get the next result sets.
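For reference, the shape I was going for was roughly this: wrap the callback-style read in a promise and await it in a loop until it comes back empty. Untested sketch; readBatch and drainCursor are just names I made up, with pool and Cursor as in the snippet above:

function readBatch(cursor, size) {
  // promise wrapper around pg-cursor's callback-style read()
  return new Promise((resolve, reject) => {
    cursor.read(size, (err, rows) => (err ? reject(err) : resolve(rows)));
  });
}

async function drainCursor() {
  const client = await pool.connect();
  const cursor = client.query(new Cursor("select * from object_stacks"));
  let rows;
  do {
    rows = await readBatch(cursor, 10);
    for (const row of rows) {
      console.log(`Checking: ${row.object_name}`);
    }
  } while (rows.length > 0);
}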
On your NPM wiki you have a comment that says:
// Also - you probably want to use some sort of async or promise library to deal with paging
// through your cursor results. node-pg-cursor makes no assumptions for you on that front.
So I was trying to use async/await to handle this, but I just couldn't get it to grab the next results. Not really a big deal since I got this taken care of, but I was curious to see whether you've implemented what I was trying to do, or have seen any examples of other people implementing the next-results scenario.
Thanks for your time.
hmm I think you can do something like this... bear with me, this is off the top of my head so it won't be perfect, but hopefully it conveys the gist
import { promisifyAll } from 'bluebird';
import { Client } from 'pg';
import Cursor from 'pg-cursor';

// promisify the prototype so every cursor gets a promise-returning readAsync()
// (bluebird suffixes promisified methods with "Async" by default)
promisifyAll(Cursor.prototype);

const client = new Client();
await client.connect();
const cursor = client.query(new Cursor('your query here'));

let rows;
do {
  rows = await cursor.readAsync(100);
  // do something with rows here
} while (rows.length);
I think that should work?
I have been talking with some people in the SpeakJS Discord on the Node channel and @MrLeebo came up with this:
async function test() {
  console.log("THIS IS THE TEST METHOD");
  const client = await pool.connect();
  const renditionsCursor = client.query(new Cursor("select * from object_stacks"));
  let successCount = 0;
  function processData(err, rows) {
    if (err) {
      throw err;
    }
    for (const row of rows) {
      console.log(row.object_id);
      successCount++;
    }
    if (rows.length === 10) {
      renditionsCursor.read(10, processData);
    }
    console.log(`Success count is: ${successCount}`);
  }
  renditionsCursor.read(10, processData);
}
Worked well, and I added some additional logging just to make sure I was getting all my rows.
I haven't given yours a try, having just seen it. I'll give it a try and let you know what happens.
EDIT: Wasn't able to get your code to return the correct result set. It looks like with a little tweaking it could work. Going to just go with what I had for now, since I've spent way too much time on this. Thanks for your quick replies and a great module!
+1, same issues.
The example docs for pg-cursor are not very good; showing an example with just 6 rows isn't what cursors are for. Fundamentally, if the solution requires recursion to effectively drain the cursor, that is sub-optimal. For example, I was able to pull 365,219 rows, but the recursion depth got up to 367 calls. I've got a use case where I need to pull millions of rows from the DB, so I have no idea if it will work. Perhaps promises or pipes with flow control are a much better way? I'd submit a pull request if I had the bandwidth in my life to contribute, but I don't. Brian, I am deeply appreciative of your library and I'm not trying to crap on your efforts. Way better than I could do, kudos and many thanks! So, please take my design suggestion and criticism as kindly as I intended it.
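For what it's worth, the "pipes with flow control" idea might look roughly like this using brianc's pg-query-stream package, which wraps a cursor in an object-mode Readable so backpressure is handled by the stream machinery instead of recursion. Untested sketch; my_big_table is a placeholder:

const { Client } = require('pg');
const QueryStream = require('pg-query-stream');

const client = new Client();
client.connect(err => {
  if (err) throw err;
  // rows are emitted one at a time as 'data' events on an object-mode stream
  const stream = client.query(new QueryStream('select * from my_big_table'));
  stream.on('data', row => {
    // process one row here
  });
  stream.on('end', () => client.end());
  stream.on('error', err => {
    console.error(err);
    client.end();
  });
});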
Thanks @hudspeth for your example. It worked great for me with 300k rows. Crossing my fingers for millions.
The solution below is working well.
import { Client } from "pg";
const Cursor = require("pg-cursor");
import { promisify } from "util";

const run = async () => {
  const client = new Client();
  await client.connect();
  const cursor = client.query(new Cursor("select * from my_big_table"));
  // ! IMPORTANT: do NOT overlook the `bind`
  // The cursor is storing some data internally to track its state.
  // It needs to be able to access that data using the `this` keyword.
  // We're effectively going to pass `read` around, so we need to bind `this` to the cursor object.
  const promisifiedCursorRead = promisify(cursor.read.bind(cursor));
  const ROW_COUNT = 10;
  while (true) {
    const result = await promisifiedCursorRead(ROW_COUNT);
    if (result.length === 0) {
      break;
    }
    // handle `result` (the current batch of rows) here...
  }
  cursor.close(() => {
    client.end();
  });
};

run().catch(e => {
  console.error(e);
});
If you're using Babel or Node 10 (which I'm not), you can abstract away the edge conditions using an async generator and a for await...of loop:
const run = async () => {
  const client = new Client();
  await client.connect();
  const cursor = client.query(new Cursor("select * from my_big_table"));
  // ! Same thing here. Don't forget the `bind`
  const promisifiedCursorRead = promisify(cursor.read.bind(cursor));
  for await (const result of resultGenerator(promisifiedCursorRead)) {
    // handle `result` (the current batch of rows) here; the messy stuff is abstracted away by the generator
  }
  cursor.close(() => {
    client.end();
  });
};

async function* resultGenerator(promisifiedCursorRead) {
  const ROW_COUNT = 10;
  while (true) {
    const result = await promisifiedCursorRead(ROW_COUNT);
    if (result.length > 0) {
      yield result;
    } else {
      // no more rows left to process
      return;
    }
  }
}
I'm not using Node 10 yet, so I haven't tested the async generator approach, but it should at least be close to the right solution. Anyway, the first solution I posted works.
I found the same thing using the cursor: it would be nice if calling client.query(new Cursor()) returned an iterator. The way I achieved this in my code (using the pg-cursor library) was by creating an iterator using promises, used like so:
let iterator = await createIterator('select ... from ...')
while (!iterator.done) {
  let rows = iterator.rows
  // do stuff with rows
  iterator = await iterator.next()
}
The createIterator() function:
const createIterator = async sql => {
  const client = await pool.connect()
  const cursor = client.query(new Cursor(sql))
  const batchSize = 100
  return (async function getRows(client, cursor, batchSize) {
    let done = false
    // Get the next batch of rows
    const rows = await new Promise((resolve, reject) =>
      cursor.read(batchSize, (err, rows) => (err ? reject(err) : resolve(rows)))
    )
    // Check if iteration is finished
    if (rows.length < 1) {
      done = true
      cursor.close(() => client.release())
    }
    // Return the iterator
    return {
      done,
      rows,
      next: async () => await getRows(client, cursor, batchSize)
    }
  })(client, cursor, batchSize)
}
(This is slightly modified and untested relative to the original code I had. The original, definitely-working code is here: https://gist.github.com/zachsa/0c91bfd4ab435ef6e8ac4f85d541bd8b)
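For anyone landing on this thread later: newer releases of pg-cursor document a promise-based API, where read() (and close()) return promises when called without a callback, which makes the drain loop much simpler. A minimal sketch, assuming a pg-cursor version with promise support (check your version's README) and a pool as in the earlier examples:

async function drain(sql) {
  const client = await pool.connect()
  try {
    const cursor = client.query(new Cursor(sql))
    let rows = await cursor.read(100)
    while (rows.length) {
      // do stuff with this batch of rows
      rows = await cursor.read(100)
    }
    await cursor.close()
  } finally {
    client.release()
  }
}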