How to use rocksdb in/with async/await code?
Sorry for the disturbance. I couldn't find any useful information on this topic, so I'm posting this issue here.
Should I spawn a thread and call the rocksdb API in that thread? The following code demonstrates one possible solution. Is it the best practice?
#[cfg(test)]
mod test {
    use anyhow::Result;
    use tokio::sync::oneshot;

    #[tokio::test]
    async fn test_db_async() -> Result<()> {
        use rocksdb::DB;
        let path = "/tmp/rocks.db";
        let db = DB::open_default(path)?;

        #[derive(Debug)]
        enum Command {
            Get(String, oneshot::Sender<Option<Vec<u8>>>),
            Exit,
        }

        let (tx, mut rx) = tokio::sync::mpsc::channel(2);
        let _thread = std::thread::spawn(move || {
            loop {
                let r = rx.blocking_recv();
                match r {
                    Some(cmd) => {
                        match cmd {
                            Command::Get(key, tx0) => {
                                let value = db.get(key)?;
                                let _r = tx0.send(value);
                            },
                            Command::Exit => break,
                        }
                    },
                    None => break,
                }
            }
            Result::<()>::Ok(())
        });

        let (tx0, rx0) = oneshot::channel();
        tx.send(Command::Get("my key".to_owned(), tx0)).await?;
        let value = rx0.await?;
        match value {
            Some(value) => println!("retrieved value [{}]", String::from_utf8(value).unwrap()),
            None => println!("value not found"),
        }
        tx.send(Command::Exit).await?;
        let _r = _thread.join();
        Ok(())
    }
}
What's the final goal? You can use rocksdb in async code the same way as in sync code. But you should understand that you gain nothing from it, because rocksdb doesn't support async operations. Your example would run more slowly than the sync version because of the overhead of creating the thread.
Calling rocksdb's I/O methods may block for a while, especially for read/get operations on a cache miss, and the other async tasks scheduled on the same thread are blocked for the duration of the read/get operation.
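A minimal std-only sketch of the idea under discussion: keep the blocking calls on one long-lived worker thread so that the thread is created once rather than per request. `BlockingStore`, `GetRequest`, `spawn_store_worker` and `get_via_worker` are hypothetical names, and the `HashMap` merely stands in for a blocking store such as `rocksdb::DB`; this is not the rocksdb API.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for a blocking key-value store such as `rocksdb::DB`.
pub struct BlockingStore {
    data: HashMap<String, Vec<u8>>,
}

impl BlockingStore {
    pub fn new() -> Self {
        let mut data = HashMap::new();
        data.insert("my key".to_owned(), b"my value".to_vec());
        BlockingStore { data }
    }

    /// Assumption: in a real store this call may block on disk I/O.
    pub fn get(&self, key: &str) -> Option<Vec<u8>> {
        self.data.get(key).cloned()
    }
}

/// A request carries the key and a channel on which to send the answer.
pub struct GetRequest {
    pub key: String,
    pub reply: mpsc::Sender<Option<Vec<u8>>>,
}

/// Spawn one long-lived worker that owns the store and serves requests.
/// The worker exits once every `Sender<GetRequest>` has been dropped.
pub fn spawn_store_worker(
    store: BlockingStore,
) -> (mpsc::Sender<GetRequest>, thread::JoinHandle<()>) {
    let (tx, rx) = mpsc::channel::<GetRequest>();
    let handle = thread::spawn(move || {
        for req in rx {
            let value = store.get(&req.key);
            // The caller may have given up; ignore a closed reply channel.
            let _ = req.reply.send(value);
        }
    });
    (tx, handle)
}

/// Send one request and wait for its reply (blocking in this std-only sketch).
pub fn get_via_worker(tx: &mpsc::Sender<GetRequest>, key: &str) -> Option<Vec<u8>> {
    let (reply, answer) = mpsc::channel();
    tx.send(GetRequest { key: key.to_owned(), reply }).ok()?;
    answer.recv().ok()?
}
```

In async code the reply channel would be an async one, so that waiting for the answer suspends only the calling task; with tokio, `tokio::task::spawn_blocking` is another way to move a blocking call off the async worker threads.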
tx.send(Command::Get("my key".to_owned(), tx0)).await?;
let value = rx0.await?;
It would work like the sync version: each of these awaits waits for its result before the task continues.
@aleksuss What about io_uring-enabled operations? I see that rust-rocksdb recently released support for building with io_uring, but the corresponding ReadOptions setting is missing: https://github.com/facebook/rocksdb/wiki/Asynchronous-IO
https://docs.rs/rocksdb/0.19.0/rocksdb/struct.ReadOptions.html
Is it currently being worked on? I wouldn't mind sending in a PR for io_uring if help is needed
@insipx The C API doesn't provide such functionality yet.
That's unfortunate 😔
Also, it's an experimental feature and, as I understand, the async behaviour is related to internal fetching.
Yeah, I was just looking at the facebook/rocksdb docs, not necessarily the C API. Looking at the async docs, it seems to be implemented specifically for iterators, which would be attractive for me personally. But I'll just have to wait until Facebook adds it to the API, I guess.
The async_io option was recently added to the C API!
What's really giving me trouble is trying to have a rocksdb::Transaction that lives for some seconds at a time, and trying to access it from an async worker pool (forced on me by a library I want to use). Multiple async tasks accessing a single transaction.
rocksdb::Transaction borrows the DB:
pub struct Transaction<'db, DB> {
    pub(crate) inner: *mut ffi::rocksdb_transaction_t,
    pub(crate) _marker: PhantomData<&'db DB>,
}
That seems to mean that you can't just shove it in an Arc<Mutex<Transaction>>: the compiler will demand you guarantee that the DB outlives the Transaction. This may be showing my, let's call it, "ongoing learning regarding the intersection of lifetimes and async", but I can't figure out how to do this. I can see how scoped tasks could help, but the library I'm using calls tokio::task::spawn to start tasks as it pleases. The library expects to access shared storage from multiple tasks, and I want to do this inside a transaction.
Here's the simplest thing I can think of to demonstrate my need:
#[tokio::main]
async fn main() {
    let mut opts = rocksdb::Options::default();
    opts.create_if_missing(true);
    let db: rocksdb::OptimisticTransactionDB<rocksdb::MultiThreaded> =
        rocksdb::OptimisticTransactionDB::open(&opts, "dummy-db").unwrap();
    let db_tx = db.transaction();
    let task = tokio::spawn({
        async move {
            db_tx.put("foo", "bar").unwrap();
        }
    });
    task.await.unwrap();
}
error[E0597]: `db` does not live long enough
  --> src/main.rs:7:17
   |
7  |     let db_tx = db.transaction();
   |                 ^^^^^^^^^^^^^^^^
   |                 |
   |                 borrowed value does not live long enough
   |                 argument requires that `db` is borrowed for `'static`
...
15 | }
   | - `db` dropped here while still borrowed

For more information about this error, try `rustc --explain E0597`.
Mutex does not help:
use std::sync::Arc;

#[tokio::main]
async fn main() {
    let mut opts = rocksdb::Options::default();
    opts.create_if_missing(true);
    let db: rocksdb::OptimisticTransactionDB<rocksdb::MultiThreaded> =
        rocksdb::OptimisticTransactionDB::open(&opts, "dummy-db").unwrap();
    let db_tx_lock = Arc::new(tokio::sync::Mutex::new(db.transaction()));
    let task = tokio::spawn({
        async move {
            let db_tx = db_tx_lock.lock().await;
            db_tx.put("foo", "bar").unwrap();
        }
    });
    task.await.unwrap();
}
error[E0597]: `db` does not live long enough
  --> src/main.rs:9:55
   |
9  |     let db_tx_lock = Arc::new(tokio::sync::Mutex::new(db.transaction()));
   |                                                       ^^^^^^^^^^^^^^^^
   |                                                       |
   |                                                       borrowed value does not live long enough
   |                                                       argument requires that `db` is borrowed for `'static`
...
18 | }
   | - `db` dropped here while still borrowed

For more information about this error, try `rustc --explain E0597`.
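To illustrate why scoped tasks would satisfy the borrow checker where a `'static` spawn does not, here is a std-only sketch using `std::thread::scope`. `Db`, `Txn` and `write_with_scoped_workers` are hypothetical stand-ins, not the rocksdb types; whether the real `Transaction` may be shared by reference across threads is an assumption that would need checking against the crate's documentation.

```rust
use std::sync::Mutex;
use std::thread;

/// Hypothetical stand-in for the database handle.
pub struct Db {
    rows: Mutex<Vec<(String, String)>>,
}

/// Hypothetical stand-in for a transaction that borrows its database.
pub struct Txn<'db> {
    db: &'db Db,
    pending: Mutex<Vec<(String, String)>>,
}

impl Db {
    pub fn new() -> Self {
        Db { rows: Mutex::new(Vec::new()) }
    }

    pub fn transaction(&self) -> Txn<'_> {
        Txn { db: self, pending: Mutex::new(Vec::new()) }
    }

    pub fn len(&self) -> usize {
        self.rows.lock().unwrap().len()
    }
}

impl<'db> Txn<'db> {
    /// Buffer a write inside the transaction.
    pub fn put(&self, key: &str, value: &str) {
        self.pending.lock().unwrap().push((key.to_owned(), value.to_owned()));
    }

    /// Apply all buffered writes to the database.
    pub fn commit(self) {
        let pending = self.pending.into_inner().unwrap();
        self.db.rows.lock().unwrap().extend(pending);
    }
}

/// Share one open transaction among `workers` scoped threads, then commit.
pub fn write_with_scoped_workers(db: &Db, workers: usize) {
    let txn = db.transaction();
    thread::scope(|s| {
        for i in 0..workers {
            let txn = &txn;
            // A scoped thread may borrow `txn` because the scope joins it
            // before `txn` can go out of scope.
            s.spawn(move || txn.put(&format!("key-{i}"), "value"));
        }
    });
    // Every worker has finished here, so the shared borrow has ended.
    txn.commit();
}
```

This does not help when a library calls `tokio::task::spawn` itself, since that requires `'static` futures; it only shows the shape a scoped API gives to the problem.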
I don't know whether you are still interested in this problem, but I think you can hold the db in a std::sync::Arc and let each worker hold a clone of it.
let mut opts = rocksdb::Options::default();
opts.create_if_missing(true);
let path = "rocks.db";
let db: Arc<rocksdb::OptimisticTransactionDB<rocksdb::MultiThreaded>> =
    Arc::new(rocksdb::OptimisticTransactionDB::open(&opts, path).unwrap());
let task = tokio::spawn(async move {
    let db = db.clone();
    let db_tx = db.transaction();
    db_tx.put("foo", "bar")
});
let _ = task.await.unwrap();
You can do that for the DB, sure, but I need to hold a transaction open, which borrows the database and has lifetimes in its type signature; Arc doesn't help there.
pub struct Transaction<'db, DB> { /* private fields */ }
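One possible way around the lifetime in the signature, sketched with std only: confine both the database and the borrowing transaction to a single thread, and give tasks a `'static`, cloneable channel handle instead of the transaction itself. `Db`, `Txn`, `TxnCommand` and `spawn_txn_owner` are hypothetical stand-ins and not the rocksdb API; the real types would take their place inside the owner thread.

```rust
use std::collections::HashMap;
use std::sync::mpsc;
use std::thread;

/// Hypothetical stand-in for the database.
pub struct Db {
    rows: HashMap<String, String>,
}

/// Hypothetical stand-in for a transaction that borrows its database.
pub struct Txn<'db> {
    db: &'db mut Db,
    pending: Vec<(String, String)>,
}

impl Db {
    pub fn transaction(&mut self) -> Txn<'_> {
        Txn { db: self, pending: Vec::new() }
    }
}

impl<'db> Txn<'db> {
    pub fn put(&mut self, key: String, value: String) {
        self.pending.push((key, value));
    }

    /// Apply buffered writes and return the resulting row count.
    pub fn commit(self) -> usize {
        self.db.rows.extend(self.pending);
        self.db.rows.len()
    }
}

/// Messages that tasks send instead of touching the transaction directly.
pub enum TxnCommand {
    Put(String, String),
    /// Commit and report how many rows the database holds afterwards.
    Commit(mpsc::Sender<usize>),
}

/// Start a thread that owns the database and one open transaction.
/// The returned sender has no lifetime parameter, so it can be cloned
/// into spawned tasks that require `'static` captures.
pub fn spawn_txn_owner() -> mpsc::Sender<TxnCommand> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        let mut db = Db { rows: HashMap::new() };
        let mut txn = db.transaction();
        for cmd in rx {
            match cmd {
                TxnCommand::Put(key, value) => txn.put(key, value),
                TxnCommand::Commit(reply) => {
                    let rows = txn.commit();
                    let _ = reply.send(rows);
                    return;
                }
            }
        }
        // All senders were dropped without a commit: `txn` is dropped
        // here, which this sketch treats as a rollback.
    });
    tx
}
```

The borrow never leaves the owner thread, so the compiler no longer needs the database to be `'static`. The cost is that every operation on the transaction becomes a message, and reads would need a reply channel like the one `Commit` uses.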