tiberius icon indicating copy to clipboard operation
tiberius copied to clipboard

Unable to send BoxableIo safely between threads

Open nevi-me opened this issue 5 years ago • 1 comments

I have an application that uses tiberius and tower-grpc (with tower-hyper). Before tower-grpc was updated to 0.1, I used to use wait() as a work-around on my connection like below:

/// Create connection result, waiting for the future to complete
fn connect(
) -> Result<SqlConnection<Box<dyn BoxableIo>>Error> {
    let conn_str = "server=tcp:localhost,1433;integratedSecurity=false;".to_owned();

    SqlConnection::connect(conn_str.as_str()).wait()
}

Ever since the upgrade, wait()ing blocks indefinitely, so I'm forced to remove it. However, my code now can't compile because *const tiberius::query::QueryStream<Box<dyn tiberius::BoxableIo>> cannot be sent safely between threads.

`*const tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>` cannot be sent between threads safely
within `futures::future::and_then::AndThen<futures::future::and_then::AndThen<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>, std::result::Result<std::option::Option<std::string::String>, tiberius::Error>, [closure@src\database\mssql.rs:71:19: 71:54]>`, the trait `std::marker::Send` is not implemented for `*const tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>`
required because it appears within the type `std::marker::PhantomData<*const tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>`
required because it appears within the type `tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>`
required because it appears within the type `tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>`
required because it appears within the type `futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>`
required because it appears within the type `futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>`
required because it appears within the type `futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>`
required because it appears within the type `futures::future::chain::Chain<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>`
required because it appears within the type `futures::future::and_then::AndThen<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>`
required because it appears within the type `futures::future::chain::Chain<futures::future::and_then::AndThen<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>, futures::future::result_::FutureResult<std::option::Option<std::string::String>, tiberius::Error>, [closure@src\database\mssql.rs:71:19: 71:54]>`
required because it appears within the type `futures::future::and_then::AndThen<futures::future::and_then::AndThen<std::boxed::Box<dyn futures::future::Future<Item = tiberius::SqlConnection<std::boxed::Box<dyn tiberius::BoxableIo>>, Error = tiberius::Error> + std::marker::Send>, futures::future::map::Map<futures_state_stream::Collect<futures_state_stream::Map<tiberius::stmt::QueryResult<tiberius::stmt::StmtStream<std::boxed::Box<dyn tiberius::BoxableIo>, tiberius::query::QueryStream<std::boxed::Box<dyn tiberius::BoxableIo>>>>, [closure@src\database\mssql.rs:66:18: 66:83]>>, [closure@src\database\mssql.rs:68:18: 68:59]>, [closure@src\database\mssql.rs:61:19: 69:10 value:_]>, std::result::Result<std::option::Option<std::string::String>, tiberius::Error>, [closure@src\database\mssql.rs:71:19: 71:54]>`
required for the cast to the object type `dyn futures::future::Future<Item = std::option::Option<std::string::String>, Error = tiberius::Error> + std::marker::Send`

I create my connection per the below, then any query that I try to create, results in this error.

/// Create connection result, waiting for the future to complete
fn connect(
) -> Box<dyn Future<Item = SqlConnection<Box<dyn BoxableIo>>, Error = Error> + Send> {
    let conn_str = "server=tcp:localhost,1433;integratedSecurity=false;".to_owned();

    SqlConnection::connect(conn_str.as_str())
}

// This query won't compile,
pub fn run_query(
    value: String,
) -> Box<dyn Future<Item = Option<String>, Error = Error> + Send + 'static> {
    let fut = connect()
        .and_then(|conn| {
            conn.query(
                "INSERT INTO table (value) OUTPUT INSERTED.* values (@P1)",
                &[&value.as_str()],
            )
            .map(move |row: QueryRow| (row.get::<&str, &str>("value")).to_string())
            .collect()
            .map(move |(a, _)| a.get(0).map(|r| r.clone())) // get value, lose connection
        })
        .and_then(|result: Option<String>| Ok(result));
    Box::new(fut) // this fails to compile
}

Making it return impl Future<> instead of Box<dyn Future<> + Send> only pushes the error further down to the eventual consumer.

Do you know what I'm doing wrong? BoxableIo itself supports Send, so the issues seems to be with QueryStream. Thanks

nevi-me avatar Sep 06 '19 03:09 nevi-me

This error is caused by line 73 of stmt.rs which includes PhantomData with a pointer. Replacing PhantomData<*const R> with PhantomData<R> resolves the issue.

agalakhov avatar Oct 10 '19 01:10 agalakhov