Row filter is not applied during initial sync
Logical replication with pglogical is configured between two databases (db1 and db2) on the same PostgreSQL instance.
Issue: The row filter is not applied during the initial sync when replicating data from the publisher table to the subscriber table.
Replication Setup
Publisher Side:
- device_bug_report is a partitioned table in db1 (database) that serves as the publisher table.
- This table is already loaded with data and continuously receives DML operations.
Query
CREATE TABLE public.device_bug_report (
device_id integer NOT NULL,
cust_id integer NOT NULL,
prod_id integer NOT NULL,
manufactured_on timestamp without time zone NOT NULL,
sold_on timestamp without time zone NOT NULL,
bug text NOT NULL,
report_time timestamp without time zone NOT NULL
) PARTITION BY RANGE (manufactured_on);
CREATE TABLE public.device_bug_report_1 PARTITION OF public.device_bug_report
FOR VALUES FROM ('2020-01-01 00:00:00') TO ('2020-04-01 00:00:00');
CREATE TABLE public.device_bug_report_2 PARTITION OF public.device_bug_report
FOR VALUES FROM ('2020-04-01 00:00:00') TO ('2020-07-01 00:00:00');
CREATE TABLE public.device_bug_report_3 PARTITION OF public.device_bug_report
FOR VALUES FROM ('2020-07-01 00:00:00') TO ('2020-10-01 00:00:00');
CREATE TABLE public.device_bug_report_4 PARTITION OF public.device_bug_report
FOR VALUES FROM ('2020-10-01 00:00:00') TO ('2021-01-01 00:00:00');
ALTER TABLE public.device_bug_report ADD CONSTRAINT device_bug_report_pkey
PRIMARY KEY (device_id, manufactured_on, sold_on, report_time);
Table Structure
Table "public.device_bug_report"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
-----------------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
device_id | integer | | not null | | plain | |
cust_id | integer | | not null | | plain | |
prod_id | integer | | not null | | plain | |
manufactured_on | timestamp without time zone | | not null | | plain | |
sold_on | timestamp without time zone | | not null | | plain | |
bug | text | | not null | | extended | |
report_time | timestamp without time zone | | not null | | plain | |
Partition key: RANGE (manufactured_on)
Indexes:
"device_bug_report_pkey" PRIMARY KEY, btree (device_id, manufactured_on, sold_on, report_time)
Partitions: device_bug_report_1 FOR VALUES FROM ('2020-01-01 00:00:00') TO ('2020-04-01 00:00:00'),
device_bug_report_2 FOR VALUES FROM ('2020-04-01 00:00:00') TO ('2020-07-01 00:00:00'),
device_bug_report_3 FOR VALUES FROM ('2020-07-01 00:00:00') TO ('2020-10-01 00:00:00'),
device_bug_report_4 FOR VALUES FROM ('2020-10-01 00:00:00') TO ('2021-01-01 00:00:00')
Creating Replication Set
SELECT pglogical.create_replication_set(
'device_filtered_pub',
replicate_insert := true,
replicate_update := true,
replicate_delete := true,
replicate_truncate := true
);
Adding Partitions to Replication Set with Row Filter
- Each partition is added to the replication set separately, with a row filter so that only specific device_id values are replicated. As a result, replication occurs directly from each publisher partition to the corresponding subscriber partition.
SELECT pglogical.replication_set_add_table(
set_name := 'device_filtered_pub',
relation := 'public.device_bug_report_1',
row_filter := $$ device_id >= 1 AND device_id <= 400 $$
);
SELECT pglogical.replication_set_add_table(
set_name := 'device_filtered_pub',
relation := 'public.device_bug_report_2',
row_filter := $$ device_id >= 1 AND device_id <= 400 $$
);
SELECT pglogical.replication_set_add_table(
set_name := 'device_filtered_pub',
relation := 'public.device_bug_report_3',
row_filter := $$ device_id >= 1 AND device_id <= 400 $$
);
SELECT pglogical.replication_set_add_table(
set_name := 'device_filtered_pub',
relation := 'public.device_bug_report_4',
row_filter := $$ device_id >= 1 AND device_id <= 400 $$
);
Subscriber Side:
- The subscriber table (device_bug_report) in db2 has the same structure as the publisher table in db1.
- The subscriber table is intended to receive only the rows that pass the row filter.
- Each partition of the subscriber table receives filtered data directly from the corresponding partition of the publisher table.
Table Structure
Table "public.device_bug_report"
Column | Type | Collation | Nullable | Default | Storage | Stats target | Description
-----------------+-----------------------------+-----------+----------+---------+----------+--------------+-------------
device_id | integer | | not null | | plain | |
cust_id | integer | | not null | | plain | |
prod_id | integer | | not null | | plain | |
manufactured_on | timestamp without time zone | | not null | | plain | |
sold_on | timestamp without time zone | | not null | | plain | |
bug | text | | not null | | extended | |
report_time | timestamp without time zone | | not null | | plain | |
Partition key: RANGE (manufactured_on)
Indexes:
"device_bug_report_pkey" PRIMARY KEY, btree (device_id, manufactured_on, sold_on, report_time)
Partitions: device_bug_report_1 FOR VALUES FROM ('2020-01-01 00:00:00') TO ('2020-04-01 00:00:00'),
device_bug_report_2 FOR VALUES FROM ('2020-04-01 00:00:00') TO ('2020-07-01 00:00:00'),
device_bug_report_3 FOR VALUES FROM ('2020-07-01 00:00:00') TO ('2020-10-01 00:00:00'),
device_bug_report_4 FOR VALUES FROM ('2020-10-01 00:00:00') TO ('2021-01-01 00:00:00')
Creating the Subscription
- The subscriber (db2) creates a subscription to the replication set (device_filtered_pub) on the publisher (db1).
SELECT pglogical.create_subscription(
subscription_name := 'device_filtered_sub',
provider_dsn := 'dbname=db1 host=localhost port=5432 user=admin',
replication_sets := '{"device_filtered_pub"}'
);
Version Details:
- PostgreSQL Version: 11.4
- pglogical Version: 2.2.1
Issue Observed
- After creating the subscription, the initial sync completed successfully and new rows were being replicated correctly with the row filter.
- However, an issue was observed in one partition (device_bug_report_3) of the subscriber table:
  - Running COUNT(*) on the partition with the row filter (device_id >= 1 AND device_id <= 400) and without it returned different row counts.
