
Add non-blocking variant of create_distributed_table

Open marcocitus opened this issue 3 years ago • 1 comments

DESCRIPTION: Add create_distributed_table_concurrently which distributes tables without blocking

This is a prototype of create_distributed_table_concurrently built on top of #6080.

We first convert the table to a Citus local table in a subtransaction, such that it has one shard. We then (tentatively) update the metadata to convert it to a distributed table and perform a shard split.

Due to the (not yet committed) metadata update, the replication slot creation needs to be pulled forward because it would otherwise block waiting for the write to finish, and the UDFs and decoder need to be more independent of metadata.

TODO:

  • [ ] Add tests
  • [ ] Make colocate_with := 'none' behaviour consistent with regular create_distributed_table; it always generates a pg_dist_colocation entry now. That is actually nice behaviour that we've wanted before. It might be nice to revert https://github.com/citusdata/citus/pull/5951.
  • [ ] Ensure we are free of deadlocks when doing citus_add_local_table_to_metadata
  • [ ] Better error message when the table is referenced by a foreign key

Open questions / potential improvements:

  • [ ] Can we do more error checking before Citus local table conversion (without triggering deadlocks)?
  • [ ] Can we do the split without Citus local table conversion?
  • [ ] Can we improve the code structure?

marcocitus avatar Jul 26 '22 17:07 marcocitus

Codecov Report

Merging #6087 (b77ecb0) into main (be06d65) will increase coverage by 0.01%. The diff coverage is 95.29%.

@@            Coverage Diff             @@
##             main    #6087      +/-   ##
==========================================
+ Coverage   92.95%   92.96%   +0.01%     
==========================================
  Files         253      254       +1     
  Lines       52917    53166     +249     
==========================================
+ Hits        49188    49426     +238     
- Misses       3729     3740      +11     

codecov[bot] avatar Aug 09 '22 18:08 codecov[bot]

  • Added a global lock to prevent concurrent nonblocking shard split operations, because we share memory and replication slots with nonblocking split ops.
  • Refactored the SplitShard method to accept the distribution column, colocated shard interval list, and colocation id, because we need those during create_distributed_table_concurrently.
  • Added the shard intervals of the table's partitions to the colocated shard interval list if we are executing a create_distributed_table operation.
  • Added isolation tests and failure tests.

aykut-bozkurt avatar Aug 19 '22 20:08 aykut-bozkurt

  • Create colocation entry with colocate_with => 'none', #6227,
  • Do not create colocation entry for reference tables in EnsureReferenceTablesOnAllNodes, #6224,

aykut-bozkurt avatar Aug 23 '22 14:08 aykut-bozkurt

Per separate discussion, we should refactor the lock that handles the colocate_with := 'default' case, where two concurrent create_distributed_table calls try to create the same co-location group. As of #6227 that lock also blocks colocate_with := 'none' creations: colocate_with := 'none' now results in an insert into pg_dist_colocation with a RowExclusiveLock (see InsertColocationGroupLocally), which conflicts with the current ExclusiveLock.

The goal of the lock is to serialize two create_distributed_table calls if one of them creates a co-location group, to ensure the other caller will get the same co-location group if it has the same properties (like shard count), which is what would happen if they ran serially. However, we need not block colocate_with := 'none' or other calls to achieve that, since those calls are independent from each other and from callers that use the default group.

In create_distributed_table_concurrently we also need to make sure we do not block concurrent create_distributed_table calls and vice-versa, since that could be disruptive.

In particular, we should:

  • Only take a lock for the colocate_with := 'default' case, so that we do not block unrelated colocate_with := 'none' or colocate_with := '<table>' commands that have no such concurrency issues.
  • Use an advisory lock, because we do not really need to block the whole pg_dist_colocation table. We could also use ShareUpdateExclusiveLock on the table to avoid blocking all writes, but decided an advisory lock is more semantically explicit, since we're not truly concerned with other operations on pg_dist_colocation like DDL.
  • Take the colocate_with := 'default' lock in the create_distributed_table_concurrently case before reading from pg_dist_colocation, and hold it if we will create a default co-location group during NonBlockingShardSplit, to serialize with concurrent create_distributed_table calls that use the default co-location group.
  • Keep the RowExclusiveLock on pg_dist_colocation in InsertColocationGroupLocally and DeleteColocationGroupLocally beyond the end of the function by passing NoLock to table_close, for general transactional correctness.
  • Consider adding a global lock at the start of CreateDistributedTableConcurrently to avoid concurrent create_distributed_table_concurrently calls entering the first few stages.
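
The lock conflict behind these points can be reproduced by hand with plain PostgreSQL locking commands. The sketch below only illustrates the lock semantics and is not the Citus implementation: Citus acquires these locks internally in C, and the advisory lock key `42` is a made-up placeholder.

```sql
-- Session 1: stands in for a create_distributed_table call that holds the
-- current ExclusiveLock on pg_dist_colocation.
BEGIN;
LOCK TABLE pg_dist_colocation IN EXCLUSIVE MODE;

-- Session 2: an insert into pg_dist_colocation needs RowExclusiveLock, which
-- conflicts with EXCLUSIVE, so this statement waits until session 1 ends.
BEGIN;
LOCK TABLE pg_dist_colocation IN ROW EXCLUSIVE MODE;

-- Alternative for session 1: take an advisory lock instead of a table lock.
-- Only other holders of the same key are serialized; session 2's
-- ROW EXCLUSIVE lock on the table is granted immediately.
BEGIN;
SELECT pg_advisory_xact_lock(42);  -- hypothetical key, for illustration only
-- ... read pg_dist_colocation, create the default co-location group ...
COMMIT;  -- a transaction-level advisory lock is released at commit
```

ShareUpdateExclusiveLock would also let the session 2 statement through, since it does not conflict with RowExclusiveLock, but it still ties the serialization to the table rather than to the default-co-location decision.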

marcocitus avatar Aug 25 '22 15:08 marcocitus

Apart from the last round of comments, this looks good to me. I won't be able to approve since it's my PR.

marcocitus avatar Aug 29 '22 12:08 marcocitus

Used pgbench again to do some validation.

initial set up:

CREATE TABLE test (x int, y int, z bigserial);

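-- Insert one row, retrying up to 100 times if the insert fails
-- (for example while the table is being distributed).
CREATE OR REPLACE FUNCTION do_insert(p_x int, p_y int)
RETURNS void
AS $$
DECLARE
        counter int;
BEGIN
        for counter in 1..100 loop
                begin
                        insert into test (x, y) values (p_x, p_y);
                        -- success: stop retrying
                        return;
                exception when others then
                        -- any error: wait 100 ms, then try again
                        perform pg_sleep(0.1);
                end;
        end loop;
END;
$$ LANGUAGE plpgsql;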

pgbench script:

\set x random(1, 10000)
\set y random(1, 10000)
select do_insert(:x,:y);

let pgbench do 200,000 inserts (can also run against a worker):

pgbench -n -f ../citus-create/insert.sql -t 5000 -j 8 -c 40 -P 10 
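
The comment does not show the step that distributes the table. Presumably it is issued from a separate session while pgbench is still running. A minimal sketch, assuming `x` is the distribution column (which matches the `worker_hash(x)` check in the validation query further down):

```sql
-- Run in a second session while pgbench is inserting.
-- Assumption: x is the distribution column; the exact call used in the
-- original test is not stated.
SELECT create_distributed_table_concurrently('test', 'x');
```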

validate that all inserts are accounted for:

select count(*) from test;
┌────────┐
│ count  │
├────────┤
│ 200000 │
└────────┘
(1 row)

validate that all inserts went into the right location:

select count(*) from test, pg_dist_shard where pg_typeof(test)::text = shard_name('test', shardid) and worker_hash(x) >= shardminvalue::int and worker_hash(x) <= shardmaxvalue::int;
┌────────┐
│ count  │
├────────┤
│ 200000 │
└────────┘
(1 row)

marcocitus avatar Aug 29 '22 13:08 marcocitus