
Support for INSERTing multiple records?

Open greglearns opened this issue 5 years ago • 40 comments

How do you do the equivalent of

INSERT INTO my_table (my_num, my_str) VALUES
 ($1,$2)
,($3, $4)
,($5,$6)
... etc. depending on how many records you pass in

By the way, this project looks great!

greglearns avatar May 01 '20 14:05 greglearns

This seems to be a popular concern right now! I asked this question on Discord and the answer for now is that this is not possible, unless you generate the query yourself (and lose a lot of the compile-time-safety features as a result).
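
For reference, a minimal sketch of that hand-rolled route, assuming the my_table schema from the question and a pool connection; the placeholder list is built at runtime, so compile-time checking is lost:

// Build "($1, $2), ($3, $4), ..." at runtime and bind each value in order.
let rows: Vec<(i32, String)> = vec![(1, "a".into()), (2, "b".into()), (3, "c".into())];

let placeholders: Vec<String> = (0..rows.len())
    .map(|i| format!("(${}, ${})", i * 2 + 1, i * 2 + 2))
    .collect();
let sql = format!(
    "INSERT INTO my_table (my_num, my_str) VALUES {}",
    placeholders.join(", ")
);

// sqlx::query (the function, not the macro) accepts a runtime-built string.
let mut query = sqlx::query(&sql);
for (num, s) in &rows {
    query = query.bind(*num).bind(s.as_str());
}
query.execute(&pool).await?;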

There seems to be some activity in the area though:

  • #36 could be an option on PostgreSQL (COPY into a temporary table then INSERT ... FROM).
  • #291 explicitly calls this out as a desired feature for a query builder.

connec avatar May 01 '20 14:05 connec

@connec I missed those related topics! Thanks for pointing them out.

Yes, for me, this is the biggest need for interacting with SQL -- I can write SQL myself and make sure it is safe, and binding a few arguments isn't hard, but it's a pain to bind an arbitrary number of ($n, $n+1), ($n+2, $n+3), etc.!

greglearns avatar May 01 '20 15:05 greglearns

I ran into a similar issue. I think I have a solution (for now), but it is somewhat hideous:

sqlx::query!(
    "WITH
        a AS (SELECT row_number() over(), * FROM UNNEST( $1::UUID[] ) as group_id),
        b AS (SELECT row_number() over(), * FROM UNNEST( $2::UUID[] ) as variable_id),
        c AS (SELECT row_number() over(), * FROM UNNEST( $3::INTEGER[] ) as ordering)
    INSERT INTO groups
    SELECT a.group_id, b.variable_id, c.ordering FROM a
        JOIN b ON a.row_number = b.row_number
        JOIN c ON a.row_number = c.row_number",
    &group_id_arr,
    &self.variable_ids,
    &order_arr
).execute(&mut *conn).await?;

The "trick" is that you can pass in a Vector of items to be a Postgres ARRAY type, and UNNEST that array to turn it into a set of rows. So, above, I basically split the thing I want to insert into a vec for each column of values (fortunately only 3 for me), which each get turned into a temporary table with row numbers. Once I have these, I can join them all on the row numbers to produce a final table of values, which I then pass to the INSERT.

It's pretty hideous, and I'd love to find a more concise or better approach, but it's the best I have so far!

I need to actually test it properly next, to make sure it does what I am hoping it does.

jsdw avatar Jun 20 '20 17:06 jsdw

As an aside, I also tried doing a simpler approach of passing a single array of all of the values I wanted to update at once, but I hit a wall where SQLx did not understand the type of the array I was trying to provide, so that's what led me to splitting the array up and recombining it in SQL instead.

jsdw avatar Jun 20 '20 17:06 jsdw

Just a quick FYI that, at least from a postgres POV, this works as I'd like. Here's a brief example:

db=> create table foo (id integer, foo text, bar boolean);
CREATE TABLE
db=> WITH a AS (select row_number() over(), * from unnest(ARRAY[1,2,3]) as id), b AS (select row_number() over(), * from unnest(ARRAY['a','b','c']) as foo), c AS (select row_number() over(), * from unnest(ARRAY[true,false,true]) as bar) INSERT INTO foo SELECT a.id, b.foo, c.bar FROM a JOIN b ON a.row_number = b.row_number JOIN c ON a.row_number = c.row_number;
INSERT 0 3
db=> select * from foo;
 id | foo | bar 
----+-----+-----
  1 | a   | t
  2 | b   | f
  3 | c   | t

jsdw avatar Jun 20 '20 17:06 jsdw

Hi, is there a way to do this? I need to make 25 inserts at once.

shiftrtech avatar Oct 24 '20 09:10 shiftrtech

For Postgres, the below works, and is easy. Maybe this is good enough and nothing else is needed to support INSERTing multiple records, at least for Postgres.

let lala = vec![("abc", true), ("xyz", false)];
let mut v1: Vec<String> = Vec::new();
let mut v2: Vec<bool> = Vec::new();
lala.into_iter().for_each(|todo| {
    v1.push(todo.0.into());
    v2.push(todo.1);
});
dbg!((&v1, &v2));
use sqlx::{postgres::PgRow, Row}; // needed for PgRow and row.get()

let todo = sqlx::query(
    r#"INSERT INTO foo (description, done)
    SELECT description, done
    FROM UNNEST($1, $2) as a(description, done)
    RETURNING id, description, done"#,
)
.bind(&v1)
.bind(&v2)
.map(|row: PgRow| Todo {
    id: row.get(0),
    description: row.get(1),
    done: row.get(2),
})
.fetch_one(&mut tx)
.await.map_err(|e| dbg!(e) )?;
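
For reference, a hedged sketch of the same insert with the compile-time checked macro; the explicit casts on the UNNEST arguments tell the macro the parameter types. The table and columns match the example above, but everything else here is an assumption:

// Like any other query! invocation, this needs DATABASE_URL at build time.
let rec = sqlx::query!(
    r#"INSERT INTO foo (description, done)
    SELECT * FROM UNNEST($1::text[], $2::bool[])
    RETURNING id, description, done"#,
    &v1[..],
    &v2[..],
)
.fetch_one(&mut tx)
.await?;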

greglearns avatar Oct 25 '20 04:10 greglearns

@greglearns can I use this with #[derive(sqlx::FromRow)]?

shiftrtech avatar Oct 25 '20 07:10 shiftrtech

@shiftrtech I haven't tried it, so I don't know. Let us know what you find out!

greglearns avatar Oct 25 '20 13:10 greglearns

Slightly easier (IMHO cleaner) SQL:

r#"INSERT INTO foo (description, done)
SELECT * FROM UNNEST($1, $2)
RETURNING id, description, done"#,

greglearns avatar Oct 25 '20 13:10 greglearns

Does handling this in the application have any disadvantages? We could do it with multiple single inserts in a for loop, right?

itsfarseen avatar Apr 30 '21 10:04 itsfarseen

@itsfarseen Yes, doing it in a loop means a lot more back-and-forth between application and DB.

jplatte avatar Apr 30 '21 10:04 jplatte

Until this is fully supported, you can send an entire JSON object/list of records in one INSERT command and then, in the query, break the JSON into rows (in Postgres, the function is something like jsonb_to_recordset). This is less efficient than doing it directly (binary communication instead of JSON), but much, much better than doing single inserts.
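
For reference, a minimal sketch of that JSON route, assuming the foo (description, done) table from the earlier examples and sqlx's json feature:

use serde_json::json;

// One bind parameter carries all rows as a JSON array; Postgres expands it.
let records = json!([
    { "description": "abc", "done": true },
    { "description": "xyz", "done": false },
]);

sqlx::query(
    r#"INSERT INTO foo (description, done)
    SELECT description, done
    FROM jsonb_to_recordset($1::jsonb) AS t(description text, done boolean)"#,
)
.bind(&records) // serde_json::Value encodes as JSONB
.execute(&mut tx)
.await?;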

greglearns avatar Apr 30 '21 15:04 greglearns

Are multiple single inserts really that much less efficient? For example, consider a web app route POST /todos that lets you insert 10 rows. I understand that the request will take longer to execute because the handler now has to .await 10 times instead of once, and other I/O operations can come in between those .awaits. But isn't the data the handler has to send to Postgres almost the same as above?
"Almost" because there will now be 10 headers, one per row, instead of 1 header for all 10 rows. Compared with the overhead of the JSON conversion, wouldn't this be more efficient?

itsfarseen avatar Apr 30 '21 15:04 itsfarseen

If they're on different servers that are "far apart" (in different datacenters) then the JSON solution is most likely faster, maybe substantially faster. If they're on the same machine then you're probably right and the JSON solution is slower, but not slow enough that you would notice outside of microbenchmarks. If it really matters, benchmark it for your specific situation.

jplatte avatar Apr 30 '21 16:04 jplatte

I plan on improving the UNNEST($1, $2, $3) approach by adding an adapter that lets you bind iterators directly so you don't need extra allocations to do the Array-of-Structs to Struct-of-Arrays conversion:

let lala = vec![("abc", true), ("xyz", false)];

let todo = sqlx::query(
    r#"INSERT INTO foo (description, done)
    SELECT * FROM UNNEST($1, $2)
    RETURNING id, description, done"#,
)
.bind(&PgArray(lala.iter().map(|it| it.0)))
.bind(&PgArray(lala.iter().map(|it| it.1)))

And yes, @shiftrtech you can rewrite this with query_as to use FromRow:

#[derive(sqlx::FromRow)]
struct Todo {
    id: i32,
    description: String,
    done: bool
}

let lala = vec![("abc", true), ("xyz", false)];
let mut v1: Vec<String> = Vec::new();
let mut v2: Vec<bool> = Vec::new();
lala.into_iter().for_each(|todo| {
    v1.push(todo.0.into());
    v2.push(todo.1);
});

let todo: Todo = sqlx::query_as(
    r#"INSERT INTO foo (description, done)
    SELECT * FROM UNNEST($1, $2)
    RETURNING id, description, done"#,
)
// .bind(&PgArray(lala.iter().map(|it| it.0)))
// .bind(&PgArray(lala.iter().map(|it| it.1)))
.bind(&v1)
.bind(&v2)
.fetch_one(&mut tx)
.await.map_err(|e| dbg!(e) )?;

For convenience, we might also add .bind_array() to the Query and QueryAs types, so it might look like this:

let todo: Todo = sqlx::query_as(
    r#"INSERT INTO foo (description, done)
    SELECT * FROM UNNEST($1, $2)
    RETURNING id, description, done"#,
)
.bind_array(lala.iter().map(|it| it.0))
.bind_array(lala.iter().map(|it| it.1))
.fetch_one(&mut tx)
.await
.map_err(|e| dbg!(e) )?;

abonander avatar Apr 30 '21 21:04 abonander

Just realized though that PgArray would need &mut on the iterator during encoding but Encode::encode() takes &self. Anyone think RefCell is a bad idea here? Or maybe it can just require the iterator to be Clone?

abonander avatar Apr 30 '21 21:04 abonander

I also need to write multiple records to the database. If I understand it correctly, there is no support for multi-inserts in sqlx yet. I have tried the following and the performance is very poor. The execution takes over 2s for 100 records. Is there a prepared statement created here and will it be reused the next time the function is called? Is the statement parsed by sqlx or is it sent directly to the database? There is an index over all 7 columns in exactly this order. Could this cause performance problems? Does anyone have experience with massive inserts into a postgres database?

pub async fn insert_index_rows(
    tx: &mut Transaction<'_, Postgres>,
    rows: &Vec<&IndexRecord>,
) -> Result<(), sqlx::Error> {
    log::debug!("insert_index_rows: {}", rows.len());
    let mut v1: Vec<String> = Vec::new();
    let mut v2: Vec<String> = Vec::new();
    let mut v3: Vec<String> = Vec::new();
    let mut v4: Vec<Vec<u8>> = Vec::new();
    let mut v5: Vec<String> = Vec::new();
    let mut v6: Vec<String> = Vec::new();
    let mut v7: Vec<String> = Vec::new();
    rows.into_iter().for_each(|row| {
        v1.push(row.path.clone());
        v2.push(row.viewname.clone());
        v3.push(row.fieldname.clone());
        v4.push(row.fieldvalue.clone());
        v5.push(row.name.clone());
        v6.push(row.viewentry.read().unwrap().id.clone());
        v7.push(row.get_hash().clone());
    });
    sqlx::query("INSERT INTO resources_index (path, viewname, fieldname, fieldvalue, name, viewentry_id, id) SELECT * FROM UNNEST ($1,$2,$3,$4,$5,$6,$7)")
    .bind(v1)
    .bind(v2)
    .bind(v3)
    .bind(v4)
    .bind(v5)
    .bind(v6)
    .bind(v7)
    .execute(&mut *tx)
    .await?;
    Ok(())
}

markose avatar Jul 22 '21 08:07 markose

@markose this could be because of creating 7 different vecs and lots of cloning. It might get optimized away in release builds. Not sure tho. Could you check in a release build?

itsfarseen avatar Jul 23 '21 15:07 itsfarseen

@itsfarseen Thanks for the advice. The "problem" occurred in the release build. It is also not the Rust logic, but the statement itself that takes so much time; I know this because sqlx issues a [WARN] message. I have now rebuilt it so that the rows are inserted as a multi-row insert (INSERT INTO ... VALUES ...), which has improved things a bit. My main question is whether this statement, as it stands, is cached in the database as a prepared statement or re-parsed on each call.

markose avatar Jul 24 '21 13:07 markose

I'm talking about this warning, which appears very often.

[2021-07-24T16:00:05Z WARN  sqlx::query] INSERT INTO resources_index (path, …; rows: 0, elapsed: 1.516s
    
    INSERT INTO
      resources_index (
        path,
        viewname,
        fieldname,
        fieldvalue,
        name,
        viewentry_id,
        hash
      )
    VALUES
      ($1, $2, $3, $4, $5, $6, $7),
    ($8, $9, $10, $11, $12, $13, $14),
    ...
    ($687, $688, $689, $690, $691, $692, $693),
    ($694, $695, $696, $697, $698, $699, $700)

It inserts 100 rows with small values (overall ~200 bytes per row). There are two indexes, one over all columns and one over the hash column, and no primary key.

Disk iops:

9 requests completed in 9.63 ms, 72 KiB written, 934 iops, 7.30 MiB/s
generated 10 requests in 9.00 s, 80 KiB, 1 iops, 8.89 KiB/s
min/avg/max/mdev = 925.4 us / 1.07 ms / 1.28 ms / 100.3 us

The test is run on Google GCE (16 cores / 64 GB RAM) with a local scratch disk.

markose avatar Jul 24 '21 16:07 markose

@markose yes, the query is prepared and cached if you use the sqlx::query() interface (both the functions and the macros), but keep in mind that with your rewritten approach you're generating what looks like a unique query for every different length of the vector. Postgres might recognize that the query plans are similar, but I doubt it's that smart. Also keep in mind that the protocol has an inherent limit of 65535 bind parameters per query, so you need to break your inserts into batches no larger than floor(65535 / number_of_fields_per_row).
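
A minimal sketch of that batching, where insert_batch is a hypothetical helper that builds and executes one multi-row INSERT for the slice it receives:

// Keep each VALUES-style statement under the 65535-bind-parameter limit.
const BIND_LIMIT: usize = 65535;
const FIELDS_PER_ROW: usize = 7;

for batch in rows.chunks(BIND_LIMIT / FIELDS_PER_ROW) {
    insert_batch(&mut *tx, batch).await?;
}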

I have some suggestions regarding your UNNEST() version though. You're basically doing a deep clone of the entire input vector, and you're also not amortizing the allocations of the sub-vectors even though you know the exact length. How does this version perform?

pub async fn insert_index_rows(
    tx: &mut Transaction<'_, Postgres>,
    rows: &Vec<&IndexRecord>,
) -> Result<(), sqlx::Error> {
    log::debug!("insert_index_rows: {}", rows.len());
    let mut v1: Vec<&str> = Vec::with_capacity(rows.len());
    let mut v2: Vec<&str> = Vec::with_capacity(rows.len());
    let mut v3: Vec<&str> = Vec::with_capacity(rows.len());
    let mut v4: Vec<&[u8]> = Vec::with_capacity(rows.len());
    let mut v5: Vec<&str> = Vec::with_capacity(rows.len());
    let mut v6: Vec<String> = Vec::with_capacity(rows.len());
    let mut v7: Vec<&str> = Vec::with_capacity(rows.len());
    rows.into_iter().for_each(|row| {
        v1.push(&row.path);
        v2.push(&row.viewname);
        v3.push(&row.fieldname);
        v4.push(&row.fieldvalue);
        v5.push(&row.name);
        v6.push(row.viewentry.read().unwrap().id.clone());
        v7.push(row.get_hash());
    });
    sqlx::query("INSERT INTO resources_index (path, viewname, fieldname, fieldvalue, name, viewentry_id, id) SELECT * FROM UNNEST ($1,$2,$3,$4,$5,$6,$7)")
    .bind(v1)
    .bind(v2)
    .bind(v3)
    .bind(v4)
    .bind(v5)
    .bind(v6)
    .bind(v7)
    .execute(&mut *tx)
    .await?;
    Ok(())
}

For v6 it looks like another struct behind a std::sync::RwLock. There's unfortunately no way to convert that to a reference to the id field while still holding the read lock (I suppose you could collect a vector of RwLockReadGuards and then collect another vector referencing those but... gross), so you still end up needing to .clone() that. It should still be a significant performance improvement though.

In an async application I would recommend instead using tokio::sync::RwLock so that you're not blocking core threads when another task is concurrently trying to acquire a write lock. Tokio's RwLock allows you to .map() the read guard, which would help here, except .bind() unfortunately won't accept a vector of those anyway since they don't implement sqlx::Type or sqlx::Encode (although I suppose we would accept a PR to fix that).

You might also consider modeling your data differently, maybe just storing that id field as viewentry_id: String directly in IndexRecord, in which case it can be bound like the other fields.

abonander avatar Jul 24 '21 16:07 abonander

@abonander Thanks a lot for your help! I will test this. I will also change the implementation to tokio::sync::RwLock; maybe I can remove this Arc completely.

markose avatar Jul 26 '21 06:07 markose

I've removed the hash (MD5 over all fields), which was used as the row ID, to speed up inserts. Unfortunately, I now have a problem with the DELETE: I can no longer use WHERE hash IN (...). Therefore I wanted to execute the DELETE statements in parallel.

pub async fn delete_index_rows(
    tx: &mut Transaction<'_, Postgres>,
    rows: Vec<&IndexRecord>,
) -> Result<(), sqlx::Error> {
    log::debug!("delete_index_rows: {}", rows.len());
    let mut futures = Vec::with_capacity(rows.len());
    for row in rows {
        let q = sqlx::query("DELETE FROM resources_index WHERE path = $1 AND viewname = $2 AND fieldname = $3 AND fieldvalue = $4 AND name = $5 AND viewentry_id = $6")
        .bind(&row.path)
        .bind(&row.viewname)
        .bind(&row.fieldname)
        .bind(&row.fieldvalue)
        .bind(&row.name)
        .bind(row.viewentry.read().unwrap().id.clone())
        .execute(&mut *tx);
        futures.push(q);
    }
    futures::future::join_all(futures);
    Ok(())
}

Unfortunately, this does not seem to be possible in the context of a transaction.

    |
293 |         .execute(&mut *tx);
    |                  ^^^^^^^^ `*tx` was mutably borrowed here in the previous iteration of the loop

Do you have an idea for this?

markose avatar Jul 26 '21 11:07 markose

@markose you are breaking one of the fundamental rules of Rust: only one mutable reference within a scope. To get around this (though I'm not sure you'd really want to, or of the implications for the transaction) you would need a shared interior-mutability pattern, something like Rc<Mutex<T>>. This will require you to block a shared thread of the async executor, so it may not give you any benefit. Something like the below will compile:

use std::rc::Rc;
use std::sync::Mutex;

use sqlx::{query, Postgres, Transaction};

async fn run_query(tx: Rc<Mutex<Transaction<'_, Postgres>>>) {
    let mut tx = tx.lock().unwrap();
    query("SELECT * FROM my_table").execute(&mut *tx).await.unwrap();
}

async fn looper(tx: &Rc<Mutex<Transaction<'_, Postgres>>>) {
    let mut futures = vec![];
    for _ in 0..100 {
        futures.push(run_query(Rc::clone(tx)))
    }
    futures::future::join_all(futures).await;
}

human-bean-555 avatar Jul 26 '21 23:07 human-bean-555

@pauldorehill I have also thought about an exclusive lock. Thank you. I will try this.

markose avatar Jul 27 '21 06:07 markose

You don't need a Mutex; you can do a bulk delete using the same UNNEST trick, since DELETE supports it with USING ... (see the sketch below), but matching on all the columns of a table in a DELETE statement is overkill.
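
A minimal sketch of that variant; which columns identify a row to delete is an assumption here:

// DELETE ... USING a derived table built with UNNEST, matching on two columns.
sqlx::query(
    r#"DELETE FROM resources_index AS r
    USING UNNEST($1::text[], $2::text[]) AS del(path, viewname)
    WHERE r.path = del.path AND r.viewname = del.viewname"#,
)
.bind(&paths)
.bind(&viewnames)
.execute(&mut *tx)
.await?;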

Normally, if your table has a PRIMARY KEY or UNIQUE column (it really should anyway) you can just delete based on that with = ANY($1):

sqlx::query("DELETE FROM resources_index WHERE path = ANY($1)")
    .bind(&paths_to_delete)
    .execute(&mut *tx)
    .await?;

Or even if none of the columns are technically unique, one of them usually has a common value when you're deleting a chunk of rows.

abonander avatar Jul 27 '21 07:07 abonander

I plan on improving the UNNEST($1, $2, $3) approach by adding an adapter that lets you bind iterators directly so you don't need extra allocations to do the Array-of-Structs to Struct-of-Arrays conversion: ...

I keep running into this; could I take a stab at implementing it? I think the RefCell approach would work well. If I understand correctly, this would require exposing a type PgArray which holds the iterator, and then implementing Encode<Postgres> for that type (taking into account that it is an iterator, and not an actual array).

jprochazk avatar Feb 11 '22 01:02 jprochazk

My ideal would be a type that implements Encode when T: IntoIterator and Decode when T: FromIterator so you can use it in both directions.

The current Encode/Decode trait story is kind of a mess, though. I've been meaning to simplify it, but it partially exists due to some really unhelpful compiler errors when you structure the traits the logical way. @mehcode has more context there since he authored these traits.

abonander avatar Feb 11 '22 03:02 abonander

I think streaming rows already captures any use cases I can think of for Decode; can you elaborate a bit more?

jprochazk avatar Feb 11 '22 21:02 jprochazk