odbc-api
Async query execution in spawned task
Hello,
I am trying to use the execute_polling method on a connection, but the compiler keeps saying that the connection is not Send, even though I promoted it to Send.
This is the error:
error: future cannot be sent between threads safely
--> src/main.rs:17:18
|
17 | let handle = tokio::task::spawn(async move {
| ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Dbc`
note: future is not `Send` as this value is used across an await
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
|
142 | &self,
| ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 | ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
| __________________________________________________________________-
147 | | let query = SqlText::new(query);
148 | | let lazy_statement = move || self.allocate_statement();
149 | | execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
| | ^^^^^^ await occurs here, with `&self` maybe used later
150 | | }
| |_____- `&self` is later dropped here
Here is some test code that reproduces it. I took the example for promote_to_send and made it async.
use lazy_static::lazy_static;
use odbc_api::Environment;
use std::{thread, time::Duration};
lazy_static! {
    static ref ENV: Environment = unsafe { Environment::new().unwrap() };
}
#[tokio::main]
async fn main() {
    const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};\
        Server=localhost;\
        UID=SA;\
        PWD=My@Test@Password1;\
        ";
    let conn = ENV.connect_with_connection_string(MSSQL).unwrap();
    let conn = unsafe { conn.promote_to_send() };
    let handle = tokio::task::spawn(async move {
        let exec = conn.execute_polling("SELECT 1", (), || {
            tokio::time::sleep(Duration::from_secs(1))
        });
        if let Some(cursor) = exec.await.unwrap() {
            // dbg!(cursor);
        }
        // if let Some(cursor) = conn.execute("SELECT 1", ()).unwrap() {
        //     // dbg!(cursor);
        // }
    });
    handle.await;
}
const CREATE: &str = r#"SELECT 1"#;
Cargo.toml
[package]
name = "spark-odbc"
version = "0.1.0"
edition = "2018"
# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html
[dependencies]
lazy_static = "1.4.0"
# odbc="*"
# odbc-safe="*"
# threadpool = "1.8.1"
odbc-api="*"
tokio = { version = "1.21.2", features = ["macros", "rt-multi-thread", "time"] }
Rust info:
rustc 1.64.0 (a55dd71d5 2022-09-19)
stable-x86_64-unknown-linux-gnu
Is there a way around this or should I go back to the sync API?
Hi @lanklaas, thanks for giving the async API a spin. It comes with a bunch of caveats, but in this case it's just something in the error message you seem to have missed.
help: within odbc_api::Connection<'_>, the trait std::marker::Sync is not implemented for *mut odbc_api::odbc_sys::Dbc
So, your async block is not Send because the Connection is not Sync. Connections could never be Sync unless they are wrapped in a Mutex. Why does it need to be Sync? Well:
future is not Send as this value is used across an await
This means that in a multithreaded async runtime another thread might pick up the task. There is little I can think of right now to make this more convenient (at least within a zero-cost abstraction), because ODBC has this annoying property of maintaining mutable state for error messages.
However, I feel your code might benefit from allocating statement handles explicitly. Check out PreallocatedPolling. See: https://docs.rs/odbc-api/latest/odbc_api/struct.PreallocatedPolling.html#method.execute
This might even get rid of the unsafe code completely. I would need to try it later to know for sure, though.
Cheers, Markus
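To make the rule behind that help message concrete: a shared reference `&T` is `Send` only if `T` is `Sync`, and the future returned by execute_polling holds `&self` across an await. The following is a minimal, standard-library-only sketch of that rule, not code from odbc-api. `Cell<i32>` is an assumed stand-in for a promoted connection (movable between threads, but with unsynchronized interior state), and `assert_send` and `send_rules_hold` are hypothetical helper names.

```rust
use std::cell::Cell;
use std::sync::Mutex;

/// Compiles only if `T` may be moved to another thread.
fn assert_send<T: Send>() {}

/// Returns `true`; the interesting part is which lines compile at all.
fn send_rules_hold() -> bool {
    // Moving the value itself to another thread is allowed.
    assert_send::<Cell<i32>>();

    // Sharing it is not: `Cell<i32>` is not `Sync`, so `&Cell<i32>` is not
    // `Send`. Uncommenting the next line is a compile error.
    // assert_send::<&Cell<i32>>();

    // A `Mutex` around a `Send` value is `Sync`, so a reference to it is `Send`.
    assert_send::<&Mutex<Cell<i32>>>();
    true
}
```

Calling `send_rules_hold()` does nothing at run time. The check happens entirely at compile time, which is also where the error in this issue comes from.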
Thanks for the quick response. I tried to put it in a Mutex as well as an Arc<Mutex>, but I still got the issue. Should the mutex work here, or will it never work across an await?
Updated code:
use lazy_static::lazy_static;
use odbc_api::Environment;
use std::{
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};
lazy_static! {
    static ref ENV: Environment = unsafe { Environment::new().unwrap() };
}
#[tokio::main]
async fn main() {
    const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};\
        Server=localhost;\
        UID=SA;\
        PWD=My@Test@Password1;\
        ";
    let conn = ENV.connect_with_connection_string(MSSQL).unwrap();
    let conn = Arc::new(Mutex::new(unsafe { conn.promote_to_send() }));
    let conn = Arc::clone(&conn);
    let handle = tokio::task::spawn(async move {
        let conn = Arc::clone(&conn);
        let lock = conn.lock().unwrap();
        let exec = lock.execute_polling("SELECT 1", (), || {
            tokio::time::sleep(Duration::from_secs(1))
        });
        if let Some(cursor) = exec.await.unwrap() {
            // dbg!(cursor);
        }
        // if let Some(cursor) = conn.execute("SELECT 1", ()).unwrap() {
        //     // dbg!(cursor);
        // }
    });
    handle.await;
}
The full error:
error: future cannot be sent between threads safely
--> src/main.rs:22:18
|
22 | let handle = tokio::task::spawn(async move {
| ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, odbc_api::force_send_sync::Send<odbc_api::Connection<'_>>>`
note: future is not `Send` as this value is used across an await
--> src/main.rs:28:35
|
24 | let lock = conn.lock().unwrap();
| ---- has type `std::sync::MutexGuard<'_, odbc_api::force_send_sync::Send<odbc_api::Connection<'_>>>` which is not `Send`
...
28 | if let Some(cursor) = exec.await.unwrap() {
| ^^^^^^ await occurs here, with `lock` maybe used later
...
34 | });
| - `lock` is later dropped here
note: required by a bound in `tokio::spawn`
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> src/main.rs:22:18
|
22 | let handle = tokio::task::spawn(async move {
| ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Dbc`
note: future is not `Send` as this value is used across an await
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
|
142 | &self,
| ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 | ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
| __________________________________________________________________-
147 | | let query = SqlText::new(query);
148 | | let lazy_statement = move || self.allocate_statement();
149 | | execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
| | ^^^^^^ await occurs here, with `&self` maybe used later
150 | | }
| |_____- `&self` is later dropped here
note: required by a bound in `tokio::spawn`
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> src/main.rs:22:18
|
22 | let handle = tokio::task::spawn(async move {
| ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Env`
note: future is not `Send` as this value is used across an await
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
|
142 | &self,
| ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 | ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
| __________________________________________________________________-
147 | | let query = SqlText::new(query);
148 | | let lazy_statement = move || self.allocate_statement();
149 | | execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
| | ^^^^^^ await occurs here, with `&self` maybe used later
150 | | }
| |_____- `&self` is later dropped here
note: required by a bound in `tokio::spawn`
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> src/main.rs:22:18
|
22 | let handle = tokio::task::spawn(async move {
| ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut odbc_api::odbc_sys::Stmt`
note: required by a bound in `tokio::spawn`
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> src/main.rs:22:18
|
22 | let handle = tokio::task::spawn(async move {
| ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut std::ffi::c_void`
note: future is not `Send` as this value is used across an await
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
|
153 | while let Some(blob_ptr) = stmt.param_data().into_result(&stmt)? {
| -------- has type `*mut std::ffi::c_void` which is not `Send`
...
159 | wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
| ______________________________________________________________________^
160 | | .await
| |__________________________^ await occurs here, with `blob_ptr` maybe used later
...
163 | }
| - `blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> src/main.rs:22:18
|
22 | let handle = tokio::task::spawn(async move {
| ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut &mut dyn odbc_api::parameter::Blob`
note: future is not `Send` as this value is used across an await
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
|
155 | let blob_ptr: *mut &mut dyn Blob = transmute(blob_ptr);
| -------- has type `*mut &mut dyn odbc_api::parameter::Blob` which is not `Send`
...
159 | wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
| ______________________________________________________________________^
160 | | .await
| |__________________________^ await occurs here, with `blob_ptr` maybe used later
...
163 | }
| - `blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> src/main.rs:22:18
|
22 | let handle = tokio::task::spawn(async move {
| ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: the trait `std::marker::Send` is not implemented for `dyn odbc_api::parameter::Blob`
note: future is not `Send` as this value is used across an await
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
|
156 | let blob_ref = &mut *blob_ptr;
| --------- has type `&mut dyn odbc_api::parameter::Blob` which is not `Send`
...
159 | wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
| ______________________________________________________________________^
160 | | .await
| |__________________________^ await occurs here, with `*blob_ptr` maybe used later
...
163 | }
| - `*blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
error: future cannot be sent between threads safely
--> src/main.rs:22:18
|
22 | let handle = tokio::task::spawn(async move {
| ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
|
= help: within `odbc_api::handles::StatementRef<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Stmt`
note: future is not `Send` as this value is used across an await
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
|
153 | while let Some(blob_ptr) = stmt.param_data().into_result(&stmt)? {
| - ----- has type `&odbc_api::handles::StatementRef<'_>` which is not `Send`
| _________|
| |
154 | | // The safe interfaces currently exclusively bind pointers to `Blob` trait objects
155 | | let blob_ptr: *mut &mut dyn Blob = transmute(blob_ptr);
156 | | let blob_ref = &mut *blob_ptr;
... |
159 | | wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
| |______________________________________________________________________^
160 | || .await
| ||__________________________^ await occurs here, with `&stmt` maybe used later
161 | | .into_result(&stmt)?;
162 | | }
163 | | }
| |_________- `&stmt` is later dropped here
note: required by a bound in `tokio::spawn`
--> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
|
127 | T: Future + Send + 'static,
| ^^^^ required by this bound in `tokio::spawn`
May I ask why you spawn a task with tokio::task::spawn?
It is the closest I could get the example to the actual code. I have a lib crate that compiles fine with the async API, but when I use it in my bin crate, which spawns tasks for multiple Kafka listeners, I get this issue.
Sorry for the confusion with the Mutex. It won't help you in this situation, because it is not only the Connection itself that needs to be Sync. The futures emitted by this crate are not Send, which is what tokio::spawn requires, plain and simple, in its signature. Could they be Send? Honestly, I do not know yet; I need to think about this. At least I feel I won't have a quick fix for this. If you want to move forward with your crate, I would advise sticking with the synchronous API for now.
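As an aside on the first error in the output above (the `MutexGuard` one): a `std::sync::MutexGuard` is not `Send`, so keeping it alive across an await makes the whole future not `Send`. The usual pattern is to drop the guard before awaiting. The sketch below uses only the standard library and a plain counter instead of a connection; `require_send`, `pause`, `increment`, `NoopWake` and `block_on` are hypothetical names. It does not make execute_polling spawnable, because that future borrows the connection across its own internal awaits.

```rust
use std::future::Future;
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};

/// Accepts only futures that may move between threads, mirroring the
/// `Send` bound in the `tokio::spawn` signature quoted in the error output.
fn require_send<F: Future + Send>(future: F) -> F {
    future
}

/// Hypothetical await point; completes immediately.
async fn pause() {}

/// Increments the counter and returns the new value.
fn increment(counter: Arc<Mutex<u32>>) -> impl Future<Output = u32> + Send {
    require_send(async move {
        let value = {
            let mut guard = counter.lock().unwrap();
            *guard += 1;
            *guard
        }; // The guard is dropped here, before the await below.
        pause().await;
        value
    })
}

/// Waker that does nothing; enough for futures that never park.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Minimal executor for the sketch: polls in a loop on the calling thread.
fn block_on<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}
```

Moving `pause().await` inside the inner block, while the guard is still alive, should make `require_send` reject the future with the same kind of "used across an await" note as in the output above.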
OK, thanks for the help! I will use sync for now. I will leave the ticket open, but you can close it if you like.
Thank you too, especially for the minimal reproducing example. I'll leave it open until it's either working or I understand precisely why it cannot.
A little update. This works today using tokio::task::spawn_local instead of tokio::spawn. No mutex around the connection required.
I am also currently evaluating the safety of making statements Send, which should make this work with just plain tokio::spawn.
Best, Markus
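The reason spawn_local sidesteps the error is that it places no `Send` bound on the future: the task is only ever polled on the thread that created it. Below is a standard-library-only sketch of that idea, not tokio or odbc-api code. `Rc<String>` is an assumed stand-in for a handle that is neither `Send` nor `Sync`, and `run_local`, `pause`, `query_len_local` and `NoopWake` are hypothetical names.

```rust
use std::future::Future;
use std::rc::Rc;
use std::sync::Arc;
use std::task::{Context, Poll, Wake, Waker};

/// Waker that does nothing; enough for futures that never park.
struct NoopWake;

impl Wake for NoopWake {
    fn wake(self: Arc<Self>) {}
}

/// Drives a future to completion on the calling thread.
/// Note the missing `Send` bound: the future never leaves this thread.
fn run_local<F: Future>(future: F) -> F::Output {
    let waker = Waker::from(Arc::new(NoopWake));
    let mut cx = Context::from_waker(&waker);
    let mut future = Box::pin(future);
    loop {
        if let Poll::Ready(output) = future.as_mut().poll(&mut cx) {
            return output;
        }
    }
}

/// Hypothetical await point; completes immediately.
async fn pause() {}

/// Holds a reference to a non-`Send` value across an await.
fn query_len_local() -> usize {
    let handle = Rc::new(String::from("SELECT 1"));
    run_local(async move {
        let borrowed: &String = &handle;
        pause().await; // `borrowed` is still alive here
        borrowed.len()
    })
}
```

In tokio itself, `tokio::task::spawn_local` has to be called from within a `tokio::task::LocalSet` (or a comparable local context), which plays the role that `run_local` plays in this sketch. The trade-off is that such tasks cannot be balanced across worker threads.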