sqlx
Add support for CopyBoth mode
Lays the groundwork for talking to PostgreSQL over the replication protocol, i.e. supporting "replication" connections and the CopyBoth sub-protocol.
Fixes: #2923
To test it, you can do something like this.
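In a local DB (running with wal_level = logical), create a table and set up replication for it:
```sql
ALTER TABLE <your-table> REPLICA IDENTITY FULL;
CREATE PUBLICATION test_publication FOR TABLE <your-table>;
-- The snippet below streams from a slot named test_slot using the pgoutput plugin:
SELECT pg_create_logical_replication_slot('test_slot', 'pgoutput');
```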
Then run a snippet like this:
```rust
use futures::prelude::*;
use sqlx::postgres::{PgConnectOptions, PgCopyBoth, PgPoolOptions, PgReplicationMode};

#[tokio::main]
async fn main() {
    // Connection must be configured with a replication mode!
    let options = PgConnectOptions::new()
        .host("0.0.0.0")
        .username("postgres")
        .password("postgres")
        .replication_mode(PgReplicationMode::Logical);

    let pool = PgPoolOptions::new()
        .connect_with(options)
        .await
        .expect("failed to connect to postgres");

    let starting_pos = "0/1573178"; // CHANGE THIS
    let query = format!(
        r#"START_REPLICATION SLOT "{}" LOGICAL {} ("proto_version" '1', "publication_names" '{}')"#,
        "test_slot", starting_pos, "test_publication",
    );

    let mut duplex = PgCopyBoth::new(&pool, query.as_str())
        .await
        .expect("start replication");

    // Read data from the server.
    while let Some(data) = duplex.next().await {
        println!("data: {:?}", data);
    }
}
```
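The snippet above splices the slot name, start position, and publication name into the command with format!. As a hedged sketch, the same command can be built from a parsed LSN, with the names quoted the way SQL quotes identifiers and string literals. The helpers parse_lsn, format_lsn, quote_ident, quote_literal and start_replication_query are hypothetical, not part of sqlx or this PR, and the quoting assumes the replication command parser follows the usual SQL rule of doubling embedded quotes. PostgreSQL writes an LSN such as 0/1573178 as the high and low 32 bits of a 64-bit WAL position in hexadecimal, separated by a slash.

```rust
/// Parse a textual LSN such as "0/1573178" into a 64-bit WAL position.
/// The part before '/' is the high 32 bits, the part after it the low 32 bits (hex).
fn parse_lsn(s: &str) -> Option<u64> {
    let (hi, lo) = s.split_once('/')?;
    let hi = u32::from_str_radix(hi, 16).ok()?;
    let lo = u32::from_str_radix(lo, 16).ok()?;
    Some((u64::from(hi) << 32) | u64::from(lo))
}

/// Format a 64-bit WAL position back into PostgreSQL's "X/Y" notation.
fn format_lsn(lsn: u64) -> String {
    format!("{:X}/{:X}", lsn >> 32, lsn & 0xFFFF_FFFF)
}

/// Quote an identifier (the slot name): wrap in double quotes, doubling embedded ones.
fn quote_ident(s: &str) -> String {
    format!("\"{}\"", s.replace('"', "\"\""))
}

/// Quote a string literal (an option value): wrap in single quotes, doubling embedded ones.
fn quote_literal(s: &str) -> String {
    format!("'{}'", s.replace('\'', "''"))
}

/// Hypothetical helper producing the same command as the snippet above,
/// but with the slot and publication names quoted.
fn start_replication_query(slot: &str, start_lsn: u64, publication: &str) -> String {
    format!(
        r#"START_REPLICATION SLOT {} LOGICAL {} ("proto_version" '1', "publication_names" {})"#,
        quote_ident(slot),
        format_lsn(start_lsn),
        quote_literal(publication),
    )
}
```

Parsing the LSN up front also rejects a malformed starting_pos before the command ever reaches the server.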
Then insert a row into the table, and you should receive some bytes (the newly inserted row).
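Assuming each item the stream yields is the raw payload of one CopyData message (the exact item type of PgCopyBoth is not shown here), those bytes follow PostgreSQL's streaming replication message formats: an XLogData message ('w') wraps the output plugin's bytes, and a primary keepalive ('k') may ask the client to reply with a standby status update ('r'), which travels over the client-to-server half of the CopyBoth stream. Below is a minimal, std-only sketch with hypothetical names; integers are big-endian and timestamps are microseconds since 2000-01-01.

```rust
/// Messages carried inside CopyData during streaming replication.
#[derive(Debug, PartialEq)]
enum ReplicationMessage<'a> {
    /// 'w': a chunk of WAL data (for logical replication, the output plugin's bytes).
    XLogData { wal_start: u64, wal_end: u64, server_time: i64, data: &'a [u8] },
    /// 'k': server heartbeat; reply_requested asks for a status update as soon as possible.
    PrimaryKeepalive { wal_end: u64, server_time: i64, reply_requested: bool },
}

/// Read a big-endian u64 starting at `at`, or None if the buffer is too short.
fn read_u64(buf: &[u8], at: usize) -> Option<u64> {
    let mut bytes = [0u8; 8];
    bytes.copy_from_slice(buf.get(at..at + 8)?);
    Some(u64::from_be_bytes(bytes))
}

/// Decode one CopyData payload received from the server.
fn parse_message(buf: &[u8]) -> Result<ReplicationMessage<'_>, String> {
    match buf.first() {
        Some(b'w') => {
            let field = |at| read_u64(buf, at).ok_or("truncated XLogData");
            Ok(ReplicationMessage::XLogData {
                wal_start: field(1)?,
                wal_end: field(9)?,
                server_time: field(17)? as i64,
                // The 25-byte header was validated above; the rest is plugin data.
                data: &buf[25..],
            })
        }
        Some(b'k') => {
            let field = |at| read_u64(buf, at).ok_or("truncated keepalive");
            Ok(ReplicationMessage::PrimaryKeepalive {
                wal_end: field(1)?,
                server_time: field(9)? as i64,
                reply_requested: buf.get(17).copied().ok_or("truncated keepalive")? == 1,
            })
        }
        Some(other) => Err(format!("unknown message type {:?}", *other as char)),
        None => Err("empty CopyData payload".to_string()),
    }
}

/// Build a standby status update ('r'): positions written, flushed and applied
/// (each "last byte + 1"), the client clock, and whether a reply is requested.
fn standby_status_update(
    written: u64,
    flushed: u64,
    applied: u64,
    client_time: i64,
    reply_requested: bool,
) -> Vec<u8> {
    let mut msg = Vec::with_capacity(34);
    msg.push(b'r');
    msg.extend_from_slice(&written.to_be_bytes());
    msg.extend_from_slice(&flushed.to_be_bytes());
    msg.extend_from_slice(&applied.to_be_bytes());
    msg.extend_from_slice(&client_time.to_be_bytes());
    msg.push(u8::from(reply_requested));
    msg
}
```

With pgoutput, the data of each XLogData message is itself a logical replication message (Begin, Relation, Insert, Commit, and so on), so a full consumer would decode that payload in a second step.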
Anything I can do to help/make the review easier for you? 🙂
What does it mean to "steal permits from the parent"?
When one pool is a child of another, they effectively share a total max_connections.
To acquire a connection from the pool, the acquire() call must first get a permit from the pool semaphore, which represents the right to have a connection. It may then use this permit either to pop a connection from the idle queue, or open a new one if the pool size is below max_connections.
When a child pool's semaphore is exhausted, but it is below max_connections, it attempts to acquire a semaphore permit from the parent pool. When the checked-out connection using that permit is returned to the child pool, the permit is added to the child pool's semaphore, and further calls to acquire() behave normally. When a child pool is closed, any permits remaining in its semaphore are returned to the parent pool.
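The acquire/release/close cycle described above can be modeled with plain counters. This is a single-threaded sketch with hypothetical types, not sqlx's implementation: acquire() returns false where a real pool would wait, and the child is assumed to start with no permits of its own, so every permit it uses is first taken from the parent.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// A counting semaphore reduced to its bookkeeping (no waiting or wakeups).
struct Semaphore {
    permits: usize,
}

impl Semaphore {
    fn try_acquire(&mut self) -> bool {
        if self.permits == 0 {
            return false;
        }
        self.permits -= 1;
        true
    }

    fn release(&mut self) {
        self.permits += 1;
    }
}

/// Hypothetical model of a child pool.
struct ChildPool {
    semaphore: Semaphore, // assumed to start empty; fills as connections come back
    owned: usize,         // permits taken from the parent and not yet returned
    max_connections: usize,
    idle: usize, // connections waiting in the idle queue
    open: usize, // connections currently open
    parent: Rc<RefCell<Semaphore>>,
}

impl ChildPool {
    fn new(max_connections: usize, parent: Rc<RefCell<Semaphore>>) -> Self {
        ChildPool { semaphore: Semaphore { permits: 0 }, owned: 0, max_connections, idle: 0, open: 0, parent }
    }

    fn acquire(&mut self) -> bool {
        // 1. Take a permit from our own semaphore or, if we hold fewer than
        //    max_connections permits in total, steal one from the parent.
        if !self.semaphore.try_acquire() {
            if self.owned == self.max_connections || !self.parent.borrow_mut().try_acquire() {
                return false;
            }
            self.owned += 1;
        }
        // 2. Spend it: pop an idle connection, or open a new one.
        if self.idle > 0 {
            self.idle -= 1;
        } else {
            self.open += 1;
        }
        true
    }

    /// A checked-out connection is returned: its permit stays with the child.
    fn release(&mut self) {
        self.idle += 1;
        self.semaphore.release();
    }

    /// Closing the child closes its idle connections and hands the permits
    /// left in its semaphore back to the parent.
    fn close(&mut self) {
        let leftover = self.semaphore.permits;
        self.semaphore.permits = 0;
        self.owned -= leftover;
        self.open -= self.idle;
        self.idle = 0;
        self.parent.borrow_mut().permits += leftover;
    }
}
```

With a parent holding 3 permits and two children of max_connections 2 each, the second child can be left waiting because 2 + 2 > 3, which is exactly the oversubscription risk of letting the children's max_connections add up to more than the parent's.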
This does mean that the child pool can potentially exhaust the permits of the parent pool, so it's important that the sum of all child pools' max_connections is not larger than the parent's. This wasn't really a concern when designing the feature because it was only meant to be part of #[sqlx::test]'s functionality, to allow multiple tests to run concurrently without exhausting the available connections on the server, so child pools were meant to be short-lived.
I thought about just having the child pool share the parent pool's semaphore, but that would have been a problem: the child pool couldn't keep connections in its own idle queue while returning their permits to the parent, because other pools sharing the same parent wouldn't know about those connections, so the total number of open connections could easily exceed the maximum.
If exhaustion ends up becoming a problem, I think I would try addressing it by having the child pool return a permit to the parent, instead of to its own semaphore, when it closes a connection. I would also recommend a short idle_timeout so that child pools can't hog permits for too long.
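The proposed variant, where closing a connection returns its permit to the parent, pairs naturally with a short idle_timeout. A hedged sketch with hypothetical types (max_connections omitted for brevity): because every close hands its permit back to the parent, each idle connection holds exactly one parked permit, so reaping idle connections frees permits for sibling pools.

```rust
use std::cell::Cell;
use std::rc::Rc;
use std::time::{Duration, Instant};

/// Hypothetical child pool under the proposed variant. Each idle connection
/// carries exactly one parked permit, so the idle queue doubles as the
/// child's own semaphore.
struct ChildPool {
    idle_since: Vec<Instant>,        // one entry per idle connection
    parent_permits: Rc<Cell<usize>>, // the parent's semaphore count
    idle_timeout: Duration,
}

impl ChildPool {
    fn new(parent_permits: Rc<Cell<usize>>, idle_timeout: Duration) -> Self {
        ChildPool { idle_since: Vec::new(), parent_permits, idle_timeout }
    }

    /// Reuse an idle connection (and its permit), else take a permit from
    /// the parent to open a new one.
    fn acquire(&mut self) -> bool {
        if self.idle_since.pop().is_some() {
            return true;
        }
        let available = self.parent_permits.get();
        if available == 0 {
            return false; // a real pool would wait here
        }
        self.parent_permits.set(available - 1);
        true
    }

    /// A checked-out connection goes back to the idle queue with its permit.
    fn release(&mut self, now: Instant) {
        self.idle_since.push(now);
    }

    /// Close connections idle for longer than idle_timeout and give their
    /// permits back to the parent. Returns how many were closed.
    fn reap_idle(&mut self, now: Instant) -> usize {
        let timeout = self.idle_timeout;
        let before = self.idle_since.len();
        self.idle_since.retain(|&since| now.duration_since(since) <= timeout);
        let closed = before - self.idle_since.len();
        self.parent_permits.set(self.parent_permits.get() + closed);
        closed
    }
}
```

The shorter the idle_timeout, the sooner a burst in one child stops holding permits that its siblings could use.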
Thanks for the detailed explanation
I don't think exhaustion would be an issue in this use case, since the replication only uses a single (long-lived) connection.
Hello @abonander,
I have pushed new commits:
- In 59dd588 I added an acquire function inside PgReplicationPool because of 59dd588, which hid the PgPool inside a private field.
- I added c6ee1e4 because of some corruption that occurred when the PgCopyBothSender handle was dropped in client code, which resulted in a PoolConnection being pushed back into the pool while it was still receiving messages.
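A minimal sketch of one way to avoid that class of bug, using hypothetical Pool, Connection and PooledConnection types (not sqlx's PoolConnection, and not necessarily what c6ee1e4 does): the handle's Drop checks whether the connection is still in CopyBoth mode and, if so, closes it instead of recycling it.

```rust
use std::cell::RefCell;
use std::rc::Rc;

/// Hypothetical connection state: in_copy_both means the server may still be streaming.
struct Connection {
    id: u32,
    in_copy_both: bool,
}

/// Hypothetical pool: idle connections available for reuse, plus ids of closed ones.
#[derive(Default)]
struct Pool {
    idle: Vec<Connection>,
    closed: Vec<u32>,
}

/// Hypothetical RAII handle over a pooled connection.
struct PooledConnection {
    conn: Option<Connection>,
    pool: Rc<RefCell<Pool>>,
}

impl Drop for PooledConnection {
    fn drop(&mut self) {
        if let Some(conn) = self.conn.take() {
            let mut pool = self.pool.borrow_mut();
            if conn.in_copy_both {
                // The server may still be sending CopyData, so the protocol state
                // is unknown: close the connection rather than hand it to the
                // next caller of acquire().
                pool.closed.push(conn.id);
            } else {
                pool.idle.push(conn);
            }
        }
    }
}
```

Draining the copy stream before recycling would be another option; closing is simply the hardest to get wrong.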
Is there some way we can help to move this PR forward?
Any update on this? Reading through this, it looks like sqlx would explicitly support replication (creating and subscribing/listening to replication slots?). Not sure if I understood the code correctly.
We've been using it for a while at amo and it does the job pretty well (we haven't stress-tested it yet, though).
I guess the only thing preventing this from being merged is the send_async cancel-safety issue @abonander pointed out?
(Pointers welcome on how to address this in the context of sqlx 🙏)