
[Bug]: Inserting a tuple (duplicate) with 'ON CONFLICT DO NOTHING' on a dist-hypertable returns "INSERT 0 1" while a regular hypertable returns "INSERT 0 0"

Open hardikm10 opened this issue 1 year ago • 3 comments

What type of bug is this?

Incorrect result

What subsystems and features are affected?

Data ingestion

What happened?

Inserting a duplicate tuple with 'ON CONFLICT DO NOTHING' into a distributed hypertable returns "INSERT 0 1", while the same insert into a regular hypertable returns "INSERT 0 0".

Sorry for the confusing title, but please find a reproduction example below.

TimescaleDB version affected

2.7.2

PostgreSQL version used

14.4

What operating system did you use?

Docker

What installation method did you use?

Docker

What platform did you run on?

Timescale Cloud

Relevant log output and stack trace

insert into test_mn values ('2022-04-01 00:01:00', 1, 1) ON CONFLICT DO NOTHING;
--returns:  INSERT 0 0

-- but
insert into dist_test_mn values ('2022-04-01 00:01:00', 1, 1) ON CONFLICT DO NOTHING;
--returns:  INSERT 0 1

How can we reproduce the bug?

drop table test_mn;
create table test_mn (time timestamp, id integer, value integer, primary key(time,value) );
select create_hypertable('test_mn', 'time');
insert into test_mn values ('2022-04-01 00:00:00', 1, 0);
insert into test_mn values ('2022-04-01 00:01:00', 1, 1);

insert into test_mn values ('2022-04-01 00:01:00', 1, 1) ON CONFLICT DO NOTHING;
--returns:  INSERT 0 0 

drop table dist_test_mn;
create table dist_test_mn (time timestamp, id integer, value integer, primary key(time,value) );
select create_distributed_hypertable('dist_test_mn', 'time', 'value');
insert into dist_test_mn values ('2022-04-01 00:00:00', 1, 0);
insert into dist_test_mn values ('2022-04-01 00:01:00', 1, 1);

insert into dist_test_mn values ('2022-04-01 00:01:00', 1, 1) ON CONFLICT DO NOTHING;
--returns:  INSERT 0 1

hardikm10 avatar Aug 01 '22 06:08 hardikm10

The current logic sets the number of processed tuples for the INSERT based on the input that we send to the DNs. With ON CONFLICT, a tuple may or may not actually get inserted on the DNs. We need to check the responses from the DNs and adjust the processed-tuple count accordingly.
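A minimal sketch of that adjustment, assuming the per-data-node PGresult is at hand (the helper below is hypothetical, not the actual TimescaleDB code): libpq's PQcmdTuples() reports how many rows the INSERT actually affected after conflicts, which is the number the processed-tuple count should be built from.

#include <stdlib.h>
#include <libpq-fe.h>

/* Hypothetical helper: number of rows a data node actually inserted for one
 * statement. With ON CONFLICT DO NOTHING this can be smaller than the number
 * of tuples sent, because PQcmdTuples() reflects the post-conflict count. */
static int
rows_actually_inserted(PGresult *res)
{
	/* PQcmdTuples() returns the affected-row count as a string, e.g. the
	 * "3" from "INSERT 0 3", or an empty string if no count is available. */
	const char *cmd_tuples = PQcmdTuples(res);

	return (cmd_tuples != NULL && cmd_tuples[0] != '\0') ? atoi(cmd_tuples) : 0;
}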

nikkhils avatar Sep 06 '22 08:09 nikkhils

We also need to fix RETURNING *. At the moment, empty tuples are returned. I've read the code and this will require a small redesign.

65278 avatar Sep 06 '22 10:09 65278

Sorry for the incoming wall of text.

This issue is trickier than it seems.

The problem is that, in await_all_responses, depending on the replication factor, the number of data nodes and the batch size, a single insert statement can carry tuples from both the primary tuple store and the replica tuple store at the same time.

Therefore, using the number of inserted tuples reported by the PGresult gives you a wrong number under non-trivial conditions. I've crafted an easy demonstration with what is essentially printf debugging.

1.) Patch

index 06451a4f..26303ab7 100644
--- a/tsl/src/nodes/data_node_dispatch.c
+++ b/tsl/src/nodes/data_node_dispatch.c
@@ -220,6 +220,7 @@ typedef struct DataNodeState
 										* when RETURNING is specified. */
 	PreparedStmt *pstmt;			   /* Prepared statement to use in the FLUSH state */
 	int num_tuples_sent;			   /* Number of tuples sent in the FLUSH or LAST_FLUSH states */
+	int num_tuples_primary; 		   /* Number of tuples in the primary tupstore */
 	int num_tuples_inserted;		   /* Number of tuples inserted (returned in result)
 										* during the FLUSH or LAST_FLUSH states */
 	int next_tuple;					   /* The next tuple to return in the RETURNING state */
@@ -499,6 +500,10 @@ send_batch_to_data_node(DataNodeDispatchState *sds, DataNodeState *ss)
 		stmt_params_convert_values(sds->stmt_params, slot, NULL);
 		ss->num_tuples_sent++;
 	}
+	/* Store the number of tuples inserted into the primary tupstore
+	 * This number is required to calculate the number of tuples in the
+	 * replica tupstore */
+	ss->num_tuples_primary = ss->num_tuples_sent;
 
 	if (NULL != ss->replica_tupstore)
 	{
@@ -642,6 +647,7 @@ await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)
 {
 	AsyncResponseResult *rsp;
 	List *results = NIL;
+	int num_tuples_primary = 0, num_tuples_replica = 0, num_tuples_total = 0;
 
 	sds->next_tuple = 0;
 
@@ -651,6 +657,8 @@ await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)
 		PGresult *res = async_response_result_get_pg_result(rsp);
 		ExecStatusType status = PQresultStatus(res);
 		bool report_error = true;
+		num_tuples_primary += ss->num_tuples_primary;
+		num_tuples_replica += ss->num_tuples_sent - ss->num_tuples_primary;
 
 		switch (status)
 		{
@@ -683,8 +691,26 @@ await_all_responses(DataNodeDispatchState *sds, AsyncRequestSet *reqset)
 		 * returned should greater than zero and be the same as the number of
 		 * tuples sent.  */
 		Assert(sds->stmt.do_nothing || ss->num_tuples_inserted > 0);
+
+		num_tuples_total += ss->num_tuples_inserted;
+
+		ereport(NOTICE,
+			(errmsg("inserted %d primary %d replica %d",
+				ss->num_tuples_inserted,
+				ss->num_tuples_primary,
+				ss->num_tuples_sent - ss->num_tuples_primary
+			))
+		);
+
 		ss->next_tuple = 0;
 	}
+	ereport(NOTICE,
+		(errmsg("cumulative %d primary %d replica %d",
+			num_tuples_total,
+			num_tuples_primary,
+			num_tuples_replica
+		))
+	);
 
 	return results;
 }

2.) SQL

DROP DATABASE IF EXISTS tupstore_test;
DROP DATABASE IF EXISTS node1;
DROP DATABASE IF EXISTS node2;
DROP DATABASE IF EXISTS node3;
CREATE DATABASE tupstore_test;
\c tupstore_test;
CREATE EXTENSION timescaledb;
CREATE DATABASE node1;
CREATE DATABASE node2;
CREATE DATABASE node3;

SELECT (add_data_node (name, host => 'localhost', DATABASE => name)).*
    FROM (VALUES ('node1'), ('node2'), ('node3')) v (name);

SET timescaledb.enable_distributed_insert_with_copy=false;
SET timescaledb.max_insert_batch_size=4;

DROP TABLE IF EXISTS twodim;
CREATE TABLE twodim (time timestamptz, "Color" int, temp float, PRIMARY KEY (time, "Color"));

-- Create a table with 3 chunks and replication, so batching will send
-- tuples to different nodes in one insert
SELECT * FROM create_hypertable('twodim', 'time', 'Color', 3, replication_factor => 2, data_nodes => ARRAY['node1','node2','node3']);

-- Insert rows which distribute over all chunks with one conflict
 INSERT INTO twodim VALUES
       ('2017-02-01 06:01', 1, 1.1),
       ('2017-02-01 08:01', 1, 1.2),
       ('2018-02-02 08:01', 2, 1.3),
       ('2019-02-01 09:11', 3, 2.1),
       ('2019-02-02 09:11', 3, 2.1),
       ('2019-02-02 10:01', 5, 1.2),
       ('2019-02-03 11:11', 6, 3.5),
       ('2019-02-04 08:21', 4, 6.6),
       ('2019-02-04 10:11', 7, 7.4),
       ('2019-02-04 12:11', 8, 2.1),
       ('2019-02-05 13:31', 8, 6.3),
       ('2019-02-06 02:11', 5, 1.8),
       ('2019-02-06 01:13', 7, 7.9),
       ('2019-02-06 19:24', 9, 5.9),
       ('2019-02-07 18:44', 5, 9.7),
       ('2019-02-07 18:44', 5, 9.7),
       ('2019-02-07 09:33', 7, 9.5),
       ('2019-02-08 08:54', 1, 7.3),
       ('2019-02-08 18:14', 4, 8.2),
       ('2019-02-09 19:23', 8, 9.1)
 ON CONFLICT DO NOTHING
 RETURNING tableoid = 'twodim'::regclass AS is_tableoid, time, temp, "Color";

3.) Output

NOTICE:  inserted 4 primary 2 replica 2
NOTICE:  cumulative 4 primary 2 replica 2
NOTICE:  inserted 4 primary 2 replica 2
NOTICE:  inserted 4 primary 2 replica 2
NOTICE:  cumulative 8 primary 4 replica 4
NOTICE:  inserted 4 primary 2 replica 2
NOTICE:  cumulative 4 primary 2 replica 2
NOTICE:  inserted 4 primary 0 replica 4
NOTICE:  cumulative 4 primary 0 replica 4
NOTICE:  inserted 4 primary 3 replica 1
NOTICE:  cumulative 4 primary 3 replica 1
NOTICE:  inserted 4 primary 3 replica 1
NOTICE:  cumulative 4 primary 3 replica 1
NOTICE:  inserted 3 primary 1 replica 3
NOTICE:  cumulative 3 primary 1 replica 3
NOTICE:  inserted 3 primary 3 replica 1
NOTICE:  cumulative 3 primary 3 replica 1
NOTICE:  inserted 1 primary 0 replica 1
NOTICE:  inserted 1 primary 0 replica 1
NOTICE:  inserted 2 primary 2 replica 0
NOTICE:  cumulative 4 primary 2 replica 2
 is_tableoid |          time          | temp | Color 
-------------+------------------------+------+-------
 t           | 2017-02-01 06:01:00+01 |  1.1 |     1
 t           | 2017-02-01 08:01:00+01 |  1.2 |     1
 t           | 2018-02-02 08:01:00+01 |  1.3 |     2
 t           | 2019-02-02 10:01:00+01 |  1.2 |     5
 t           | 2019-02-01 09:11:00+01 |  2.1 |     3
 t           | 2019-02-02 09:11:00+01 |  2.1 |     3
 t           | 2019-02-04 08:21:00+01 |  6.6 |     4
 t           | 2019-02-04 10:11:00+01 |  7.4 |     7
 t           | 2019-02-03 11:11:00+01 |  3.5 |     6
 t           | 2019-02-04 12:11:00+01 |  2.1 |     8
 t           | 2019-02-05 13:31:00+01 |  6.3 |     8
 t           | 2019-02-06 02:11:00+01 |  1.8 |     5
 t           | 2019-02-06 01:13:00+01 |  7.9 |     7
 t           | 2019-02-07 18:44:00+01 |  9.7 |     5
 t           | 2019-02-06 19:24:00+01 |  5.9 |     9
 t           | 2019-02-07 09:33:00+01 |  9.5 |     7
 t           | 2019-02-08 18:14:00+01 |  8.2 |     4
 t           | 2019-02-08 08:54:00+01 |  7.3 |     1
 t           | 2019-02-08 08:54:00+01 |  7.3 |     1
 t           | 2019-02-09 19:23:00+01 |  9.1 |     8
(20 rows)

INSERT 0 20

4.) Discussion

As the notices show, the generated batches are split between the primary tuple store and the replica tuple store. It is therefore impossible to get the desired count straight from the PGresult.

One could argue that this turns into a combinatorics problem, since all primary and replica inserts must add up to the same count. That is true, but it doesn't solve RETURNING *.

The state machine is structured so that RETURNING tuples are processed after each finished batch. How many rows need to be returned is therefore a per-batch decision, and that knowledge is not available ahead of time.

Consider:

NOTICE:  inserted 3 primary 1 replica 3
NOTICE:  cumulative 3 primary 1 replica 3

Batch done; we have to decide how many rows to return now. The cumulative count is 3: four tuples were sent (1 primary + 3 replica) but only 3 were inserted, so exactly one tuple conflicted. Did the conflict happen on the primary or on the replica? That question cannot be answered from the PGresult alone, so it is non-trivial to decide how many rows to hand to handle_returning at this point.

NOTICE:  inserted 3 primary 3 replica 1
NOTICE:  cumulative 3 primary 3 replica 1

The sibling of the batch above. Same problem applies.

5.) Corollary

Notice how the order of the rows returned is wrong (let's disregard the wrong count for now). Insert statement:

       ('2019-02-06 19:24', 9, 5.9),
       ('2019-02-07 18:44', 5, 9.7),
       ('2019-02-07 18:44', 5, 9.7),
       ('2019-02-07 09:33', 7, 9.5),

Output:

 t           | 2019-02-07 18:44:00+01 |  9.7 |     5
 t           | 2019-02-06 19:24:00+01 |  5.9 |     9
 t           | 2019-02-07 09:33:00+01 |  9.5 |     7
 t           | 2019-02-08 18:14:00+01 |  8.2 |     4

6.) Possible solutions

I'll quote data_node_dispatch.c:

 119  *
 120  * - Tuples from both the primary and the replica tuple store are flushed with
 121  *   a RETURNING clause when such a clause is available. However, tuples from
 122  *   the replica store need not be returned, so using a separate prepared
 123  *   statement without RETURNING for the replica store would save bandwidth.

If the replica store had a separate statement, we would know which rows conflicted and where, and the problem would go away.
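A rough sketch of what the quoted comment suggests, reusing the twodim table from above (the statement names and the one-row parameter shape are illustrative only, not the prepared statements TimescaleDB actually generates):

-- Primary-store batch: keeps RETURNING; its affected-row count is the one
-- that should drive the client-visible "INSERT 0 n".
PREPARE twodim_primary_ins (timestamptz, int, float) AS
  INSERT INTO twodim ("time", "Color", temp) VALUES ($1, $2, $3)
  ON CONFLICT DO NOTHING
  RETURNING "time", "Color", temp;

-- Replica-store batch: no RETURNING, since replica tuples never need to be
-- returned; this also saves bandwidth, as the quoted comment notes.
PREPARE twodim_replica_ins (timestamptz, int, float) AS
  INSERT INTO twodim ("time", "Color", temp) VALUES ($1, $2, $3)
  ON CONFLICT DO NOTHING;

With two statements per data node, every PGresult unambiguously belongs to either the primary or the replica store, so conflicts can be attributed correctly for both the row count and RETURNING.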

65278 avatar Sep 14 '22 22:09 65278

One further update on this: if we implement two separate insert statements for the primary and replica stores, we run into a further problem: both statements would still share one DataNodeState, so while awaiting responses in await_all_responses it would be impossible to know whether a given statement went to the primary or the replica store. This could be worked around by attaching a per-request data structure via async_request_attach_user_data, but that is the wrong way to go:

  • The code of send_batch_to_data_node becomes very hard to follow, as it needs to branch everywhere.
  • It breaks the batch size (which is bad in itself), and it adds even more branching to the code.

The saner way to go about this is to only ever send requests that contain tuples for either the primary or the replica store, never both. For this to work, handle_read needs to be retooled; see the sketch below.
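To make the attribution problem concrete, here is a rough sketch (types and names are hypothetical, not the actual TimescaleDB internals) of the kind of per-request tag the rejected workaround would have to attach, e.g. via async_request_attach_user_data, so that await_all_responses could tell which store a response belongs to:

/* Hypothetical per-request tag; illustrative only. */
typedef enum RequestTarget
{
	REQUEST_TARGET_PRIMARY, /* counts toward "INSERT 0 n" and RETURNING */
	REQUEST_TARGET_REPLICA  /* only checked for errors, never returned  */
} RequestTarget;

typedef struct RequestTag
{
	RequestTarget target;   /* which tuple store this batch came from    */
	int num_tuples_sent;    /* tuples carried by this particular request */
} RequestTag;

If requests only ever carry tuples for a single store, the tag becomes unnecessary: a whole request, and therefore a whole PGresult, maps to exactly one store.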

65278 avatar Oct 12 '22 14:10 65278

In 2.13.0 we announced the deprecation of multi-node, and it will be completely removed in the upcoming 2.14.0.

fabriziomello avatar Jan 30 '24 18:01 fabriziomello