transaction statements out of order
Bug Description
Not really sure what exactly the bug is, but I am seeing strange behavior that makes it look like SQL statements are not executed in the correct order. I have a reproduction here (https://github.com/pythoneer/sqlx_test), but it is not really minimal. It uses axum and some strange wrk setup to trigger it, because that is where I originally encountered the behavior; I am not sure whether it could be reduced further or whether it has something to do with how axum operates specifically.
I am just trying to do this in a handler:
https://github.com/pythoneer/sqlx_test/blob/c220fda7c5f60fc0ef22f7ba64f3436fac7063e6/src/main2.rs#L41-L59
async fn test(State(pool): State<PgPool>) -> Result<String, (StatusCode, String)> {
    sleep(Duration::from_millis(2)).await;

    let mut transaction = pool.begin().await.map_err(internal_error)?;

    // -----------------------8<--------- this is optional ---------------->8----------
    transaction
        .execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
        .await
        .map_err(|err| panic!("Failed to set transaction isolation level: {:#?}", err))?;

    let data: String = sqlx::query_scalar("select 'hello world from pg'")
        .fetch_one(&pool)
        .await
        .map_err(internal_error)?;
    // -----------------------8<--------- this is optional ---------------->8----------

    transaction.commit().await.map_err(internal_error)?;

    Ok(data)
}
The SET TRANSACTION ... and the select 'hello ... statements can also be removed; the problem is already triggered with just
let mut transaction = pool.begin().await.map_err(internal_error)?;
transaction.commit().await.map_err(internal_error)?;
but they help demonstrate the problem.
By running
DATABASE_URL="<your_url>" cargo run --bin binary2
and
./parallel_run.sh
to hit the endpoint, we can sporadically see
2023-10-07T20:07:04.706970Z WARN sqlx::postgres::notice: there is already a transaction in progress
2023-10-07T20:07:08.272585Z WARN sqlx::postgres::notice: there is no transaction in progress
which I can't really explain. The only explanation I can think of is that the statements sent to the database are not in order. Normally we would see BEGIN; COMMIT; pairs sent in parallel on each connection, right? But the warnings make me assume that sometimes BEGIN; BEGIN; COMMIT; or BEGIN; COMMIT; COMMIT; happens.
With the additional SET TRANSACTION ... and select 'hello ... we can also see panics that I create with
.map_err(|err| panic!("Failed to set transaction isolation level: {:#?}", err))?;
which show
code: "25001",
message: "SET TRANSACTION ISOLATION LEVEL must be called before any query",
and it's the same problem here: I can only explain this (just from playing around in psql) if the select 'hello ... is executed before the SET TRANSACTION ....
https://github.com/launchbadge/sqlx/assets/1102097/db9adaaf-5d5d-45c5-8890-a9766fa6d8aa
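For reference, here is a minimal single-connection sketch (not part of the repro; the connection string is a placeholder) of the ordering that would produce this error: once any query has run inside the transaction, Postgres rejects SET TRANSACTION with SQLSTATE 25001.

use sqlx::{Connection, Executor, PgConnection};

#[tokio::main]
async fn main() -> Result<(), sqlx::Error> {
    // placeholder connection string
    let mut conn =
        PgConnection::connect("postgres://postgres:postgres@localhost:5432/postgres").await?;
    let mut tx = conn.begin().await?;

    // run any query inside the transaction first ...
    let _msg: String = sqlx::query_scalar("select 'hello world from pg'")
        .fetch_one(&mut *tx)
        .await?;

    // ... then try to change the isolation level: Postgres rejects this with
    // "SET TRANSACTION ISOLATION LEVEL must be called before any query" (25001)
    let err = (&mut *tx)
        .execute("SET TRANSACTION ISOLATION LEVEL SERIALIZABLE;")
        .await
        .unwrap_err();
    println!("{err}");

    tx.rollback().await?;
    Ok(())
}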
Also, the values (the sleep, the wrk parameters, and the connection pool size) might depend on the specific machine; I tuned them to work best on my system, and I think this can change from system to system. It seemed to work best when the endpoint delivered around 2500 req/s. The reason I am doing the "funny" stuff in parallel_run.sh is that I noticed the problem is somehow triggered specifically at the beginning or the end of a wrk run; I am not really sure why or what is happening in detail. Maybe wrk "just" kills connections in the middle and axum reacts strangely to killed connections while a handler is running. But regardless of what axum is doing, I would not expect any of the observed things to happen. You can trigger this without parallel_run.sh, but you might need to wait and potentially start and stop the wrk command manually in fast succession; that is basically what parallel_run.sh does.
Minimal Reproduction
https://github.com/pythoneer/sqlx_test
Needs wrk installed for parallel_run.sh.
DATABASE_URL="<your_url>" cargo run --bin binary2
and
./parallel_run.sh
Info
- SQLx version: 0.7.2
- SQLx features enabled: "postgres", "runtime-tokio-rustls", "macros", "migrate", "chrono", "json", "uuid"
- Database server and version: PostgreSQL 15.4 on x86_64-pc-linux-musl, compiled by gcc (Alpine 12.2.1_git20220924-r10) 12.2.1 20220924, 64-bit
- Operating system: arch linux (x86_64 Linux 6.5.5-arch1-1)
- rustc --version: rustc 1.72.0 (5680fa18f 2023-08-23)
This query executes on a different connection since you pass the Pool as the executor:
let data: String = sqlx::query_scalar("select 'hello world from pg'")
    .fetch_one(&pool)
    .await
    .map_err(internal_error)?;
I suspect this may be due to cancellation, as when a connection is closed mid-request Axum will cancel the handler future. I'm guessing when you kill the wrk process, it doesn't wait for in-flight requests to complete and just closes the connections right away.
The cancellation could be leaving database connections in weird states.
Sorry, silly mistake in the repro. It should be
let data: String = sqlx::query_scalar("select 'hello world from pg'")
    .fetch_one(&mut *transaction)
    .await
    .map_err(internal_error)?;
of course. Unfortunately, the repro in its current config does not seem to trigger it anymore. But it is still happening in my "real" code, where I do use the created transaction (and not the pool) for all of the queries. The there is already a transaction in progress warnings etc. are still there, and I think they are all based on the same phenomenon.
I could further reduce the repro to this https://github.com/pythoneer/sqlx_test/blob/main/src/main1.rs
DATABASE_URL="<your_url>" cargo run --bin binary1
It aborts the future in the middle, which is what we both suspected is happening with axum and wrk closing connections in the middle of requests.
Looking at the Postgres logs (I enabled statement logging), I see this:
2023-10-09 09:59:32.403 UTC [532] LOG: statement: BEGIN
2023-10-09 09:59:32.403 UTC [532] LOG: statement: COMMIT
2023-10-09 09:59:32.403 UTC [532] LOG: statement: ROLLBACK
2023-10-09 09:59:32.404 UTC [532] WARNING: there is no transaction in progress
2023-10-09 09:59:32.451 UTC [533] LOG: statement: BEGIN
2023-10-09 09:59:32.451 UTC [533] LOG: statement: COMMIT
2023-10-09 09:59:32.451 UTC [533] LOG: statement: ROLLBACK
2023-10-09 09:59:32.452 UTC [533] WARNING: there is no transaction in progress
This does not look correct and is what I suspected in the beginning. Looking at the code for the Drop implementation of a transaction, it seems to work differently than, e.g., a plain commit. I did not dig further down the line, but it looks like that invocation is deferred into a queue(?) and executed later.
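To make that concrete, here is a minimal self-contained sketch (the types and names are made up for illustration, not sqlx's actual internals) of why a Drop implementation can only queue a ROLLBACK instead of sending it right away: Drop is synchronous, so the statement has to sit in a buffer until the connection is used again.

// all names below are illustrative mocks
struct MockConnection {
    transaction_depth: usize,
    pending_commands: Vec<String>, // flushed the next time the connection is used
}

struct MockTransaction<'c> {
    conn: &'c mut MockConnection,
    open: bool,
}

impl<'c> Drop for MockTransaction<'c> {
    fn drop(&mut self) {
        if self.open {
            // no `.await` is possible in Drop: just enqueue the ROLLBACK and
            // adjust the depth counter; the statement reaches the server later
            self.conn.pending_commands.push("ROLLBACK".to_string());
            self.conn.transaction_depth -= 1;
        }
    }
}

fn main() {
    let mut conn = MockConnection { transaction_depth: 1, pending_commands: vec![] };
    {
        let _tx = MockTransaction { conn: &mut conn, open: true };
        // transaction dropped here without an explicit commit
    }
    assert_eq!(conn.pending_commands, vec!["ROLLBACK".to_string()]);
    println!("deferred commands: {:?}", conn.pending_commands);
}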
This seems to be an issue with cancellation safety of transactions in sqlx. I found a self-contained way of reproducing the issue.
Cargo.toml:
[package]
name = "sqlx_test"
version = "0.1.0"
edition = "2021"
[dependencies]
anyhow = "1"
rand = "0.8"
clap = { version = "4", features = ["derive"] }
tokio = { version = "1", features = ["macros", "time", "rt-multi-thread"] }
tracing = "0.1"
tracing-subscriber = "0.3.17"
sqlx = {version = "=0.7.2", default-features = false, features = ["postgres", "runtime-tokio-rustls"]}
main.rs:
use clap::Parser;
use rand::{thread_rng, Rng};
use sqlx::PgPool;
use std::sync::Arc;
use std::time::Duration;
use tokio::task::JoinSet;
use tokio::time::Instant;
use tracing::{error, info, info_span, Instrument, Level};

#[derive(Parser, Debug)]
struct CommandlineArguments {
    #[arg(long, default_value_t = 100)]
    tasks: usize,
    #[arg(long, default_value_t = 5)]
    runtime_seconds: u64,
    #[arg(long, default_value_t = 100)]
    max_keep_transaction_milliseconds: u64,
    #[arg(long, default_value_t = 100)]
    max_drop_after_milliseconds: u64,
}

#[tokio::main]
async fn main() -> anyhow::Result<()> {
    let arguments = CommandlineArguments::parse();
    let runtime = Duration::from_secs(arguments.runtime_seconds);
    let max_keep_transaction = Duration::from_millis(arguments.max_keep_transaction_milliseconds);
    let max_drop_after = Duration::from_millis(arguments.max_drop_after_milliseconds);

    tracing_subscriber::fmt().with_max_level(Level::INFO).init();
    info!(?arguments, "Commandline");

    let pool =
        Arc::new(PgPool::connect("postgres://postgres:postgres@localhost:51337/postgres").await?);

    let mut join_set = JoinSet::new();
    for index in 0..arguments.tasks {
        let span = info_span!("task", index);
        join_set.spawn(
            task(pool.clone(), max_keep_transaction, max_drop_after, runtime).instrument(span),
        );
    }

    while let Some(res) = join_set.join_next().await {
        if let Err(err) = res {
            error!("Task failed: {:?}", err);
        }
    }

    Ok(())
}

async fn task(
    pool: Arc<PgPool>,
    max_keep_transaction: Duration,
    max_drop_after: Duration,
    runtime: Duration,
) -> anyhow::Result<()> {
    let start = Instant::now();
    while start.elapsed() < runtime {
        let operation = Operation::random(pool.clone(), max_keep_transaction, max_drop_after);
        let span = info_span!(
            "operation",
            keep_transaction_for = operation.keep_transaction_for.as_millis(),
            drop_after = operation.drop_after.as_millis()
        );
        operation.run().instrument(span).await?;
    }
    Ok(())
}

struct Operation {
    pool: Arc<PgPool>,
    keep_transaction_for: Duration,
    drop_after: Duration,
}

impl Operation {
    fn random(pool: Arc<PgPool>, max_keep_transaction: Duration, max_drop_after: Duration) -> Self {
        let mut rng = thread_rng();
        Self {
            pool,
            keep_transaction_for: rng.gen_range(Duration::ZERO..=max_keep_transaction),
            drop_after: rng.gen_range(Duration::ZERO..=max_drop_after),
        }
    }

    async fn run(self) -> anyhow::Result<()> {
        tokio::select! {
            result = self.start_transaction_and_commit() => {
                result
            }
            _ = tokio::time::sleep(self.drop_after) => {
                // drops the open transaction from the other branch
                Ok(())
            }
        }
    }

    async fn start_transaction_and_commit(&self) -> anyhow::Result<()> {
        let begin_span = info_span!("begining transaction");
        let transaction = self.pool.begin().instrument(begin_span).await?;

        let inside_transaction_span = info_span!("inside transaction");
        tokio::time::sleep(self.keep_transaction_for)
            .instrument(inside_transaction_span)
            .await;

        let commit_span = info_span!("committing transaction");
        transaction.commit().instrument(commit_span).await?;
        Ok(())
    }
}
docker-compose.yml:
version: "3.2"
services:
db:
image: postgres:15-alpine
container_name: sqlx_repro_db
restart: unless-stopped
tty: true
environment:
POSTGRES_USER: postgres
POSTGRES_PASSWORD: postgres
ports:
- "51337:5432"
This is what the output looks like (with cargo run --release):
2023-10-09T12:19:07.402371Z INFO sqlx_test: Commandline arguments=CommandlineArguments { tasks: 100, runtime_seconds: 5, max_keep_transaction_milliseconds: 100, max_drop_after_milliseconds: 100 }
2023-10-09T12:19:07.493629Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:07.679796Z WARN task{index=50}:operation{keep_transaction_for=61 drop_after=99}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:07.787577Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:07.869631Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:07.874933Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.130887Z WARN task{index=78}:operation{keep_transaction_for=76 drop_after=91}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:08.208183Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.370002Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.420780Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.688807Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:08.790763Z WARN task{index=90}:operation{keep_transaction_for=96 drop_after=86}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:09.160745Z WARN task{index=30}:operation{keep_transaction_for=36 drop_after=91}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:09.388145Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:09.621769Z WARN task{index=59}:operation{keep_transaction_for=10 drop_after=86}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:09.630659Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:09.754287Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:10.040783Z WARN task{index=38}:operation{keep_transaction_for=80 drop_after=70}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:10.049120Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:10.264293Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:10.283822Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:10.644704Z WARN task{index=53}:operation{keep_transaction_for=93 drop_after=79}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:10.764836Z WARN task{index=60}:operation{keep_transaction_for=29 drop_after=78}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.070738Z WARN task{index=25}:operation{keep_transaction_for=74 drop_after=98}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.074662Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:11.267963Z WARN task{index=34}:operation{keep_transaction_for=65 drop_after=86}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.270800Z WARN task{index=59}:operation{keep_transaction_for=58 drop_after=81}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.278312Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:11.431716Z WARN task{index=0}:operation{keep_transaction_for=82 drop_after=65}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:11.614500Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:11.865754Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:12.070070Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:12.084736Z WARN task{index=37}:operation{keep_transaction_for=37 drop_after=93}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
2023-10-09T12:19:12.137604Z WARN sqlx::postgres::notice: there is no transaction in progress
2023-10-09T12:19:12.298898Z WARN task{index=42}:operation{keep_transaction_for=42 drop_after=88}:begining transaction: sqlx::postgres::notice: there is already a transaction in progress
Note that in axum (and everything using hyper, AFAIK), from what I understand every request is spawned as its own task that is cancelled when the connection drops.
I'm just simulating a dropped connection by using tokio::select and dropping the future that performs the transaction.
Not very familiar with the code base, but it looks like this part (and the sibling functions) is not safely increasing or decreasing conn.transaction_depth:
https://github.com/launchbadge/sqlx/blob/b1387057e5e6c6b72eacd01f491cb45854616502/sqlx-postgres/src/transaction.rs#L16-L25
What happens if the BEGIN is sent but conn.transaction_depth has not been increased yet when the transaction is dropped? We would not ROLLBACK, right?
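To illustrate that window, here is a small self-contained example (names are hypothetical, not sqlx's API): everything after an .await is simply skipped when the future is dropped at that await point, so a BEGIN that already went out on the wire would never be matched by the depth increment, and the Drop-time ROLLBACK would never fire.

use std::time::Duration;

// Stand-in for the linked begin(): "send" BEGIN (the sleep simulates the round
// trip and is a cancellation point), then record it in the depth counter.
async fn begin_like(depth: &mut usize) {
    tokio::time::sleep(Duration::from_millis(50)).await; // future may be dropped here
    *depth += 1; // never reached if the future is cancelled at the await above
}

#[tokio::main]
async fn main() {
    let mut depth = 0usize;
    tokio::select! {
        _ = begin_like(&mut depth) => println!("begin completed"),
        // cancel the begin-like future while it is suspended at its await point
        _ = tokio::time::sleep(Duration::from_millis(10)) => println!("cancelled mid-begin"),
    }
    // prints 0: the bookkeeping never happened, even though in the real case the
    // BEGIN may already have reached the server
    println!("transaction_depth = {depth}");
}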
Seemingly related: https://github.com/launchbadge/sqlx/issues/2054
2023-11-23T12:04:50.581515Z TRACE sqlx::query: summary: "INSERT INTO credits (twitter_id, …", db.statement: "\n\nINSERT INTO\n credits (twitter_id, activity_id, credits, updated_at)\nVALUES\n ($1, $2, $3, $4) ON CONFLICT (twitter_id, activity_id) DO\nUPDATE\nSET\n credits = $3,\n updated_at = $4\n", rows_affected: 0, rows_returned: 0, elapsed: 2.27525ms at /Users/ouhuang/.cargo/registry/src/index.crates.io-6f17d22bba15001f/sqlx-core-0.7.1/src/logger.rs:117 in tower_http::trace::make_span::request with method: POST, uri: /api/twitter/verify/twitter_name, version: HTTP/1.1
I just checked whether this can still be reproduced, and yes it can, both on 0.8.6 as well as on latest main at the time of writing (3abb1863244685eabc274eae665fe1c6a300a36e).
Possibly fixed by #3980 which is now available in 0.9.0-alpha.1
Unfortunately not fixed in 0.9.0-alpha.1.
I see what's going on. #3980 fixed the cancel-safety for begin(), but commit() and rollback() are not cancel-safe: https://github.com/launchbadge/sqlx/blob/a802da0e674e3b890d2fa3580ee843d687f2e32f/sqlx-postgres/src/transaction.rs#L40
These should queue the command synchronously, decrement the depth, and then wait for the response. That way if it's cancelled anytime after the first poll, the operation still has a chance to go through.
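A rough self-contained sketch of that shape (mock types for illustration, not sqlx's internals): the command is queued and the depth adjusted before the first await, so a future dropped while waiting for the server's response no longer loses the COMMIT or leaves the depth counter out of sync.

use std::time::Duration;

struct MockConn {
    transaction_depth: usize,
    outgoing: Vec<String>, // commands already queued for the wire
}

impl MockConn {
    fn queue(&mut self, sql: &str) {
        self.outgoing.push(sql.to_string());
    }

    // stand-in for reading the server's response; a cancellation point
    async fn wait_until_ready(&mut self) {
        tokio::time::sleep(Duration::from_millis(50)).await;
    }
}

// cancel-safe shape: no await before the queue + decrement
async fn commit(conn: &mut MockConn) {
    conn.queue("COMMIT");
    conn.transaction_depth -= 1;
    conn.wait_until_ready().await;
}

#[tokio::main]
async fn main() {
    let mut conn = MockConn { transaction_depth: 1, outgoing: vec![] };
    tokio::select! {
        _ = commit(&mut conn) => println!("commit completed"),
        // cancel the commit future while it is waiting for the response
        _ = tokio::time::sleep(Duration::from_millis(10)) => println!("commit cancelled"),
    }
    // the COMMIT is already queued and the depth already reflects it
    println!("outgoing = {:?}, depth = {}", conn.outgoing, conn.transaction_depth);
}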