Streaming large queries using a callback function
I'll get straight to the question: on line 677 in init.moon, is row_desc guaranteed to be non-nil at that point? In other words, does the PostgreSQL protocol guarantee that MSG_TYPE_B.row_description is sent before MSG_TYPE_B.data_row? I can implement the feature myself; I just want to know if this is possible.
If that's the case, then I can proceed with implementing what I said in the title.
My dream is this:
local websocket = ...
pg:query('SELECT * FROM table LIMIT 100000', function(row)
  websocket:send_text(require'cjson'.encode(row)..'\n')
end)
What's nice about this is that you don't have to wait for Postgres to return all 100k rows before sending anything over the websocket. You can start sending as soon as the first row arrives, which gets data to the client much sooner and plays really nicely with the coroutine paradigm.
If my hypothesis in the first paragraph is correct, then I'd implement it like this:
On line 667 (linked above) you can simply call the callback function, after running this part of format_query_result on the raw string that PostgreSQL gave you over the wire.
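To make the idea concrete, here is a minimal sketch of decoding a DataRow payload and handing it straight to a callback. This is not pgmoon's code: `read_uint`, `decode_data_row`, and `stream_messages` are hypothetical names, `names` is a simplified stand-in for row_desc (column names only), and values are left as raw text strings instead of being run through the type handling in format_query_result.

```lua
-- Read a big-endian unsigned integer of `n` bytes starting at `pos`.
local function read_uint(s, pos, n)
  local v = 0
  for i = pos, pos + n - 1 do
    v = v * 256 + s:byte(i)
  end
  return v, pos + n
end

-- An Int32 length of -1 marks NULL; read as unsigned it is 0xFFFFFFFF.
local NULL_LENGTH = 4294967295

-- Decode one DataRow payload: an Int16 column count, then for each column
-- an Int32 length followed by that many bytes of data.
-- `names` (assumption) is a plain array of column names taken from row_desc.
local function decode_data_row(payload, names)
  local count, pos = read_uint(payload, 1, 2)
  local row = {}
  for i = 1, count do
    local len
    len, pos = read_uint(payload, pos, 4)
    if len ~= NULL_LENGTH then
      row[names[i]] = payload:sub(pos, pos + len - 1)
      pos = pos + len
    end
    -- NULL columns are simply left absent from the row table
  end
  return row
end

-- Hypothetical dispatch loop over already-framed messages.
-- "T" is RowDescription, "D" is DataRow.
local function stream_messages(messages, on_row)
  local names
  for _, msg in ipairs(messages) do
    if msg.type == "T" then
      names = msg.names
    elseif msg.type == "D" then
      assert(names, "DataRow arrived before RowDescription")
      on_row(decode_data_row(msg.payload, names))
    end
  end
end

-- Usage: one row with id = "7" and a NULL second column.
local payload = string.char(0, 2, 0, 0, 0, 1) .. "7" .. string.char(255, 255, 255, 255)
local rows = {}
stream_messages({
  { type = "T", names = { "id", "note" } },
  { type = "D", payload = payload },
}, function(row) rows[#rows + 1] = row end)
```

The assert in `stream_messages` is the ordering assumption from my question written down as code: if a DataRow ever showed up first, it would fail loudly instead of silently decoding garbage.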
According to ChatGPT, the answer is yes: row_desc is guaranteed to be non-nil at that point.
https://chatgpt.com/share/690377af-4438-8003-ac51-4da2701e7413
Short answer: Yes—for any query that returns rows, PostgreSQL guarantees you’ll see a RowDescription before any DataRows, so row_desc will be set by the time you handle a DataRow. That’s true for the Simple Query flow, and for the Extended Query flow if you issue Describe (which is how drivers typically work) — Describe yields RowDescription, then Execute yields the stream of DataRows, then CommandComplete and ReadyForQuery. PostgreSQL
A few caveats you likely already know, but to make this bullet-proof:
Statements that don’t return rows (e.g., INSERT without RETURNING) have no RowDescription/DataRow at all (NoData/CommandComplete instead). Also, SELECT … WHERE false still sends RowDescription but zero DataRows. PostgreSQL
COPY uses CopyOutResponse/CopyData messages, not RowDescription/DataRow. Don’t try to stream those as rows. PostgreSQL
In Extended protocol, if a client skips Describe, you won’t receive a RowDescription—but pg-style drivers normally call it, precisely to know column formats before rows arrive. PostgreSQL
Given that, your streaming idea for pgmoon is sound: when you read a DataRow (D), you can decode it immediately using the previously captured row_desc and invoke the user callback right there, without buffering the entire result. That’s exactly what the protocol intends (rows can be consumed as they arrive). PgDog
So: your hypothesis holds; go ahead and hook your callback at the DataRow handling site (after running the same per-field formatting that format_query_result uses), and you’ll be able to push each row to your websocket as it streams in.
Just implemented it. Works perfectly. I just realized, though, that one change I need to make is that the callback should be able to cancel the query, so... something like this:
local websocket = ...
pg:query('SELECT * FROM table LIMIT 100000', function(row)
  if not websocket then return 'cancel' end
  websocket:send_text(require'cjson'.encode(row)..'\n')
end)
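On the library side, honoring that return value could be as small as the sketch below. `deliver_rows` and `next_row` are hypothetical names, not anything in pgmoon; `next_row` stands in for "read and decode the next DataRow" and returns nil once the result set is exhausted.

```lua
-- Feed rows to `on_row` until the rows run out or the callback returns "cancel".
-- Returns the number of rows delivered and whether the callback cancelled.
local function deliver_rows(next_row, on_row)
  local delivered = 0
  while true do
    local row = next_row()
    if row == nil then
      return delivered, false
    end
    delivered = delivered + 1
    if on_row(row) == "cancel" then
      -- What happens to the rows still in flight is left to the caller.
      return delivered, true
    end
  end
end

-- Usage with a fake source of five rows; the callback cancels on the third.
local i = 0
local function fake_next_row()
  i = i + 1
  if i <= 5 then return { n = i } end
end

local seen = {}
local delivered, cancelled = deliver_rows(fake_next_row, function(row)
  seen[#seen + 1] = row.n
  if row.n == 3 then return "cancel" end
end)
```

Returning a sentinel string keeps the callback signature unchanged, so existing callbacks that return nothing keep working.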
The nice thing about that ^ is it requires minimal changes to the pgmoon codebase. However, I think this interface is probably better:
local websocket = ...
local stream = pg:query_stream'SELECT * FROM table LIMIT 100000'
ngx.thread.spawn(function()
  while ... do
    local websocket_is_closing = ...
    if websocket_is_closing then
      stream:close()
      websocket = nil
    end
  end
end)
for i,row in stream:iterator() do
  websocket:send_text(require'cjson'.encode(row)..'\n')
end
That way, you don't get nested callback hell and stuff like that. But that would require a bit of a refactor.
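One way to get the iterator shape without a deep refactor might be to wrap the callback version in a coroutine. The sketch below uses only plain Lua; `query_with_callback` is a made-up stand-in for the callback-style query, `query_iterator` is a hypothetical name, and I have not checked how this interacts with cosocket yields inside OpenResty.

```lua
-- Stand-in for a callback-style query: produces `n` fake rows.
local function query_with_callback(n, on_row)
  for i = 1, n do
    if on_row({ id = i }) == "cancel" then
      return nil, "cancelled"
    end
  end
  return true
end

-- Turn the callback API into a generic-for iterator yielding (index, row).
local function query_iterator(n)
  local next_row = coroutine.wrap(function()
    query_with_callback(n, function(row)
      -- Hand the row to the consumer and pause until it asks for the next one.
      coroutine.yield(row)
    end)
    -- Falling off the end returns nothing, which the wrapper below sees as nil.
  end)

  local index = 0
  return function()
    local row = next_row()
    if row == nil then return nil end
    index = index + 1
    return index, row
  end
end

-- Usage: same shape as the `for i,row in stream:iterator()` loop.
local ids = {}
for i, row in query_iterator(3) do
  ids[i] = row.id
end
```

Note that breaking out of the loop early leaves the coroutine suspended mid-query, which is exactly the situation an explicit `stream:close()` would have to handle.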
The callback approach is fine, though. You're still able to leverage the coroutine non-blocking stuff that makes OpenResty great. Using something else would diverge my version of this project too much from upstream.
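I also just realized that cancelling makes keepalive impossible for that connection. The rows that were not consumed are still unread on the socket, so I believe that if you cancel a stream, you would need to close the socket and open a new one.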
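If reconnecting turns out to be too costly, one alternative worth considering is to keep reading and discard everything until ReadyForQuery, which the server sends at the end of every query cycle. A rough sketch, with `drain_until_ready` and `read_message` as hypothetical names (the latter stands in for "read one backend message and return its type byte, or nil if the connection closed"):

```lua
-- Discard backend messages until ReadyForQuery ("Z") arrives.
-- Returns the number of discarded messages, or nil and an error string.
local function drain_until_ready(read_message)
  local discarded = 0
  while true do
    local msg_type = read_message()
    if msg_type == nil then
      return nil, "connection closed before ReadyForQuery"
    end
    if msg_type == "Z" then
      return discarded
    end
    discarded = discarded + 1
  end
end

-- Usage with a fake message source: two DataRows ("D"),
-- then CommandComplete ("C"), then ReadyForQuery ("Z").
local pending = { "D", "D", "C", "Z" }
local pos = 0
local discarded = drain_until_ready(function()
  pos = pos + 1
  return pending[pos]
end)
```

The trade-off is that the server still sends every remaining row, so for a 100k-row query cancelled early, closing the socket may well be cheaper than draining.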