sqlx
sqlx copied to clipboard
Transaction commit raises CockroachDB error
Hello! I've been using sqlx
to talk to CockroachDB databases and recently started an upgrade from sqlx-0.3.5
to sqlx-0.4.2
. After upgrading, many tests began to fail with
PgDatabaseError {
severity: Error, code: "0A000",
message: "unimplemented: multiple active portals not supported",
detail: None,
hint: Some("You have attempted to use a feature that is not yet
implemented.\nSee: https://go.crdb.dev/issue-v/40195/v20.1"),
position: None,
where: None,
schema: None,
table: None,
column: None,
data_type: None,
constraint: None,
file: Some("distsql_running.go"),
line: Some(775),
routine: Some("init")
}
I came up with a minimal reproducing test case that does a SELECT 1
. When this query is executed within a transaction, it fails. But if I run the query directly against the postgres connection pool, it succeeds.
#[tokio::test]
async fn test_sqlx_transaction() -> Result<(), sqlx::Error> {
let pool = PgPoolOptions::new()
.max_connections(5)
.connect("postgres://root@localhost:26257/api_server")
.await?;
let mut txn = pool.begin().await.expect("should get txn");
let row: (i64,) = sqlx::query_as("SELECT $1")
.bind::<i64>(1)
.fetch_one(&mut txn)
.await
.expect("query_as error");
txn.commit().await.expect("txn should commit");
println!("row.0: {}", row.0);
Ok(())
}
Both cases succeed for sqlx-0.3.5
.
I was told in the CockroachDB community slack that my database driver is trying to have multiple active result sets (portals) concurrently on the same connection. I'm not sure where to go from there. Thanks for any ideas! I'm happy to work on a patch, given some clues, if you have ideas about what is happening.
We discovered that fetch_all
within a transaction does not exhibit the bug. The bugged behavior only appears when using fetch_one
or fetch_optional
, and only within a transaction.
While trying to debug, I was confused by the way that stream writing and message-awaiting depend on the state of the StatementCache
: https://github.com/launchbadge/sqlx/blob/master/sqlx-core/src/postgres/connection/executor.rs#L180-L188. insert
returns the id of a newly evicted cache line (if there is one), and then the code writes Close
for that evicted cache line, and waits for completion signal from the server only if a cache line is evicted. This doesn't seem like desirable behavior though - do I have the right idea?
++++1
Any existing workaround?
@sergeyshaykhullin I've been using the following code. It lets you write .fetch_one_shim(&mut txn)
where you would have written .fetch_one(&mut txn)
, and .fetch_optional_shim(&mut txn)
where you would have written .fetch_optional(&mut txn)
.
The implementation calls .fetch_all(&mut txn)
, so it's inefficient if your SQL query returns more than one row.
use sqlx::FromRow;
#[async_trait]
/// A trait for adding shim methods to sqlx::QueryAs instances.
/// This is a shim to be used while we work on an upstream fix for sqlx.
/// See https://github.com/launchbadge/sqlx/issues/933 and
/// https://github.com/cockroachdb/cockroach/issues/40195 for more details.
pub trait FetchShim<O> {
/// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
/// This will be less efficient than `sqlx::fetch_one`, if the query does not
/// limit its result rows to 1
async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
where
O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
E: 'e + Executor<'c, Database = Postgres>;
/// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
/// This will be less efficient than `sqlx::fetch_one`, if the query does not
/// limit its result rows to 1
async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
where
O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
E: 'e + Executor<'c, Database = Postgres>;
}
#[async_trait]
impl<'q, O> FetchShim<O> for sqlx::query::QueryAs<'q, sqlx::Postgres, O, PgArguments> {
async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
where
O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
E: 'e + Executor<'c, Database = Postgres>,
{
let maybe_rows = self.fetch_all(query).await;
match maybe_rows {
Ok(rows) => match rows.into_iter().next() {
Some(x) => Ok(x),
None => Err(sqlx::error::Error::RowNotFound),
},
Err(y) => Err(y),
}
}
async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
where
O: Send + Unpin + for<'r> FromRow<'r, PgRow>,
E: 'e + Executor<'c, Database = Postgres>,
{
let maybe_rows = self.fetch_all(query).await;
match maybe_rows {
Ok(rows) => Ok(rows.into_iter().next()),
Err(y) => Err(y),
}
}
}
Thx @imalsogreg it worked! btw follow a more generic implementation that also includes Map
and works with query_as!
macro:
#[async_trait]
/// A trait for adding shim methods to sqlx::QueryAs instances.
/// This is a shim to be used while we work on an upstream fix for sqlx.
/// See https://github.com/launchbadge/sqlx/issues/933 and
/// https://github.com/cockroachdb/cockroach/issues/40195 for more details.
pub trait FetchShim<DB, O>
where
DB: Database,
{
/// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
/// This will be less efficient than `sqlx::fetch_one`, if the query does not
/// limit its result rows to 1
async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
where
O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
E: 'e + Executor<'c, Database = DB>;
/// Like `sqlx::fetch_one`, except that internally it calls `sqlx::fetch_all`.
/// This will be less efficient than `sqlx::fetch_one`, if the query does not
/// limit its result rows to 1
async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
where
O: Send + Unpin + for<'r> FromRow<'r, DB::Row>,
E: 'e + Executor<'c, Database = DB>;
}
#[async_trait]
impl<'q, DB, O, A> FetchShim<DB, O> for sqlx::query::QueryAs<'q, DB, O, A>
where
DB: sqlx::Database,
A: 'q + sqlx::IntoArguments<'q, DB>,
{
async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
where
DB: sqlx::Database,
O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
E: 'e + sqlx::Executor<'c, Database = DB>,
{
self.fetch_optional_shim(query).await?.ok_or(Error::RowNotFound)
}
async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
where
O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
E: 'e + sqlx::Executor<'c, Database = DB>,
{
Ok(self.fetch_all(query).await?.into_iter().next())
}
}
#[async_trait]
impl<'q, DB, F, O, A> FetchShim<DB, O> for sqlx::query::Map<'q, DB, F, A>
where
DB: sqlx::Database,
F: FnMut(DB::Row) -> Result<O, sqlx::Error> + Send,
O: Send + Unpin,
A: 'q + Send + sqlx::IntoArguments<'q, DB>,
{
async fn fetch_one_shim<'e, 'c, E>(self, query: E) -> Result<O, sqlx::Error>
where
O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
E: 'e + sqlx::Executor<'c, Database = DB>,
{
self.fetch_optional_shim(query).await?.ok_or(Error::RowNotFound)
}
async fn fetch_optional_shim<'e, 'c, E>(self, query: E) -> Result<Option<O>, sqlx::Error>
where
O: Send + Unpin + for<'r> sqlx::FromRow<'r, DB::Row>,
E: 'e + sqlx::Executor<'c, Database = DB>,
{
Ok(self.fetch_all(query).await?.into_iter().next())
}
}
I believe this is a bug in CockroachDB, not SQLx. We never use anything but the unnamed portal, which should be replaced every time we send a Bind
message or closed at the end of a transaction.
Trying to find the error message in the source didn't yield any code results, but did give me this commit: https://github.com/cockroachdb/cockroach/commit/0ce44436cc16b9e8a9a1f7815f85e8b8b42fe274
So it appears that this will likely be fixed in the next release of CockroachDB.