odbc-api icon indicating copy to clipboard operation
odbc-api copied to clipboard

Async query execution in spawned task

Open lanklaas opened this issue 2 years ago • 8 comments

Hello,

I am trying to use the execute polling method on a connection, but the compiler keeps saying that the connection is not send, even though I promoted it to send.

This is the error

error: future cannot be sent between threads safely
   --> src/main.rs:17:18
    |
17  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Dbc`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
    |
142 |           &self,
    |           ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 |       ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
    |  __________________________________________________________________-
147 | |         let query = SqlText::new(query);
148 | |         let lazy_statement = move || self.allocate_statement();
149 | |         execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
    | |                                                                                     ^^^^^^ await occurs here, with `&self` maybe used later
150 | |     }
    | |_____- `&self` is later dropped here

Some testing code that produces it. I used the example for promote_to_send and made it async.

use lazy_static::lazy_static;
use odbc_api::Environment;
use std::{thread, time::Duration};
lazy_static! {
    static ref ENV: Environment = unsafe { Environment::new().unwrap() };
}

#[tokio::main]
async fn main() {
    const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";
    let conn = ENV.connect_with_connection_string("MSSQL").unwrap();
    let conn = unsafe { conn.promote_to_send() };
    let handle = tokio::task::spawn(async move {
        let exec = conn.execute_polling("SELECT 1", (), || {
            tokio::time::sleep(Duration::from_secs(1))
        });
        if let Some(cursor) = exec.await.unwrap() {
            // dbg!(cursor);
        }
        // if let Some(cursor) = conn.execute("SELECT 1", ()).unwrap() {
        //     // dbg!(cursor);
        // }
    });
    handle.await;
}

const CREATE: &str = r#"SELECT 1"#;

Cargo.toml

[package]
name = "spark-odbc"
version = "0.1.0"
edition = "2018"

# See more keys and their definitions at https://doc.rust-lang.org/cargo/reference/manifest.html

[dependencies]
lazy_static = "1.4.0"
# odbc="*"
# odbc-safe="*"
# threadpool = "1.8.1"
odbc-api="*"
tokio = { version = "1.21.2", features = ["macros", "rt-multi-thread", "time"] }

Rust info:

rustc 1.64.0 (a55dd71d5 2022-09-19)
stable-x86_64-unknown-linux-gnu

Is there a way around this or should I go back to the sync API?

lanklaas avatar Oct 05 '22 13:10 lanklaas

Hi @lanklaas , thanks for giving the async API a spin. It comes with a bunch of caveats, but in this case it's just something in the error message you have seemed to miss.

help: within odbc_api::Connection<'_>, the trait std::marker::Sync

So, your closure is not Send because the Connection is not Sync. Connections could never be sync unless they are wrapped in a Mutex. Why does it need to be Sync? Well:

future is not Send as this value is used across an await

This means in a multithreaded async runtime another thread might pick up the task. There is little I can think of right now, to make this more convinient (at least within a zero cost abstraction), because ODBC has this annoying property of maintaining mutable state for error messages.

However, I feel for your code might benefit from allocating statement handles explicitly. Check out PreparedPolling. See: https://docs.rs/odbc-api/latest/odbc_api/struct.PreallocatedPolling.html#method.execute

This might even get rid of the unsafe code completly. Would need to try later to know for sure though.

Cheers, Markus

pacman82 avatar Oct 05 '22 15:10 pacman82

Thanks for the quick response. I tried to put it in a Mutex as well as a Arc<Mutex>, but I still got the issue. Should the mutex work here or will it never work across an await?

Updated code:

use lazy_static::lazy_static;
use odbc_api::Environment;
use std::{
    sync::{Arc, Mutex},
    thread,
    time::Duration,
};
lazy_static! {
    static ref ENV: Environment = unsafe { Environment::new().unwrap() };
}

#[tokio::main]
async fn main() {
    const MSSQL: &str = "Driver={ODBC Driver 17 for SQL Server};\
    Server=localhost;\
    UID=SA;\
    PWD=My@Test@Password1;\
";
    let conn = ENV.connect_with_connection_string("MSSQL").unwrap();
    let conn = Arc::new(Mutex::new(unsafe { conn.promote_to_send() }));
    let conn = Arc::clone(&conn);
    let handle = tokio::task::spawn(async move {
        let conn = Arc::clone(&conn);
        let lock = conn.lock().unwrap();
        let exec = lock.execute_polling("SELECT 1", (), || {
            tokio::time::sleep(Duration::from_secs(1))
        });
        if let Some(cursor) = exec.await.unwrap() {
            // dbg!(cursor);
        }
        // if let Some(cursor) = conn.execute("SELECT 1", ()).unwrap() {
        //     // dbg!(cursor);
        // }
    });
    handle.await;
}

The full error:

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `std::sync::MutexGuard<'_, odbc_api::force_send_sync::Send<odbc_api::Connection<'_>>>`
note: future is not `Send` as this value is used across an await
   --> src/main.rs:28:35
    |
24  |         let lock = conn.lock().unwrap();
    |             ---- has type `std::sync::MutexGuard<'_, odbc_api::force_send_sync::Send<odbc_api::Connection<'_>>>` which is not `Send`
...
28  |         if let Some(cursor) = exec.await.unwrap() {
    |                                   ^^^^^^ await occurs here, with `lock` maybe used later
...
34  |     });
    |     - `lock` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Dbc`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
    |
142 |           &self,
    |           ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 |       ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
    |  __________________________________________________________________-
