
OUTPUT - Multi SQL insert from args_mapping

Open zwolsman opened this issue 2 years ago • 5 comments

Right now the SQL insert output only supports one item per statement. I would like an option that accepts an array of items from args_mapping, which can then be expanded in the following manner:

INSERT INTO table(a, b, c) VALUES (j, k, l), (x, y, z);

This way it is done in one statement instead of multiple INSERTs.

zwolsman avatar Oct 05 '22 12:10 zwolsman

It does support batching, though. See the code here. So if you send a batch of messages to this output, it should run a single INSERT statement. Please let me know if you need more details.
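For reference, a minimal sketch of what batching into this output looks like (the driver, DSN, table, and column names are all placeholders of mine, not from your setup):

```yaml
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:pass@localhost:5432/db?sslmode=disable  # placeholder DSN
    table: foo
    columns: [ a, b, c ]
    args_mapping: root = [ this.a, this.b, this.c ]
    batching:
      count: 100   # each batch of up to 100 messages becomes one INSERT
      period: 1s
```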

mihaitodor avatar Oct 06 '22 00:10 mihaitodor

Yeah, batching is an option, but it doesn't come with the guarantees I would like to have.

We have a message coming in from Pub/Sub with an order and its order items embedded in it. We would like the guarantee that all order items are inserted before acking, so a single INSERT with multiple VALUES would help here.

With batching, the items could be split across multiple batches, and that is not what we want.

The split between order and order items is done by a fan-out broker, which guarantees that all of the outputs have to succeed before the message is acked.

order -> order             -> INSERT INTO table_name (x, y, z, ...) VALUES (a, b, c, ...)
           \-> order items -> INSERT INTO table_name (x, y, z, ...) VALUES (a1, b1, c1, ...), (a2, b2, c2, ...), ...
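A rough config sketch of that topology as it stands today (the table names, DSN, and the item-explosion mapping are assumptions on my part, and each item still becomes its own INSERT, which is exactly the limitation this issue is about):

```yaml
output:
  broker:
    pattern: fan_out   # the message is acked only after every output succeeds
    outputs:
      - sql_insert:
          driver: postgres
          dsn: postgres://user:pass@localhost:5432/db?sslmode=disable  # placeholder
          table: orders
          columns: [ id ]
          args_mapping: root = [ this.order.id ]
      - processors:
          # explode the embedded items, carrying the order id along
          - mapping: root = this.items.map_each(i -> i.merge({"order_id": this.order.id}))
          - unarchive:
              format: json_array
        sql_insert:
          driver: postgres
          dsn: postgres://user:pass@localhost:5432/db?sslmode=disable  # placeholder
          table: order_items
          columns: [ id, order_id ]
          args_mapping: root = [ this.id, this.order_id ]
```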

zwolsman avatar Oct 14 '22 08:10 zwolsman

OK, I think I follow. I'll mark it as an enhancement request.

As a workaround, would the sql_raw output help you? You can set unsafe_dynamic_query to true which will allow you to construct the query dynamically using interpolation functions if that's needed.

mihaitodor avatar Oct 15 '22 12:10 mihaitodor

@mihaitodor Could you leave an example of such an implementation here?

intech avatar Nov 23 '22 10:11 intech

Sure! I guess you can start with something like this:

input:
  generate:
    mapping: |
      root.type = if count("type") % 3 == 1 { "order" } else { "item" }
      root.id = uuid_v4()
    count: 9
    interval: 0s
    batch_size: 3
  processors:
    - archive:
        format: json_array
    - mapping: |
        root.order = this.filter(item -> item.type == "order").index(0)
        root.items = this.filter(item -> item.type == "item")

output:
  sql_raw:
    driver: sqlite
    dsn: file:./foobar.db
    query: |
      BEGIN TRANSACTION;
      INSERT INTO orders (id) VALUES (?);
      INSERT INTO order_items (id, order_id) VALUES ${! json("items").map_each(i -> "(?, ?)").join(",") };
      COMMIT;
    unsafe_dynamic_query: true
    args_mapping: |
      # each pair must match the (id, order_id) column order in the query
      root = [ this.order.id ].append(this.items.map_each(i -> [i.id, this.order.id]).flatten()).flatten()
    conn_max_idle: 1
    conn_max_open: 1
    init_statement: |
      CREATE TABLE IF NOT EXISTS orders (
        id varchar
      );
      CREATE TABLE IF NOT EXISTS order_items (
        id varchar,
        order_id varchar
      );

I'm making some possibly undesired assumptions in there:

  • Each batch contains exactly one order and multiple items
  • An order is inserted together with its items in a single transaction
  • The two tables only have the columns shown (id, plus order_id on order_items), but you can change that by adjusting the query and args_mapping fields
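To make the interplay between the dynamic query and args_mapping more concrete, here is a plain-Python sketch of the same expansion logic (names are illustrative only, and the pair order follows the (id, order_id) column list in the query; this is not Benthos code):

```python
# Sketch of the placeholder expansion and argument flattening that the
# query/args_mapping pair performs. Illustration only, not Benthos code.

def build_insert(order, items):
    """Return (query, args): one row for the order, then one
    (item_id, order_id) pair per item in a single multi-row INSERT."""
    # Equivalent of: json("items").map_each(i -> "(?, ?)").join(",")
    placeholders = ",".join("(?, ?)" for _ in items)
    query = (
        "BEGIN TRANSACTION;\n"
        "INSERT INTO orders (id) VALUES (?);\n"
        f"INSERT INTO order_items (id, order_id) VALUES {placeholders};\n"
        "COMMIT;"
    )
    # Equivalent of the args_mapping: the order id first, then a flattened
    # (item_id, order_id) pair per item, matching the column order above.
    args = [order["id"]]
    for item in items:
        args.extend([item["id"], order["id"]])
    return query, args

q, args = build_insert({"id": "o1"}, [{"id": "i1"}, {"id": "i2"}])
# args is ["o1", "i1", "o1", "i2", "o1"], and q contains "(?, ?),(?, ?)"
```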

mihaitodor avatar Nov 26 '22 16:11 mihaitodor

Hi, I am trying to do the same thing with PostgreSQL, but I am getting the following error:

ERRO Failed to send message to sql_raw: pq: cannot insert multiple commands into a prepared statement  @service=benthos label="" path=root.output

Here is the output configuration:

  sql_raw:
    driver: postgres
    dsn: postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_SCHEMA}?sslmode=disable
    max_in_flight: 6
    query: |
      BEGIN;
      INSERT INTO first_table (id, message_body) VALUES ($1, $2);
      INSERT INTO second_table
      (id, message_body)
      VALUES
      ($1, $2)
      ON CONFLICT (id) DO UPDATE SET message_body = $2;
      COMMIT;
    unsafe_dynamic_query: true
    args_mapping: |
      root = [
        meta("kafka_key"),
        content()
      ]

I am using version 4.11.0. What am I doing wrong here? Any help is appreciated!

Thanks in advance!

renjithgr avatar Jan 05 '23 15:01 renjithgr

@renjithgr This seems to be a Postgres-specific issue and I see some potential workarounds documented in this StackOverflow thread: https://stackoverflow.com/questions/57221749/error-cannot-insert-multiple-commands-into-a-prepared-statement. Please try those suggestions and see if any of them help.

mihaitodor avatar Jan 10 '23 22:01 mihaitodor