
Add pipeline support


This is a rough sketch of query pipelining support. Pipelined queries in most DBs are implemented in two phases, as sketched below:

  1. Serialize all of the queries, including their parameters, onto the wire
  2. Read and deserialize all of the results from the wire
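
A minimal sketch of those two phases over a raw socket, using a hypothetical Wire class rather than crystal-pg's actual internals:

class Wire
  def initialize(@io : IO)
  end

  # Phase 1: write every query up front without waiting for any
  # responses, then flush once. Phase 2: read each response back,
  # in the same order the queries were sent.
  def pipeline(queries : Array(String)) : Array(String)
    queries.each { |sql| @io.puts sql }
    @io.flush
    queries.map { @io.gets.not_nil! }
  end
end

The win comes from collapsing one network round trip per query into roughly one in each direction for the whole batch.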

Benchmark

Time spent on 100k SELECT 42 queries (lower is better):

                user     system      total        real
serial      0.172153   0.238778   0.410931 (  1.462518)
pipelined   0.084153   0.089424   0.173577 (  0.649260)

That's 55% less wall-clock time and 58% less CPU time with localhost latency. For databases where latency might be in the 1-2ms range, the difference will probably be orders of magnitude. In fact, it might have been an even more dramatic difference here, but I had to split the pipelines into chunks (see "known issues" below).
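
For reference, the columns above match the output format of Crystal's stdlib Benchmark.bm. A sketch of how numbers like these could be produced (not necessarily the exact harness used here; pg stands in for an open connection):

require "benchmark"

Benchmark.bm do |x|
  x.report("serial") do
    100_000.times { pg.query_one "SELECT 42", as: Int32 }
  end

  x.report("pipelined") do
    # 100 chunks of 1_000 queries, to sidestep the hang noted below
    100.times do
      result_sets = pg.pipeline do |pipe|
        1_000.times { pipe.query "SELECT 42" }
      end
      1_000.times { result_sets.scalar(Int32) }
    end
  end
end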

Usage

result_sets = pg.pipeline do |pipe|
  pipe.query "SELECT 42"
  pipe.query "SELECT * FROM posts LIMIT $1", limit
end

result_sets.scalar(Int32) # => 42
result_sets.read_all(Post) # => [Post(@id=1, ...), Post(@id=2, ...), ...]

Via crystal-lang/crystal-db:

db = DB.open("postgres:///")
db.using_connection do |connection|
  result_sets = connection.pipeline do |pipe|
    # ...
  end

  result_sets.read_one(...)
end

The API I had in mind here is that query_one, query_all, and query_each all become query inside the pipeline block, with read_one, read_all, and read_each used when you're consuming results.
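
Concretely, mirroring the usage snippet above, serial code like this:

answer = pg.query_one "SELECT 42", as: Int32
posts = pg.query_all "SELECT * FROM posts LIMIT $1", limit, as: Post

would become this inside a pipeline:

result_sets = pg.pipeline do |pipe|
  pipe.query "SELECT 42"                           # was query_one
  pipe.query "SELECT * FROM posts LIMIT $1", limit # was query_all
end
answer = result_sets.scalar(Int32)
posts = result_sets.read_all(Post)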

Known issues/Remaining work to do

  • [ ] exec support
    • pipelining is often used for data manipulation where you don't necessarily need a response
  • [ ] The app hangs if the pipeline is too long, e.g. 10_000.times { pipe.query "..." }
    • Likely both sides blocking on full socket buffers once too much is written without reading; a chunking workaround is sketched after this list
  • [ ] Mechanism to ensure all result sets are consumed
    • I don't know yet what kind of API makes sense for that, so right now it just kinda assumes you know what you're doing
    • Track pipeline consumption on the PG::Connection?
      • Raise an exception if you send more queries without consuming?
      • Fully consume and close the pipeline if you send more queries without consuming the rest, raising an exception if you try to read from the pipeline after that?
    • Add another method that receives a block that automatically closes out the pipeline when it completes?
      • Example:

        pg.pipeline do |pipe|
          pipe.query "..."
          pipe.query "..."
          pipe.query "..."
        end
        .results do |rs|
          rs.scalar(Int32)
          rs.read_all(Post)
        end
        # Even though we only read the results of 2 queries, the
        # pipeline is fully consumed by the end of the block
        
    • Both?
  • [ ] Clean up and document the code
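
In the meantime, a possible caller-side workaround for the hang is the chunking mentioned under the benchmark. A sketch, with an arbitrarily chosen chunk size:

queries = Array.new(10_000) { "SELECT 42" }

queries.each_slice(1_000) do |batch|
  result_sets = pg.pipeline do |pipe|
    batch.each { |sql| pipe.query sql }
  end
  # Fully drain this chunk's results before sending the next one
  batch.size.times { result_sets.scalar(Int32) }
end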

Closes #155

jgaskins · Jul 02 '22

So sorry @jgaskins, I completely missed this being opened until just now!

I think it'd be cool to get pipelining in, for sure. Your approach here of a separate interface for it is probably the only sane way to do it; I could never come up with a safe way to auto-pipeline queries.

No exec support for now is fine, I think.

If we can't figure out the hang, maybe we can put a limit on how many queries can be in the pipe?

On the mechanism to ensure all result sets are consumed: I don't have a strong opinion there.

will · Oct 06 '22