OUTPUT - Multi SQL insert from args_mapping
Right now the SQL insert output only supports 1 item. I would like to have an option that accepts an array of items from the args_mapping
that can then be expanded in the following manner.
INSERT INTO table(a, b, c) VALUES (j, k, l), (x, y, z);
This way it will be done in 1 statement instead of multiple INSERTS.
It does support batching, though. See the code here. So if you send a batch of messages to this output, it should run a single INSERT
statement. Please let me know if you need more details.
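For example (just a rough sketch, with made-up connection details, table and column names), a batching policy on the sql_insert output would look something like this:
output:
  sql_insert:
    driver: postgres
    dsn: postgres://user:password@localhost:5432/example_db?sslmode=disable
    table: order_items
    columns: [ id, order_id ]
    args_mapping: root = [ this.id, this.order_id ]
    batching:
      # flush a batch once 100 messages arrive or 1s passes, whichever is first
      count: 100
      period: 1s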
Yeah, batching is an option, but it doesn't come with the guarantees I would like to have.
We have a message coming in from pubsub with an order & order items embedded in it. We would like the guarantee that all order items are inserted before the message is acked, so a single INSERT
with multiple VALUES
would help here.
With the batching it could be split between multiple batches and that is not what we want.
The split between order & order items is done by a fan-out broker which guarantees all of the outputs have to be successful before acking the message.
order -> order       -> INSERT INTO table_name (x, y, z, ...) VALUES (a, b, c, ...)
      \-> order items -> INSERT INTO table_name (x, y, z, ...) VALUES (a1, b1, c1, ...), (a2, b2, c2, ...), ...
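Roughly what I have in mind as a config (table and column names are just placeholders; the second output is where the multi-VALUES INSERT would be needed):
output:
  broker:
    pattern: fan_out
    outputs:
      - sql_insert:
          driver: postgres
          dsn: postgres://user:password@localhost:5432/example_db?sslmode=disable
          table: orders
          columns: [ id ]
          args_mapping: root = [ this.order.id ]
      - sql_insert:
          driver: postgres
          dsn: postgres://user:password@localhost:5432/example_db?sslmode=disable
          table: order_items
          columns: [ id, order_id ]
          # today this only binds a single row; expanding an array from
          # args_mapping into multiple VALUES tuples is what's being requested
          args_mapping: root = [ this.items.index(0).id, this.order.id ]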
OK, I think I follow. I'll mark it as an enhancement request.
As a workaround, would the sql_raw output help you? You can set unsafe_dynamic_query to true, which will allow you to construct the query dynamically using interpolation functions if that's needed.
@mihaitodor Could you leave an example of such an implementation here?
Sure! I guess you can start with something like this:
input:
  generate:
    mapping: |
      root.type = if count("type") % 3 == 1 { "order" } else { "item" }
      root.id = uuid_v4()
    count: 9
    interval: 0s
    batch_size: 3
  processors:
    - archive:
        format: json_array
    - mapping: |
        root.order = this.filter(item -> item.type == "order").index(0)
        root.items = this.filter(item -> item.type == "item")

output:
  sql_raw:
    driver: sqlite
    dsn: file:./foobar.db
    query: |
      BEGIN TRANSACTION;
      INSERT INTO orders (id) VALUES (?);
      INSERT INTO order_items (id, order_id) VALUES ${! json("items").map_each(i -> "(?, ?)").join(",") };
      COMMIT;
    unsafe_dynamic_query: true
    args_mapping: |
      root = [ this.order.id ].append(this.items.map_each(i -> [i.id, this.order.id]).flatten()).flatten()
    conn_max_idle: 1
    conn_max_open: 1
    init_statement: |
      CREATE TABLE IF NOT EXISTS orders (
        id varchar
      );
      CREATE TABLE IF NOT EXISTS order_items (
        id varchar,
        order_id varchar
      );
I'm making some possibly undesired assumptions in there:
- Each batch contains exactly one order and multiple items
- An order is always inserted together with its items in a single transaction
- There's just a single column called id in each of the two tables, but you can change that by adjusting the query and args_mapping fields (see the sketch below)
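For example, if order_items also had a quantity column (hypothetical), the relevant fields could become something like:
    query: |
      BEGIN TRANSACTION;
      INSERT INTO orders (id) VALUES (?);
      INSERT INTO order_items (id, order_id, quantity) VALUES ${! json("items").map_each(i -> "(?, ?, ?)").join(",") };
      COMMIT;
    args_mapping: |
      root = [ this.order.id ].append(this.items.map_each(i -> [i.id, this.order.id, i.quantity]).flatten()).flatten()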
Hi, I am trying to do the same thing with PostgreSQL, but I am getting the following error.
ERRO Failed to send message to sql_raw: pq: cannot insert multiple commands into a prepared statement @service=benthos label="" path=root.output
Here is the output configuration.
sql_raw:
  driver: postgres
  dsn: postgres://${DB_USER}:${DB_PASSWORD}@${DB_HOST}:5432/${DB_SCHEMA}?sslmode=disable
  max_in_flight: 6
  query: |
    BEGIN;
    INSERT INTO first_table (id, message_body) VALUES ($1, $2);
    INSERT INTO second_table
      (id, message_body)
    VALUES
      ($1, $2)
    ON CONFLICT (id) DO UPDATE SET message_body = $2;
    COMMIT;
  unsafe_dynamic_query: true
  args_mapping: |
    root = [
      meta("kafka_key"),
      content()
    ]
I am using version 4.11.0. What am I doing wrong here? Any help is appreciated!
Thanks in advance!
@renjithgr This seems to be a Postgres-specific issue and I see some potential workarounds documented in this StackOverflow thread: https://stackoverflow.com/questions/57221749/error-cannot-insert-multiple-commands-into-a-prepared-statement. Please try those suggestions and see if any of them help.
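For instance, one way to sidestep that restriction (an untested sketch based on your config) is to collapse both commands into a single statement using a data-modifying CTE. Since a single statement is already atomic, the explicit BEGIN/COMMIT shouldn't be needed, and unsafe_dynamic_query can likely stay off:
  query: |
    WITH first_insert AS (
      INSERT INTO first_table (id, message_body) VALUES ($1, $2)
    )
    INSERT INTO second_table (id, message_body)
    VALUES ($1, $2)
    ON CONFLICT (id) DO UPDATE SET message_body = EXCLUDED.message_body;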