libsql icon indicating copy to clipboard operation
libsql copied to clipboard

Insert statements within remote replica transactions fail with transaction timed-out

Open sveltespot opened this issue 1 year ago • 0 comments

Insert statements executed within concurrent transactions that are initiated from remote replica connections seem to hang until the transaction times out, at which point the statements fail. Below is a minimal reproducer:

use libsql::{Builder, Connection, Result};

/// Reproducer entry point.
///
/// Builds a remote replica, opens three connections on it, creates the
/// `test` table, then runs `db_work` once on its own and twice concurrently
/// on separate spawned tasks.
///
/// # Panics
///
/// Panics if the replica cannot be built, a connection cannot be opened,
/// `setup_db` fails, a spawned task cannot be joined, or any `db_work` call
/// returns an error.
#[tokio::main]
async fn main() {
    println!("Hello, world!");

    // The third argument is passed as an empty string (presumably the auth
    // token; confirm against the libsql `Builder` documentation).
    let primary_url = "http://localhost:8080".to_string();
    let replica = Builder::new_remote_replica(
        "/tmp/embedded_transaction.db",
        primary_url,
        String::new(),
    )
    .build()
    .await
    .unwrap();

    // All three connections are opened up front, before any work is done.
    let serial_conn = replica.connect().unwrap();
    let concurrent_conn_a = replica.connect().unwrap();
    let concurrent_conn_b = replica.connect().unwrap();

    setup_db(serial_conn.clone()).await.unwrap();

    // This works as expected.
    let replica_task_1 = db_work(serial_conn).await;
    assert!(replica_task_1.is_ok());

    // If we execute two tasks concurrently
    let handle_2 = tokio::spawn(db_work(concurrent_conn_a));
    let handle_3 = tokio::spawn(db_work(concurrent_conn_b));
    let (joined_2, joined_3) = tokio::join!(handle_2, handle_3);

    // The outer `unwrap` only covers the join itself; the inner value is the
    // `Result` returned by `db_work`.
    let replica_task_2_res = joined_2.unwrap();
    let replica_task_3_res = joined_3.unwrap();

    // Report each failure before asserting, so both outcomes are visible
    // even though the first failing assert aborts the program.
    if replica_task_2_res.is_err() {
        eprintln!("Task 2 failed: {:?}", replica_task_2_res);
    }
    if replica_task_3_res.is_err() {
        eprintln!("Task 3 failed: {:?}", replica_task_3_res);
    }

    // One of these concurrent tasks currently fails. Both tasks should succeed.
    assert!(replica_task_2_res.is_ok());
    assert!(replica_task_3_res.is_ok());
}

/// Inserts a single row into `test` inside a transaction on `conn`.
///
/// The transaction is begun, the insert is executed, the task sleeps for two
/// seconds, and only then is the transaction committed, so the transaction
/// stays open across the pause.
///
/// # Errors
///
/// Returns the error from beginning the transaction, executing the insert,
/// or committing.
async fn db_work(conn: Connection) -> Result<()> {
    let txn = conn.transaction().await?;

    // Some business logic here...
    let insert_sql = "INSERT INTO test (name) VALUES (?1)";
    txn.execute(insert_sql, ["somename"]).await?;

    let pause = std::time::Duration::from_secs(2);
    tokio::time::sleep(pause).await;

    txn.commit().await
}

/// Creates the `test` table (`id INTEGER PRIMARY KEY, name TEXT`) on `conn`
/// if it does not already exist.
///
/// # Errors
///
/// Returns the error from executing the `CREATE TABLE` statement.
async fn setup_db(conn: Connection) -> Result<()> {
    let create_table_sql = "CREATE TABLE IF NOT EXISTS test (id INTEGER PRIMARY KEY, name TEXT)";
    // The value returned by `execute` is discarded; only success or failure matters here.
    conn.execute(create_table_sql, ()).await?;
    Ok(())
}

sveltespot avatar Apr 06 '24 17:04 sveltespot