rust-rocksdb

How to use rocksdb in/with async/await code?

Open simon-fu opened this issue 3 years ago • 12 comments

Sorry to bother you. I couldn't find any useful info on this topic, so I'm posting this issue here.

Should I spawn a thread and call the rocksdb API in that thread? The following code demonstrates one possible solution. Is it the best practice?

#[cfg(test)]
mod test {
    use anyhow::Result;
    use tokio::sync::oneshot;
    
    #[tokio::test]
    async fn test_db_async() -> Result<()> { 
        use rocksdb::DB;
        
        let path = "/tmp/rocks.db";
        let db = DB::open_default(path)?;
    
        #[derive(Debug)]
        enum Command {
            Get(String, oneshot::Sender<Option<Vec<u8>>>),
            Exit,
        }
    
        let (tx, mut rx) = tokio::sync::mpsc::channel(2);
        let _thread = std::thread::spawn(move || { 
            loop {
                let r = rx.blocking_recv();
                match r {
                    Some(cmd) => {
                        match cmd {
                            Command::Get(key, tx0) => {
                                let value = db.get(key)?;
                                let _r = tx0.send(value);
                            },
                            Command::Exit => break,
                        }
                    },
                    None => break,
                }
            }
            Result::<()>::Ok(())
        });
    
        let (tx0, rx0) = oneshot::channel();
        tx.send(Command::Get("my key".to_owned(), tx0)).await?;
        let value = rx0.await?;
        match value { 
            Some(value) =>  println!("retrieved value [{}]", String::from_utf8(value).unwrap()),
            None => println!("value not found"),
        }
    
        tx.send(Command::Exit).await?;
    
        let _r = _thread.join();
        Ok(())
    }
}

simon-fu avatar Sep 21 '22 04:09 simon-fu

What's the final goal? You can use rocksdb in async code the same way as in sync code. But you should understand that you gain nothing from async here, because rocksdb doesn't support async operations. Your example would run more slowly than the sync one because of the overhead of creating a thread.

aleksuss avatar Sep 23 '22 09:09 aleksuss

Calling a rocksdb I/O method may block for a while, especially a read/get on a cache miss, and the other async tasks scheduled on the same thread are stalled for the duration of that call.

simon-fu avatar Sep 27 '22 03:09 simon-fu

tx.send(Command::Get("my key".to_owned(), tx0)).await?;
let value = rx0.await?;

It would work like a sync version. These awaits block execution.

aleksuss avatar Sep 28 '22 07:09 aleksuss

@aleksuss What about io_uring-enabled operations? I see that rust-rocksdb recently released support for building with io_uring, but the corresponding ReadOptions setting is missing: https://github.com/facebook/rocksdb/wiki/Asynchronous-IO

https://docs.rs/rocksdb/0.19.0/rocksdb/struct.ReadOptions.html

Is it currently being worked on? I wouldn't mind sending in a PR for io_uring if help is needed

insipx avatar Oct 02 '22 19:10 insipx

@insipx C API doesn't provide such functionality yet.

aleksuss avatar Oct 06 '22 11:10 aleksuss

That's unfortunate 😔

insipx avatar Oct 06 '22 13:10 insipx

Also, it's an experimental feature and, as I understand, the async behaviour is related to internal fetching.

aleksuss avatar Oct 06 '22 13:10 aleksuss

Yeah, I was just looking at the facebook/rocksdb docs, not necessarily the C API. Looking at the async docs, it seems it's specifically implemented for iterators, which would be attractive for me personally. But I'll just have to wait until Facebook adds it to the C API, I guess.

insipx avatar Oct 06 '22 13:10 insipx

The async_io option was recently added to the C API!

tmyers273 avatar Jan 11 '23 11:01 tmyers273

What's really giving me trouble is trying to have a rocksdb::Transaction that lives for some seconds at a time, and trying to access it from an async worker pool (forced on me by a library I want to use). Multiple async tasks accessing a single transaction.

rocksdb::Transaction borrows the DB:

pub struct Transaction<'db, DB> {
    pub(crate) inner: *mut ffi::rocksdb_transaction_t,
    pub(crate) _marker: PhantomData<&'db DB>,
}

That seems to mean that you can't just shove it in an Arc<Mutex<Transaction>>: the compiler will demand you guarantee the DB outlives the Transaction. This may be showing my, let's call it, "ongoing learning regarding the intersection of lifetimes and async", but I can't figure out how to do this. I can see how scoped tasks could help, but the library I'm using calls tokio::task::spawn to start tasks as it pleases. The library expects to access shared storage from multiple tasks, and I want to do this inside a transaction.

Here's the simplest thing I can think of to demonstrate my need:

#[tokio::main]
async fn main() {
    let mut opts = rocksdb::Options::default();
    opts.create_if_missing(true);
    let db: rocksdb::OptimisticTransactionDB<rocksdb::MultiThreaded> =
        rocksdb::OptimisticTransactionDB::open(&opts, "dummy-db").unwrap();
    let db_tx = db.transaction();

    let task = tokio::spawn({
        async move {
            db_tx.put("foo", "bar").unwrap();
        }
    });
    task.await.unwrap();
}

error[E0597]: `db` does not live long enough
  --> src/main.rs:7:17
   |
7  |     let db_tx = db.transaction();
   |                 ^^^^^^^^^^^^^^^^
   |                 |
   |                 borrowed value does not live long enough
   |                 argument requires that `db` is borrowed for `'static`
...
15 | }
   | - `db` dropped here while still borrowed

For more information about this error, try `rustc --explain E0597`.

Mutex does not help:

use std::sync::Arc;

#[tokio::main]
async fn main() {
    let mut opts = rocksdb::Options::default();
    opts.create_if_missing(true);
    let db: rocksdb::OptimisticTransactionDB<rocksdb::MultiThreaded> =
        rocksdb::OptimisticTransactionDB::open(&opts, "dummy-db").unwrap();
    let db_tx_lock = Arc::new(tokio::sync::Mutex::new(db.transaction()));

    let task = tokio::spawn({
        async move {
            let db_tx = db_tx_lock.lock().await;
            db_tx.put("foo", "bar").unwrap();
        }
    });
    task.await.unwrap();
}

error[E0597]: `db` does not live long enough
  --> src/main.rs:9:55
   |
9  |     let db_tx_lock = Arc::new(tokio::sync::Mutex::new(db.transaction()));
   |                                                       ^^^^^^^^^^^^^^^^
   |                                                       |
   |                                                       borrowed value does not live long enough
   |                                                       argument requires that `db` is borrowed for `'static`
...
18 | }
   | - `db` dropped here while still borrowed

For more information about this error, try `rustc --explain E0597`.
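
Both errors come from the same rule: a plain spawn requires everything it captures to be `'static`, and a transaction that borrows its database is not. A minimal sketch of the scoped alternative mentioned above, using only the standard library. `Db`, `Txn`, and `write_from_workers` are hypothetical stand-ins invented for illustration, not rust-rocksdb types; `Txn<'db>` only mimics the `'db` borrow on `rocksdb::Transaction`.

```rust
use std::collections::BTreeMap;
use std::sync::Mutex;
use std::thread;

/// Hypothetical stand-in for a database handle (not a rust-rocksdb type).
pub struct Db {
    data: Mutex<BTreeMap<String, String>>,
}

/// Hypothetical stand-in for a transaction that borrows its database,
/// mimicking the `'db` lifetime on `rocksdb::Transaction`.
pub struct Txn<'db> {
    db: &'db Db,
    pending: Vec<(String, String)>,
}

impl Db {
    pub fn new() -> Self {
        Db { data: Mutex::new(BTreeMap::new()) }
    }

    pub fn transaction(&self) -> Txn<'_> {
        Txn { db: self, pending: Vec::new() }
    }

    pub fn get(&self, key: &str) -> Option<String> {
        self.data.lock().unwrap().get(key).cloned()
    }
}

impl<'db> Txn<'db> {
    /// Buffers a write; nothing reaches the database until `commit`.
    pub fn put(&mut self, key: &str, value: &str) {
        self.pending.push((key.to_owned(), value.to_owned()));
    }

    /// Applies all buffered writes to the database.
    pub fn commit(self) {
        let mut data = self.db.data.lock().unwrap();
        data.extend(self.pending);
    }
}

/// Shares one borrowed transaction between `workers` scoped threads.
pub fn write_from_workers(db: &Db, workers: usize) {
    let txn = Mutex::new(db.transaction());
    // Scoped threads are joined before `scope` returns, so they may borrow
    // `txn` (and through it `db`) without a `'static` bound.
    thread::scope(|s| {
        for i in 0..workers {
            let txn = &txn;
            s.spawn(move || {
                txn.lock().unwrap().put(&format!("key{i}"), "value");
            });
        }
    });
    // All workers have finished; take the transaction back and commit it.
    txn.into_inner().unwrap().commit();
}
```

This only works because the scope joins every worker before the transaction goes away. It does not help when a library calls tokio::task::spawn on its own, since that call still demands `'static`.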

tv42 avatar May 18 '23 17:05 tv42

I don't know whether you are still interested in this problem, but I think you can hold the db in a std::sync::Arc and let each worker hold a clone of it.

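use std::sync::Arc;

let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
let path = "rocks.db";
let db: Arc<rocksdb::OptimisticTransactionDB<rocksdb::MultiThreaded>> =
    Arc::new(rocksdb::OptimisticTransactionDB::open(&opts, path).unwrap());
// Clone the Arc outside the task so the original handle stays usable here.
let db_for_task = Arc::clone(&db);
let task = tokio::spawn(async move {
    // The transaction borrows the DB handle owned by this task.
    let db_tx = db_for_task.transaction();
    db_tx.put("foo", "bar")?;
    // Without a commit, the write is discarded when the transaction is dropped.
    db_tx.commit()
});
let _ = task.await.unwrap();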

J0HN50N133 avatar Mar 19 '24 12:03 J0HN50N133

You can do that for the DB, sure, but I need to hold a transaction open, which borrows the database and has lifetimes in its type signature; Arc doesn't help there.

pub struct Transaction<'db, DB> { /* private fields */ }

tv42 avatar Mar 24 '24 21:03 tv42
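
One way to combine the dedicated-thread pattern from the first post with a long-lived transaction is to let a single thread own both the database handle and the transaction, and give tasks a cloneable channel sender. A minimal sketch under stated assumptions, using only the standard library: `Db`, `Txn`, `TxnCommand`, and `spawn_txn_owner` are hypothetical names invented for illustration, not rust-rocksdb APIs, and `Txn<'db>` only mimics the `'db` borrow on `rocksdb::Transaction`.

```rust
use std::collections::BTreeMap;
use std::sync::mpsc::{self, Sender};
use std::sync::{Arc, Mutex};
use std::thread::{self, JoinHandle};

/// Hypothetical stand-in for a database handle (not a rust-rocksdb type).
pub struct Db {
    data: Mutex<BTreeMap<String, String>>,
}

/// Hypothetical stand-in for a transaction that borrows its database.
pub struct Txn<'db> {
    db: &'db Db,
    pending: Vec<(String, String)>,
}

impl Db {
    pub fn new() -> Self {
        Db { data: Mutex::new(BTreeMap::new()) }
    }

    pub fn transaction(&self) -> Txn<'_> {
        Txn { db: self, pending: Vec::new() }
    }

    pub fn get(&self, key: &str) -> Option<String> {
        self.data.lock().unwrap().get(key).cloned()
    }
}

impl Txn<'_> {
    /// Buffers a write; nothing reaches the database until `commit`.
    pub fn put(&mut self, key: &str, value: &str) {
        self.pending.push((key.to_owned(), value.to_owned()));
    }

    /// Applies all buffered writes to the database.
    pub fn commit(self) {
        self.db.data.lock().unwrap().extend(self.pending);
    }
}

/// Commands that tasks send to the thread owning the transaction.
pub enum TxnCommand {
    Put { key: String, value: String },
    /// Commit and stop; the sender is notified once the commit is done.
    Commit(Sender<()>),
}

/// Spawns a thread that owns the database handle and one open transaction.
pub fn spawn_txn_owner(db: Arc<Db>) -> (Sender<TxnCommand>, JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<TxnCommand>();
    let handle = thread::spawn(move || {
        // The borrow of `db` never leaves this closure, so the transaction
        // itself does not need to be `'static`.
        let mut txn = db.transaction();
        for cmd in rx {
            match cmd {
                TxnCommand::Put { key, value } => txn.put(&key, &value),
                TxnCommand::Commit(done) => {
                    txn.commit();
                    let _ = done.send(());
                    return;
                }
            }
        }
        // Every sender was dropped without a commit: the transaction is
        // dropped here and its pending writes are discarded.
    });
    (tx, handle)
}
```

The `Sender<TxnCommand>` is `'static` and cloneable, so it can be moved into tasks that a library spawns as it pleases. In tokio code the channels would presumably be tokio's mpsc and oneshot, as in the first post, so that waiting for a reply does not block an executor thread. The trade-off is that all access to the transaction is serialized through one thread.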