Query
-- Total rows count present in that partition
SELECT COUNT(*) FROM device_bug_report_3;
count
-------
216
-- Filtered rows count present in that partition
SELECT COUNT(*) FROM device_bug_report_3 WHERE device_id >= 1 AND device_id <= 400;
count
-------
96
Analysis of Logs and Table Data
- Based on the results of the SELECT queries, the logs were analyzed. Real-time DML performed on the publisher table outside the row filter range was not replicated to the subscriber table, so ongoing (streaming) replication works correctly.
- This leads to the conclusion that the 120 excess rows (216 - 96) must have come from the initial sync, during which all rows of the corresponding publisher partition (device_bug_report_3) were copied without applying the row filter.
Example
- The device_bug_report_3 partition of the device_bug_report table (publisher table) in db1 holds device_id values from 1 to 1000.
- The goal was to replicate only the range 1 to 400 to the device_bug_report_3 partition of the device_bug_report table (subscriber table) in db2 using the row filter.
- However, all existing rows in the range 1 to 1000 were copied from the publisher to the subscriber without applying the row filter.
- Meanwhile, ongoing DML operations for the range 401 to 1000 were correctly filtered out and not replicated to the subscriber.
This issue does not recur (it happens rarely, if at all) and has occurred for only this one partition; the other partitions are replicated correctly.
A bug was observed in the cache invalidation callback repset_relcache_invalidate_callback(), which sets row_filter to NIL while the initial COPY command is executing on the publisher node. As a result, all data from the publisher table is copied to the subscriber table without applying the row filter, even though one is defined for that table.
Problem:
- After the subscription is created, a COPY command is executed to fetch row-filtered data from the publisher table device_bug_report_3 as part of the initial sync. Internally, this uses pglogical_table_data_filtered() to apply the row filter to the table rows:
-- COPY command executed for the initial sync
COPY (
SELECT "device_id","cust_id","prod_id","manufactured_on","sold_on","bug","report_time"
FROM pglogical.table_data_filtered(NULL::"public"."device_bug_report_3", '"public"."device_bug_report_3"'::regclass, ARRAY['device_filtered_pub'])
) TO STDOUT;
- In pglogical_table_data_filtered():
  - At line 2150, get_table_replication_info() registers repset_relcache_invalidate_callback(), retrieves the RepSetTableHash entry for the current relation (including its row_filter field), and stores it in tableinfo, so tableinfo refers to the cached hash entry itself.
  - At line 2154, when the program reaches the executor preparation functions (create_estate_for_relation(), prepare_per_tuple_econtext()), one of these functions triggers a cache invalidation for the current relation. This sets the RepSetTableHash entry's isvalid to false and frees its row_filter, setting it to NIL. Note: the cache invalidation happens here only rarely, not every time.
  - At line 2158, when tableinfo->row_filter is accessed, it is NIL because of the earlier cache invalidation, so no row filter expression is prepared. As a result, no filter is applied and all rows pass.
File: pglogical_functions.c
2055: Datum
2056: pglogical_table_data_filtered(PG_FUNCTION_ARGS)
2057: {
...
2150: tableinfo = get_table_replication_info(node->node->id, rel,
2151: replication_sets);
2152:
2153: /* Prepare executor. */
2154: estate = create_estate_for_relation(rel, false);
2155: econtext = prepare_per_tuple_econtext(estate, reltupdesc);
2156:
2157: /* Prepare the row filter expression. */
2158: foreach (lc, tableinfo->row_filter)
2159: {
2160: Node *row_filter = (Node *) lfirst(lc);
2161: ExprState *exprstate = pglogical_prepare_row_filter(row_filter);
2162:
2163: row_filter_list = lappend(row_filter_list, exprstate);
2164: }
...
}
- In repset_relcache_invalidate_callback(), at lines 212 and 224, the program sets the isvalid field of the RepSetTableHash entry to false and then goes on to free the entry's att_list and row_filter fields and set them to NULL and NIL. This should not happen; the callback should only set isvalid to false.
File: pglogical_repset.c
196: static void
197: repset_relcache_invalidate_callback(Datum arg, Oid reloid)
198: {
...
210: while ((entry = hash_seq_search(&status)) != NULL)
211: {
212: entry->isvalid = false;
213: if (entry->att_list)
214: pfree(entry->att_list);
215: entry->att_list = NULL;
216: if (list_length(entry->row_filter))
217: list_free_deep(entry->row_filter);
218: entry->row_filter = NIL;
219: }
220: }
221: else if ((entry = hash_search(RepSetTableHash, &reloid,
222: HASH_FIND, NULL)) != NULL)
223: {
224: entry->isvalid = false;
225: if (entry->att_list)
226: pfree(entry->att_list);
227: entry->att_list = NULL;
228: if (list_length(entry->row_filter))
229: list_free_deep(entry->row_filter);
230: entry->row_filter = NIL;
231: }
232: }