seafowl
OOM in FULL OUTER JOIN of 2x 40k row tables
WITH old AS (SELECT * FROM socrata.dataset_history WHERE sg_image_tag = '20221024-120115'),
new AS (SELECT * FROM socrata.dataset_history WHERE sg_image_tag = '20221031-000137')
SELECT
COALESCE(old.domain, new.domain) AS domain,
COALESCE(old.id, new.id) AS id,
COALESCE(old.name, new.name) AS name,
COALESCE(old.description, new.description) AS description,
COALESCE(old.created_at, new.created_at) AS created_at,
COALESCE(old.updated_at, new.updated_at) AS updated_at,
old.id IS NULL AS is_added -- TRUE if added, FALSE if deleted
FROM old FULL OUTER JOIN new
ON old.domain = new.domain AND old.id = new.id
-- Only include added/deleted datasets
WHERE old.id IS NULL OR new.id IS NULL
ORDER BY domain, name, is_added
(3.6M-row / 2.7GB Socrata history dataset; the old/new CTEs are supposed to narrow it down to 2x 40k row tables)
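For reference, a minimal sketch of what the query is meant to compute: diff two snapshots of the catalog keyed by (domain, id), keeping only rows present on exactly one side. The domain names and titles below are made up for illustration.

```python
# Two hypothetical snapshots, keyed by (domain, id) -> name,
# standing in for the 'old' and 'new' CTEs.
old = {("a.gov", 1): "Budget", ("a.gov", 2): "Parks"}
new = {("a.gov", 2): "Parks", ("b.gov", 3): "Roads"}

# FULL OUTER JOIN on (domain, id), filtered to rows where one side is NULL:
added = [k for k in new if k not in old]      # old.id IS NULL
deleted = [k for k in old if k not in new]    # new.id IS NULL

print(sorted(added))    # datasets only in the new snapshot
print(sorted(deleted))  # datasets only in the old snapshot
```

With the WHERE filter applied after the join, only the added/deleted keys survive, so the expected result set is small even though the join inputs are 2x 40k rows.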
This works fine on a multi-core machine, so the OOM appears to come from how the plan differs when only a single core is available:
1 CORE:
SortExec: [domain@0 ASC NULLS LAST]
ProjectionExec: expr=[coalesce(old.domain,new.domain)@0 as domain, SUM(CASE WHEN old.id IS NULL THEN Int64(1) ELSE Int64(0) END)@1 as added, SUM(CASE WHEN new.id IS NULL THEN Int64(1) ELSE Int64(0) END)@2 as deleted]
AggregateExec: mode=Final, gby=[coalesce(old.domain,new.domain)@0 as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]
CoalescePartitionsExec
AggregateExec: mode=Partial, gby=[coalesce(domain@0, domain@2) as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=4096
FilterExec: id@1 IS NULL OR id@3 IS NULL
CoalesceBatchesExec: target_batch_size=4096
HashJoinExec: mode=CollectLeft, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })]
ProjectionExec: expr=[domain@0 as domain, id@1 as id]
CoalesceBatchesExec: target_batch_size=4096
FilterExec: sg_image_tag@2 = 20220814-000122
ParquetExec: limit=None, partitions=[...]
ProjectionExec: expr=[domain@0 as domain, id@1 as id]
CoalesceBatchesExec: target_batch_size=4096
FilterExec: sg_image_tag@2 = 20220815-000129
ParquetExec: limit=None, partitions=[...]
MULTICORE:
SortExec: [domain@0 ASC NULLS LAST]
CoalescePartitionsExec
ProjectionExec: expr=[coalesce(old.domain,new.domain)@0 as domain, SUM(CASE WHEN old.id IS NULL THEN Int64(1) ELSE Int64(0) END)@1 as added, SUM(CASE WHEN new.id IS NULL THEN Int64(1) ELSE Int64(0) END)@2 as deleted]
AggregateExec: mode=FinalPartitioned, gby=[coalesce(old.domain,new.domain)@0 as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "coalesce(old.domain,new.domain)", index: 0 }], 4)
AggregateExec: mode=Partial, gby=[coalesce(domain@0, domain@2) as coalesce(old.domain,new.domain)], aggr=[SUM(CASE WHEN #old.id IS NULL THEN Int64(1) ELSE Int64(0) END), SUM(CASE WHEN #new.id IS NULL THEN Int64(1) ELSE Int64(0) END)]
CoalesceBatchesExec: target_batch_size=4096
FilterExec: id@1 IS NULL OR id@3 IS NULL
CoalesceBatchesExec: target_batch_size=4096
HashJoinExec: mode=Partitioned, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "id", index: 1 }], 4)
ProjectionExec: expr=[domain@0 as domain, id@1 as id]
CoalesceBatchesExec: target_batch_size=4096
FilterExec: sg_image_tag@2 = 20220814-000122
ParquetExec: limit=None, partitions=[...]
CoalesceBatchesExec: target_batch_size=4096
RepartitionExec: partitioning=Hash([Column { name: "id", index: 1 }], 4)
ProjectionExec: expr=[domain@0 as domain, id@1 as id]
CoalesceBatchesExec: target_batch_size=4096
FilterExec: sg_image_tag@2 = 20220815-000129
ParquetExec: limit=None, partitions=[...]
In particular, the join operators differ:
1 core:
HashJoinExec: mode=CollectLeft, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })], metrics=[output_rows=413307, output_batches=16, input_batches=16, input_rows=413307, join_time=8.42958ms]
multicore:
HashJoinExec: mode=Partitioned, join_type=Full, on=[(Column { name: "id", index: 1 }, Column { name: "id", index: 1 })], metrics=[output_rows=41337, output_batches=14, input_rows=41337, input_batches=14, join_time=3.456564ms]
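The two join modes differ in how much they buffer. Here is an illustrative contrast (not DataFusion code, and simplified to an inner join): CollectLeft builds one hash table over the entire left input, while Partitioned hashes both inputs into N buckets and joins each bucket independently, bounding the size of any single hash table.

```python
def collect_left_join(left, right, key):
    # Buffer the WHOLE left side into one hash table (CollectLeft-style).
    table = {}
    for row in left:
        table.setdefault(row[key], []).append(row)
    return [(l, r) for r in right for l in table.get(r[key], [])]

def partitioned_join(left, right, key, n=4):
    # Hash-repartition both sides first (Partitioned-style); each
    # partition only ever builds a hash table over its own slice.
    out = []
    for p in range(n):
        lp = [r for r in left if hash(r[key]) % n == p]
        rp = [r for r in right if hash(r[key]) % n == p]
        out.extend(collect_left_join(lp, rp, key))
    return out
```

Note the metrics above also show a ~10x difference in input_rows (413307 vs 41337) feeding the join, so the mode choice may not be the only difference between the two plans.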