crystal-pg
Add pipeline support
This is a rough sketch for query pipelining support. Pipelined queries in most databases are implemented as two phases:
- Serialize all of the queries, including their parameters, and send them over the wire
- Read back and deserialize all of the results from the wire
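The shape of those two phases can be sketched with an in-memory IO standing in for the socket (illustrative only; this is not the crystal-pg implementation or the Postgres wire protocol):

```crystal
# A line of text stands in for a serialized protocol message.
queries = ["SELECT 1", "SELECT 2", "SELECT 3"]
wire = IO::Memory.new

# Phase 1: serialize every query before reading anything back
queries.each { |q| wire.puts q }

# Phase 2: read all of the responses, in the same order they were sent
wire.rewind
results = queries.map { wire.gets }
```

The win comes from phase 1 never blocking on a per-query round trip.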
Benchmark
Time spent on 100k SELECT 42 queries (lower is better):
```
                user      system    total       real
serial      0.172153  0.238778  0.410931  (1.462518)
pipelined   0.084153  0.089424  0.173577  (0.649260)
```
That's 55% less wall-clock time and 58% less CPU time with localhost latency. For databases where latency might be in the 1-2ms range, the difference will probably be orders of magnitude. In fact, it might've been a more dramatic difference but I had to split the pipelines into chunks — see "known issues" below.
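A rough latency model shows why the gap should widen with network distance. The numbers below are assumptions for illustration, not measurements: serially, every query pays a full round trip; pipelined, the round trip is paid roughly once.

```crystal
rtt       = 1.5  # ms round-trip network latency (assumed)
per_query = 0.05 # ms server-side time per query (assumed)
n         = 100_000

serial    = n * (rtt + per_query) # => 155000.0 ms
pipelined = rtt + n * per_query   # => 5001.5 ms
```

Under those assumptions the pipeline is roughly 31x faster, with the total dominated by server time rather than the network.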
Usage
```crystal
result_sets = pg.pipeline do |pipe|
  pipe.query "SELECT 42"
  pipe.query "SELECT * FROM posts LIMIT $1", limit
end

result_sets.scalar(Int32)  # => 42
result_sets.read_all(Post) # => [Post(@id=1, ...), Post(@id=2, ...), ...]
```
Via crystal-lang/crystal-db:
```crystal
db = DB.open("postgres:///")

db.using_connection do |connection|
  result_sets = connection.pipeline do |pipe|
    # ...
  end

  result_sets.read_one(...)
end
```
The API I had in mind here is that `query_one`, `query_all`, and `query_each` become `query` inside the pipeline block, and `read_one`, `read_all`, and `read_each` when you're consuming the results.
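To make that naming concrete, here is the same pair of queries both ways. The non-pipelined calls are standard crystal-db-style methods; the pipelined ones use the API proposed in this PR:

```crystal
# Without pipelining: each query_* call round-trips immediately
answer = pg.query_one "SELECT 42", as: Int32
posts  = pg.query_all "SELECT * FROM posts", as: Post

# With pipelining: `query` only enqueues; the scalar/read_* calls
# consume the result sets afterwards, in the same order they were queued
result_sets = pg.pipeline do |pipe|
  pipe.query "SELECT 42"
  pipe.query "SELECT * FROM posts"
end
answer = result_sets.scalar(Int32)
posts  = result_sets.read_all(Post)
```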
Known issues/Remaining work to do
- [ ] `exec` support - pipelining is often used for data manipulation where you don't necessarily need a response
- [ ] The app hangs if the pipeline is too long, e.g. `10_000.times { pipe.query "..." }`
- [ ] Mechanism to ensure all result sets are consumed
  - I don't know yet what kind of API makes sense for that, so right now it just kinda assumes you know what you're doing
  - Track pipeline consumption on the `PG::Connection`?
    - Raise an exception if you send more queries without consuming?
    - Fully consume and close the pipeline if you send more queries without consuming the rest, raising an exception if you try to read from the pipeline after that?
  - Add another method that receives a block that automatically closes out the pipeline when it completes? Example:

    ```crystal
    pg.pipeline do |pipe|
      pipe.query "..."
      pipe.query "..."
      pipe.query "..."
    end.results do |rs|
      rs.scalar(Int32)
      rs.read_all(Post)
    end
    # Even though we only read the results of 2 queries, the
    # pipeline is fully consumed by the end of the block
    ```

  - Both?
- [ ] Clean up and document the code
Closes #155
So sorry @jgaskins, I completely missed this being opened until just now!
I think it'd be cool to get pipelining in for sure. I think your approach here of a different interface for it is probably the only sane way to do it. I could never come up with a safe way to auto-pipeline queries.
No exec I think is fine.
If we can't figure out the hang, we can maybe put a limit as to how many queries are in the pipe?
On the mechanism to ensure all result sets are consumed: I don't have a strong opinion on this.