Distributed and local table joins might end up with planning errors when the distributed table is chosen to be recursively planned:
create table local(id int);
create table distributed(id int);
select create_distributed_table('distributed','id');
set citus.local_table_join_policy TO 'prefer-distributed';
SELECT COUNT(*)
FROM
local
JOIN
distributed USING (id)
JOIN (SELECT id, NULL, NULL FROM distributed) foo USING (id);
DEBUG: Wrapping relation "distributed" to a subquery
DEBUG: generating subplan 54_1 for subquery SELECT id FROM public.distributed WHERE true
DEBUG: Plan 54 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((public.local JOIN (SELECT distributed_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('54_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) distributed_1) distributed USING (id)) JOIN (SELECT distributed_1.id, NULL::text AS "?column?", NULL::text AS "?column?" FROM public.distributed distributed_1) foo(id, "?column?", "?column?_1") USING (id))
DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan 54_1 for subquery SELECT id FROM public.local WHERE true
DEBUG: Plan 54 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT local_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('54_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) local_1) local JOIN (SELECT distributed_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('54_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) distributed_1) distributed USING (id)) JOIN (SELECT distributed_1.id, NULL::text AS "?column?", NULL::text AS "?column?" FROM public.distributed distributed_1) foo(id, "?column?", "?column?_1") USING (id))
The problem is that we force recursive planning of the distributed table, but then the second call to CreateDistributedPlan finds a join tree which is NOT ready yet, and does one more round of recursive planning due to the local table in the join tree. Thus, we end up with the same subPlanId being used for two different subplans, where the second one is overridden by the first one.
In the above example, we used set citus.local_table_join_policy TO 'prefer-distributed'; which is a niche setting. However, we can reproduce the same problem with "auto" mode and primary keys on the distributed tables:
set citus.local_table_join_policy TO 'auto';
create table local(id int);
create table distributed(id int PRIMARY KEY);
select create_distributed_table('distributed','id');
SELECT COUNT(*) FROM local JOIN distributed d1 USING (id) JOIN (SELECT id, NULL, NULL FROM distributed) foo USING (id) WHERE d1.id = 15;
DEBUG: Wrapping relation "distributed" "d1" to a subquery
DEBUG: generating subplan 58_1 for subquery SELECT id FROM public.distributed d1 WHERE (id OPERATOR(pg_catalog.=) 15)
DEBUG: Plan 58 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((public.local JOIN (SELECT d1_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('58_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) d1_1) d1 USING (id)) JOIN (SELECT distributed.id, NULL::text AS "?column?", NULL::text AS "?column?" FROM public.distributed) foo(id, "?column?", "?column?_1") USING (id)) WHERE (d1.id OPERATOR(pg_catalog.=) 15)
DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan 58_1 for subquery SELECT id FROM public.local WHERE (id OPERATOR(pg_catalog.=) 15)
DEBUG: Plan 58 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT local_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('58_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) local_1) local JOIN (SELECT d1_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('58_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) d1_1) d1 USING (id)) JOIN (SELECT distributed.id, NULL::text AS "?column?", NULL::text AS "?column?" FROM public.distributed) foo(id, "?column?", "?column?_1") USING (id)) WHERE (d1.id OPERATOR(pg_catalog.=) 15)
We still see the same subplan id being reused in the first case, but the query then errors out at a later stage:
SELECT COUNT(*) FROM local JOIN distributed d1 USING (id) JOIN (SELECT id, NULL, NULL FROM distributed) foo USING (id) WHERE d1.id = 15;
DEBUG: Wrapping relation "distributed" "d1" to a subquery
DEBUG: generating subplan 10_1 for subquery SELECT id FROM public.distributed d1 WHERE (id OPERATOR(pg_catalog.=) 15)
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM ((public.local JOIN (SELECT d1_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) d1_1) d1 USING (id)) JOIN (SELECT distributed.id, NULL::text AS "?column?", NULL::text AS "?column?" FROM public.distributed) foo(id, "?column?", "?column?_1") USING (id)) WHERE (d1.id OPERATOR(pg_catalog.=) 15)
DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan 10_1 for subquery SELECT id FROM public.local WHERE (id OPERATOR(pg_catalog.=) 15)
DEBUG: Plan 10 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT local_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) local_1) local JOIN (SELECT d1_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('10_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) d1_1) d1 USING (id)) JOIN (SELECT distributed.id, NULL::text AS "?column?", NULL::text AS "?column?" FROM public.distributed) foo(id, "?column?", "?column?_1") USING (id)) WHERE (d1.id OPERATOR(pg_catalog.=) 15)
ERROR: recursive complex joins are only supported when all distributed tables are co-located and joined on their distribution column
The plan does not use the same subplan id in the second case:
set citus.local_table_join_policy TO 'auto';
SET
SELECT COUNT(*) FROM local JOIN distributed d1 USING (id) JOIN (SELECT id, NULL, NULL FROM distributed) foo USING (id) WHERE d1.id = 15;
DEBUG: Wrapping relation "local" to a subquery
DEBUG: generating subplan 14_1 for subquery SELECT id FROM public.local WHERE (id OPERATOR(pg_catalog.=) 15)
DEBUG: Plan 14 query after replacing subqueries and CTEs: SELECT count(*) AS count FROM (((SELECT local_1.id FROM (SELECT intermediate_result.id FROM read_intermediate_result('14_1'::text, 'binary'::citus_copy_format) intermediate_result(id integer)) local_1) local JOIN public.distributed d1 USING (id)) JOIN (SELECT distributed.id, NULL::text AS "?column?", NULL::text AS "?column?" FROM public.distributed) foo(id, "?column?", "?column?_1") USING (id)) WHERE (d1.id OPERATOR(pg_catalog.=) 15)