[EPIC] Implement MERGE INTO statement
Google doc: https://docs.google.com/document/d/1bVWg_mV7Ipte3IhdEQMffHaYz5sVjzwWHV9JsMrXgwE/edit?usp=sharing
Generally, the implementation of the MERGE INTO statement can be decomposed into two parts.
- Generate RecordBatches in the desired format
- Correctly upsert those RecordBatches into the Iceberg table
There is an implicit contract between these two parts: the RecordBatches must be in the correct format. In particular, the schema of the RecordBatches must include the auxiliary fields __data_file_path and __source_exists.
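A minimal sketch of that contract in Arrow/DataFusion terms (the merged target/source columns are elided, only the auxiliary fields are shown; the helper name is hypothetical):

```rust
use datafusion::arrow::datatypes::{DataType, Field};

// Hypothetical helper describing the auxiliary columns both parts agree on;
// in the real plan these are appended to the target/source table columns.
fn merge_contract_fields() -> Vec<Field> {
    vec![
        // Path of the data file a target row came from; null for rows that
        // only exist on the source side of the full outer join.
        Field::new("__data_file_path", DataType::Utf8, true),
        // True when a matching source row exists, i.e. a MATCHED clause applies.
        Field::new("__source_exists", DataType::Boolean, false),
    ]
}
```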
Part 1:
The first part is solved by creating a LogicalPlan that produces the RecordBatches in the right format. The LogicalPlan can be illustrated as follows (a rough DataFusion sketch of the same plan shape comes after the diagram):
```mermaid
graph TD
target@{shape: procs}
pruned["target
target | table | columns"]
injected["__data_file_path | __target_exists | target | table | columns"]
source["source
source | table | columns"]
exists["__source_exists | source | table | columns"]
join((full outer join))
full["__data_file_path | __target_exists | target | table | columns | __source_exists | source"]
merge((MergeIntoExtension))
target -- prune data files --> pruned
pruned -- inject data-file-path & target-exists column --> injected
source -- inject source-exists column --> exists
injected --> join
exists --> join
join --> full
projection["SELECT CASE WHEN __source_exists THEN match(source) ELSE match(target) END, table, columns, __data_file_path, __source_exists"]
full -- Projection with CASE expressions for match clauses --> projection
projection --> merge
```
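A rough sketch of that plan shape using DataFusion's LogicalPlanBuilder. The join key `id` and the single data column `value` are illustrative, and the inputs are assumed to already carry the injected __data_file_path / __target_exists / __source_exists columns:

```rust
use datafusion::error::Result;
use datafusion::logical_expr::{JoinType, LogicalPlan, LogicalPlanBuilder};
use datafusion::prelude::{col, when};

// Sketch only: full outer join of the augmented target and source plans,
// followed by the projection that applies the merge clauses per column.
fn build_merge_input(target: LogicalPlan, source: LogicalPlan) -> Result<LogicalPlan> {
    LogicalPlanBuilder::from(target)
        .join(source, JoinType::Full, (vec!["id"], vec!["id"]), None)?
        .project(vec![
            // One CASE per data column: matched rows take the source-side
            // value, target-only rows keep the target-side value.
            when(col("__source_exists").is_true(), col("source.value"))
                .otherwise(col("target.value"))?
                .alias("value"),
            col("__data_file_path"),
            col("__source_exists"),
        ])?
        .build()
}
```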
Part 2:
The second part is solved by implementing a custom PhysicalPlan which takes the RecordBatches in the desired format, drops the rows that belong to data files which don't need to be overwritten, writes the remaining RecordBatches to Parquet files, and updates the Iceberg metadata accordingly. The functionality of the PhysicalPlan is illustrated as follows (a per-batch sketch of the filtering step comes after the diagram):
```mermaid
graph TD
full["__data_file_path | matched | table | columns | __source_exists"]
fork(("ForkNode"))
distinct["SELECT distinct __data_file_path WHERE __source_exists = true"]
semi_join(("left semi join on __data_file_path"))
overwrite[OverwriteList]
final["matched | table | columns"]
parquet["Write parquet files"]
append["Append parquet files to Iceberg table"]
remove["Remove overwritten files form Iceberg table"]
full --> fork
fork -- Identify files to overwrite --> distinct
distinct --> semi_join
fork --> semi_join
semi_join -- Remove __data_file_path column--> final
final --> parquet
parquet --> append
distinct --> overwrite
overwrite --> remove
```
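One way the semi-join / filtering step could look when applied per batch, assuming the contract columns from above. The surrounding MergeIntoExec node is elided, and keeping source-only insert rows (null __data_file_path) is an assumption of this sketch:

```rust
use std::collections::HashSet;

use datafusion::arrow::array::{BooleanArray, StringArray};
use datafusion::arrow::compute::filter_record_batch;
use datafusion::arrow::record_batch::RecordBatch;
use datafusion::error::{DataFusionError, Result};

// Sketch of the "left semi join on __data_file_path" step applied per batch:
// keep only rows whose data file is being overwritten, so rows from untouched
// files are never rewritten.
fn keep_overwritten_rows(
    batch: &RecordBatch,
    files_to_overwrite: &HashSet<String>,
) -> Result<RecordBatch> {
    let idx = batch.schema().index_of("__data_file_path")?;
    let paths = batch
        .column(idx)
        .as_any()
        .downcast_ref::<StringArray>()
        .ok_or_else(|| DataFusionError::Internal("__data_file_path must be Utf8".into()))?;
    let mask: BooleanArray = (0..paths.len())
        .map(|i| {
            // Null paths are source-only rows (inserts) and are kept as well.
            Some(paths.is_null(i) || files_to_overwrite.contains(paths.value(i)))
        })
        .collect();
    Ok(filter_record_batch(batch, &mask)?)
}
```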
Tasks:
Tests:
- [x] Create unit tests that check MERGE INTO correctness
Refactor:
- [x] Create EmbucketQueryPlanner as QueryPlanner for Datafusion
External:
- [x] Add option to output __data_file_path column for iceberg-rust TableProvider
- [ ] Add "Overwrite" operation to Iceberg tables
LogicalPlan:
- [ ] Convert Ast Expression into Datafusion Expression
- [ ] Convert Ast MergeClause into Datafusion Expression
- [ ] Convert target, source into Datafusion LogicalPlans
- [ ] Add __target_exists & __source_exists columns to LogicalPlans
- [ ] Create LogicalPlan for Full Outer Join
- [ ] Create CASE Expressions for MergeClauses
- [ ] Create Projection LogicalPlan with CASE Expressions (see the sketch after this list)
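A sketch of how the MATCHED / NOT MATCHED clauses could be folded into a single CASE per output column, refining the projection shown in Part 1. The clause shapes and the `value` column are illustrative; the __target_exists / __source_exists flags are the injected columns from the tasks above:

```rust
use datafusion::error::Result;
use datafusion::logical_expr::Expr;
use datafusion::prelude::{col, when};

// Sketch only: one CASE expression per output column, built from a
// WHEN MATCHED ... UPDATE clause and a WHEN NOT MATCHED ... INSERT clause.
fn merge_case_for_value(update_value: Expr, insert_value: Expr) -> Result<Expr> {
    // is_true()/is_not_true() are used because the full outer join leaves the
    // injected flags NULL on the side that did not match.
    Ok(when(
        col("__target_exists").is_true().and(col("__source_exists").is_true()),
        update_value,
    )
    .when(
        col("__target_exists").is_not_true().and(col("__source_exists").is_true()),
        insert_value,
    )
    .otherwise(col("target.value"))?
    .alias("value"))
}
```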
Planner:
- [x] Create EmbucketExtensionPlanner as ExtensionPlanner
- [x] Extend EmbucketQueryPlanner with EmbucketExtensionPlanner
- [x] Implement planning MergeInto Extension (see the wiring sketch after this list)
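Roughly, the planner wiring looks like the following. This is a minimal, compilable skeleton rather than the actual Embucket code: the real EmbucketExtensionPlanner would recognise the MergeInto extension node and return the MergeIntoExec built from its physical inputs, which is elided here:

```rust
use std::sync::Arc;

use async_trait::async_trait;
use datafusion::error::Result;
use datafusion::execution::context::QueryPlanner;
use datafusion::execution::SessionState;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{DefaultPhysicalPlanner, ExtensionPlanner, PhysicalPlanner};

// Sketch of the extension planner plugged into DataFusion's planning pipeline.
struct EmbucketExtensionPlanner;

#[async_trait]
impl ExtensionPlanner for EmbucketExtensionPlanner {
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        _node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        _physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // A real implementation would downcast `_node` to the MergeInto
        // extension node and return the matching MergeIntoExec built from
        // `_physical_inputs`; returning None here defers to the default
        // planner so the sketch stays self-contained.
        Ok(None)
    }
}

// Sketch of the query planner: the default planner plus our extension planner.
struct EmbucketQueryPlanner;

#[async_trait]
impl QueryPlanner for EmbucketQueryPlanner {
    async fn create_physical_plan(
        &self,
        logical_plan: &LogicalPlan,
        session_state: &SessionState,
    ) -> Result<Arc<dyn ExecutionPlan>> {
        let planner = DefaultPhysicalPlanner::with_extension_planners(vec![
            Arc::new(EmbucketExtensionPlanner) as Arc<dyn ExtensionPlanner + Send + Sync>,
        ]);
        planner.create_physical_plan(logical_plan, session_state).await
    }
}
```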
PhysicalPlan:
- [x] Create MergeIntoExec PhysicalPlan
- [ ] Create stream impl that filters out rows whose files don't need to be overwritten
- [ ] Compute files that need to be overwritten
- [x] Create Projection that removes auxiliary columns
Optimization:
- [ ] Prune target table scan