Add non-blocking variant of create_distributed_table
DESCRIPTION: Add create_distributed_table_concurrently which distributes tables without blocking
This is a prototype of create_distributed_table_concurrently built on top of #6080.
We first convert the table to a Citus local table in a subtransaction, such that it has a single shard. We then (tentatively) update the metadata to convert it to a distributed table and perform a shard split.
Due to the (not yet committed) metadata update, the replication slot creation needs to be pulled forward, because it would otherwise block waiting for the write to finish, and the UDFs and decoder need to be more independent of the metadata.
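As a usage sketch (the table and column names are illustrative, and the optional arguments shown are assumed to mirror the regular `create_distributed_table` signature):

```sql
-- Hedged sketch: distribute an existing table without blocking writes.
-- 'events' and 'tenant_id' are made-up example names.
CREATE TABLE events (tenant_id int, payload jsonb);

SELECT create_distributed_table_concurrently(
    'events',     -- table to distribute
    'tenant_id',  -- distribution column
    colocate_with := 'default'  -- assumed default, as in create_distributed_table
);
```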
TODO:
- [ ] Add tests
- [ ] Make colocate_with := 'none' behaviour consistent with regular create_distributed_table; it currently always generates a pg_dist_colocation entry. That is actually behaviour we've wanted before, so it might be nice to revert https://github.com/citusdata/citus/pull/5951.
- [ ] Ensure we are free of deadlocks when doing citus_add_local_table_to_metadata
- [ ] Better error message when the table is referenced by a foreign key
Open questions / potential improvements:
- [ ] Can we do more error checking before the Citus local table conversion (without triggering deadlocks)?
- [ ] Can we do the split without Citus local table conversion?
- [ ] Can we improve the code structure?
Codecov Report
Merging #6087 (b77ecb0) into main (be06d65) will increase coverage by 0.01%. The diff coverage is 95.29%.

```diff
@@            Coverage Diff             @@
##             main    #6087      +/-   ##
==========================================
+ Coverage   92.95%   92.96%   +0.01%
==========================================
  Files         253      254       +1
  Lines       52917    53166     +249
==========================================
+ Hits        49188    49426     +238
- Misses       3729     3740      +11
```
- Added a global lock to prevent concurrent nonblocking shard split operations, because create_distributed_table_concurrently shares memory and replication slots with nonblocking split ops,
- Refactored the SplitShard method to accept the distribution column, the colocated shard interval list, and the colocation id, because we need those during create_distributed_table_concurrently,
- Added the table's partitions' shard intervals to the colocated shard interval list if we are executing a create_distributed_table operation,
- Added isolation tests and failure tests.
- Create colocation entry with colocate_with => 'none', #6227,
- Do not create colocation entry for reference tables in EnsureReferenceTablesOnAllNodes, #6224,
Per separate discussion, we should refactor the lock that handles the colocate_with := 'default' case, where two concurrent create_distributed_table calls try to create the same co-location group. As of #6227 we started blocking colocate_with := 'none' creations as well, since colocate_with := 'none' now results in an insert into pg_dist_colocation under a RowExclusiveLock (see InsertColocationGroupLocally), which conflicts with the current ExclusiveLock.
The goal of the lock is to serialize two create_distributed_table calls when one of them creates a co-location group, so that the other caller gets the same co-location group if it has the same properties (such as shard count), which is what would happen if they ran serially. However, we do not need to block colocate_with := 'none' or other calls to achieve that, since those calls are independent both from each other and from callers that use the default group.
In create_distributed_table_concurrently we also need to make sure we do not block concurrent create_distributed_table calls and vice-versa, since that could be disruptive.
In particular, we should:
- Only take a lock for the `colocate_with := 'default'` case, to not block unrelated `colocate_with := 'none'` or `colocate_with := '<table>'` commands that have no such concurrency issues.
- Use an advisory lock, because we do not really need to block the whole `pg_dist_colocation` table. We could also use `ShareUpdateExclusiveLock` on the table to avoid blocking all writes, but decided an advisory lock is more semantically explicit, since we're not truly concerned with other operations on `pg_dist_colocation` like DDL.
- Take the `colocate_with := 'default'` lock in the `create_distributed_table_concurrently` case before reading from `pg_dist_colocation`, and hold it if we will create a default co-location group during `NonBlockingShardSplit`, to serialize concurrent `create_distributed_table` calls that use the default co-location group.
- Keep the `RowExclusiveLock` on `pg_dist_colocation` in `InsertColocationGroupLocally` and `DeleteColocationGroupLocally` beyond the end of the function by passing `NoLock` to `table_close`, for general transactional correctness.
- Consider adding a global lock at the start of `CreateDistributedTableConcurrently` to avoid concurrent `create_distributed_table_concurrently` calls entering the first few stages.
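The advisory-lock idea above can be sketched at the SQL level. Note this is only an illustration of the locking pattern: the actual implementation would use Citus' internal C lock APIs and lock tags, and the key below is a made-up placeholder.

```sql
-- Hedged sketch: serialize only the callers that may create the default
-- co-location group, using a transaction-scoped advisory lock.
-- The key 12345 is a placeholder, not the real Citus lock tag.
BEGIN;
SELECT pg_advisory_xact_lock(12345);  -- held until commit/abort

-- ... read pg_dist_colocation and create the default group if missing ...

COMMIT;  -- lock is released automatically at transaction end
```

Because `colocate_with := 'none'` and `colocate_with := '<table>'` callers never take this lock, they proceed without blocking on default-group creators.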
Apart from the last round of comments, this looks good to me. I won't be able to approve since it's my PR.
Used pgbench again to do some validation.
initial setup:

```sql
CREATE TABLE test (x int, y int, z bigserial);

-- Insert with retry: on any error, sleep 100 ms and try again,
-- up to 100 attempts; return as soon as one insert succeeds.
CREATE OR REPLACE FUNCTION do_insert(p_x int, p_y int)
RETURNS void
AS $$
DECLARE
    counter int;
BEGIN
    FOR counter IN 1..100 LOOP
        BEGIN
            INSERT INTO test (x, y) VALUES (p_x, p_y);
            RETURN;
        EXCEPTION WHEN OTHERS THEN
            PERFORM pg_sleep(0.1);
        END;
    END LOOP;
END;
$$ LANGUAGE plpgsql;
```
pgbench script:

```
\set x random(1, 10000)
\set y random(1, 10000)
select do_insert(:x,:y);
```
let pgbench do 200,000 inserts (can also run against a worker):

```
pgbench -n -f ../citus-create/insert.sql -t 5000 -j 8 -c 40 -P 10
```
validate that all inserts are accounted for:

```
select count(*) from test;
┌────────┐
│ count  │
├────────┤
│ 200000 │
└────────┘
(1 row)
```
validate all inserts went into the right location:

```
select count(*)
from test, pg_dist_shard
where pg_typeof(test)::text = shard_name('test', shardid)
  and worker_hash(x) >= shardminvalue::int
  and worker_hash(x) <= shardmaxvalue::int;
┌────────┐
│ count  │
├────────┤
│ 200000 │
└────────┘
(1 row)
```