147 | |         let query = SqlText::new(query);
148 | |         let lazy_statement = move || self.allocate_statement();
149 | |         execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
    | |                                                                                     ^^^^^^ await occurs here, with `&self` maybe used later
150 | |     }
    | |_____- `&self` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::Connection<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Env`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/connection.rs:149:85
    |
142 |           &self,
    |           ----- has type `&odbc_api::Connection<'_>` which is not `Send`
...
146 |       ) -> Result<Option<CursorPolling<StatementImpl<'_>>>, Error> {
    |  __________________________________________________________________-
147 | |         let query = SqlText::new(query);
148 | |         let lazy_statement = move || self.allocate_statement();
149 | |         execute_with_parameters_polling(lazy_statement, Some(&query), params, sleep).await
    | |                                                                                     ^^^^^^ await occurs here, with `&self` maybe used later
150 | |     }
    | |_____- `&self` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut odbc_api::odbc_sys::Stmt`
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut std::ffi::c_void`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
153 |           while let Some(blob_ptr) = stmt.param_data().into_result(&stmt)? {
    |                          -------- has type `*mut std::ffi::c_void` which is not `Send`
...
159 |                   wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  ______________________________________________________________________^
160 | |                     .await
    | |__________________________^ await occurs here, with `blob_ptr` maybe used later
...
163 |           }
    |           - `blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `impl std::future::Future<Output = ()>`, the trait `std::marker::Send` is not implemented for `*mut &mut dyn odbc_api::parameter::Blob`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
155 |               let blob_ptr: *mut &mut dyn Blob = transmute(blob_ptr);
    |                   -------- has type `*mut &mut dyn odbc_api::parameter::Blob` which is not `Send`
...
159 |                   wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  ______________________________________________________________________^
160 | |                     .await
    | |__________________________^ await occurs here, with `blob_ptr` maybe used later
...
163 |           }
    |           - `blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: the trait `std::marker::Send` is not implemented for `dyn odbc_api::parameter::Blob`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
156 |               let blob_ref = &mut *blob_ptr;
    |                                   --------- has type `&mut dyn odbc_api::parameter::Blob` which is not `Send`
...
159 |                   wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  ______________________________________________________________________^
160 | |                     .await
    | |__________________________^ await occurs here, with `*blob_ptr` maybe used later
...
163 |           }
    |           - `*blob_ptr` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

error: future cannot be sent between threads safely
   --> src/main.rs:22:18
    |
22  |     let handle = tokio::task::spawn(async move {
    |                  ^^^^^^^^^^^^^^^^^^ future created by async block is not `Send`
    |
    = help: within `odbc_api::handles::StatementRef<'_>`, the trait `std::marker::Sync` is not implemented for `*mut odbc_api::odbc_sys::Stmt`
note: future is not `Send` as this value is used across an await
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/odbc-api-0.49.0/src/execute.rs:159:70
    |
153 |            while let Some(blob_ptr) = stmt.param_data().into_result(&stmt)? {
    |            -                                                        ----- has type `&odbc_api::handles::StatementRef<'_>` which is not `Send`
    |   _________|
    |  |
154 |  |             // The safe interfaces currently exclusively bind pointers to `Blob` trait objects
155 |  |             let blob_ptr: *mut &mut dyn Blob = transmute(blob_ptr);
156 |  |             let blob_ref = &mut *blob_ptr;
...    |
159 |  |                 wait_for(|| stmt.put_binary_batch(batch), &mut sleep)
    |  |______________________________________________________________________^
160 | ||                     .await
    | ||__________________________^ await occurs here, with `&stmt` maybe used later
161 |  |                     .into_result(&stmt)?;
162 |  |             }
163 |  |         }
    |  |_________- `&stmt` is later dropped here
note: required by a bound in `tokio::spawn`
   --> /home/pierre/.cargo/registry/src/github.com-1ecc6299db9ec823/tokio-1.21.2/src/task/spawn.rs:127:21
    |
127 |         T: Future + Send + 'static,
    |                     ^^^^ required by this bound in `tokio::spawn`

lanklaas avatar Oct 05 '22 15:10 lanklaas

May I ask why you spawn a task with tokio::task::spawn?

pacman82 avatar Oct 05 '22 16:10 pacman82

It is the closest I could get the example to the actual code. I have a lib crate that compiles fine with the async API, but when I use it in my bin crate which spawns tasks for multiple kafka listeners I get this issue

lanklaas avatar Oct 06 '22 05:10 lanklaas

Sorry for the confusion with the Mutex. It won't help you in this situation, because it is not only the Connection itself which needs to be Sync. The futures emitted by this crate are not Sync which is what tokio::spawn requires, plain and simple in its signature. Could they be Sync? Honestly I do not know yet, need to think about this. At least I feel I won't have a quick fix for this. If you want to move forward with your crate I would advice to stick around with the synchronous API for now.

pacman82 avatar Oct 06 '22 06:10 pacman82

Ok thanks for the help! Will use sync for now. I will leave the ticket open, but you can close it if you like

lanklaas avatar Oct 06 '22 06:10 lanklaas

Thank you too. Especially for the minimal reproducing example. I'll leave it open until it's either working or I understand precisly why it can not.

pacman82 avatar Oct 06 '22 07:10 pacman82

A little update. This works today using tokio::task::spawn_local instead of tokio::spawn. No mutex around the connection required.

I am also currently evaluating the safety of making statements Send, which should make this work with just plain tokio::spawn.

Best, Markus

pacman82 avatar Jul 14 '24 09:07 pacman82