[EPIC] Implement MERGE INTO statement

Open · JanKaul opened this issue 6 months ago · 0 comments

Google doc: https://docs.google.com/document/d/1bVWg_mV7Ipte3IhdEQMffHaYz5sVjzwWHV9JsMrXgwE/edit?usp=sharing

Generally, the implementation of the MERGE INTO statement can be decomposed into two parts:

  1. Generate RecordBatches in the desired format
  2. Correctly upsert those RecordBatches to the Iceberg table

There is a contract between these two parts: the RecordBatches must have the correct format. In particular, the schema of the RecordBatches must include the following fields: __data_file_path and __source_exists.
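
To make the contract concrete, here is a minimal sketch of what that schema could look like in Arrow. The `id` and `value` fields are hypothetical placeholders for the target table's actual columns; only the two auxiliary fields are fixed by the contract:

```rust
use std::sync::Arc;
use arrow::datatypes::{DataType, Field, Schema};

/// Sketch of the RecordBatch schema contract between the two parts.
fn merge_batch_schema() -> Arc<Schema> {
    Arc::new(Schema::new(vec![
        // placeholder target table columns
        Field::new("id", DataType::Int64, true),
        Field::new("value", DataType::Utf8, true),
        // file each target row was read from (NULL for insert-only rows)
        Field::new("__data_file_path", DataType::Utf8, true),
        // whether a matching source row exists for this row
        Field::new("__source_exists", DataType::Boolean, true),
    ]))
}
```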

Part 1:

The first part is solved by creating a LogicalPlan that produces the RecordBatches in the right format. The LogicalPlan can be illustrated as:

```mermaid
graph TD
  target@{shape: procs}
  pruned["target
  target | table | columns"]
  injected["__data_file_path | __target_exists | target | table | columns"]
  source["source
  source | table | columns"]
  exists["__source_exists | source | table | columns"]
  join((full outer join))
  full["__data_file_path | __target_exists | target | table | columns | __source_exists | source"]
  merge((MergeIntoExtension))

  target -- prune data files --> pruned
  pruned -- inject data-file-path & target-exists column --> injected
  source -- inject source-exists column --> exists
  injected --> join
  exists --> join
  join --> full
  projection["SELECT CASE WHEN __source_exists THEN match(source) ELSE match(target) END, table, columns, __data_file_path, __source_exists"]
  full -- Projection with CASE expressions for match clauses --> projection
  projection --> merge
```
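
A rough sketch of how this plan could be assembled with DataFusion's LogicalPlanBuilder follows. It assumes a single key column `id` and a single data column `value` (both placeholders), aliases the inputs `target` and `source`, and glosses over NULL handling after the outer join; it is not the actual implementation:

```rust
use datafusion::common::Result;
use datafusion::logical_expr::{col, lit, when, JoinType, LogicalPlan, LogicalPlanBuilder};

/// Sketch of part 1: inject the marker columns, full outer join,
/// then project CASE expressions for the match clauses.
fn build_merge_plan(target: LogicalPlan, source: LogicalPlan) -> Result<LogicalPlan> {
    // The target plan is assumed to already expose __data_file_path
    // and __target_exists; alias it so columns can be qualified.
    let target = LogicalPlanBuilder::from(target).alias("target")?.build()?;

    // Inject the __source_exists marker column into the source side.
    let source = LogicalPlanBuilder::from(source)
        .project(vec![col("id"), col("value"), lit(true).alias("__source_exists")])?
        .alias("source")?
        .build()?;

    // Full outer join keeps unmatched rows from both sides.
    LogicalPlanBuilder::from(target)
        .join(source, JoinType::Full, (vec!["target.id"], vec!["source.id"]), None)?
        // Projection with a CASE expression per column: take the source
        // value for matched rows, otherwise keep the target value.
        .project(vec![
            when(col("__source_exists"), col("source.value"))
                .otherwise(col("target.value"))?
                .alias("value"),
            col("__data_file_path"),
            col("__source_exists"),
        ])?
        .build()
}
```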
  

Part 2:

The second part is solved by implementing a custom PhysicalPlan that takes the RecordBatches in the desired format, filters out rows from files that don't need to be overwritten, writes the RecordBatches to Parquet files, and updates the Iceberg metadata accordingly. The functionality of the PhysicalPlan is illustrated as follows:

```mermaid
graph TD
  full["__data_file_path | matched | table | columns | __source_exists"]
  fork(("ForkNode"))
  distinct["SELECT distinct __data_file_path WHERE __source_exists = true"]
  semi_join(("left semi join on __data_file_path"))
  overwrite[OverwriteList]
  final["matched | table | columns"]
  parquet["Write parquet files"]
  append["Append parquet files to Iceberg table"]
  remove["Remove overwritten files from Iceberg table"]
  full --> fork
  fork -- Identify files to overwrite --> distinct
  distinct --> semi_join
  fork --> semi_join
  semi_join -- Remove __data_file_path column--> final
  final --> parquet
  parquet --> append
  distinct --> overwrite
  overwrite --> remove
```
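
For a single RecordBatch, the semi-join/filter step in the middle of the diagram could look roughly like this sketch. The function name and the `overwrite_files` argument are hypothetical; in the plan above, the file set would come from the distinct branch of the ForkNode:

```rust
use std::collections::HashSet;
use arrow::array::{BooleanArray, StringArray};
use arrow::compute::filter_record_batch;
use arrow::error::ArrowError;
use arrow::record_batch::RecordBatch;

/// Keep only rows whose __data_file_path belongs to a file selected
/// for overwrite, then drop the auxiliary column before writing.
fn filter_overwritten_rows(
    batch: &RecordBatch,
    overwrite_files: &HashSet<String>,
) -> Result<RecordBatch, ArrowError> {
    let path_idx = batch.schema().index_of("__data_file_path")?;
    let paths = batch
        .column(path_idx)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("__data_file_path is expected to be a Utf8 column");

    // Rows from files that are not overwritten are dropped here; those
    // files stay in the table untouched, so their data is not lost.
    let mask: BooleanArray = paths
        .iter()
        .map(|p| Some(p.map_or(false, |p| overwrite_files.contains(p))))
        .collect();
    let filtered = filter_record_batch(batch, &mask)?;

    // Project away __data_file_path before the Parquet write.
    let keep: Vec<usize> = (0..filtered.num_columns()).filter(|&i| i != path_idx).collect();
    filtered.project(&keep)
}
```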

Tasks:

Tests:

  • [x] Create unit tests that check MERGE INTO correctness

Refactor:

  • [x] Create EmbucketQueryPlanner as QueryPlanner for DataFusion

External:

  • [x] Add option to output __data_file_path column for iceberg-rust TableProvider
  • [ ] Add "Overwrite" operation to Iceberg tables

LogicalPlan:

  • [ ] Convert AST Expression into DataFusion Expression
  • [ ] Convert AST MergeClause into DataFusion Expression
  • [ ] Convert target, source into DataFusion LogicalPlans
  • [ ] Add __target_exists & __source_exists columns to LogicalPlans
  • [ ] Create LogicalPlan for Full Outer Join
  • [ ] Create CASE Expressions for MergeClauses (see the sketch after this list)
  • [ ] Create Projection LogicalPlan with CASE Expressions
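
The CASE folding for the merge clauses could look roughly like this sketch, where `matched_value` and `insert_value` stand for the per-clause expressions produced by the AST conversion tasks above, and the `target.value` fallback is a placeholder:

```rust
use datafusion::common::Result;
use datafusion::logical_expr::{col, when, Expr};

/// Sketch: fold the merge clauses for one column into a single CASE.
fn merge_clause_case(matched_value: Expr, insert_value: Expr) -> Result<Expr> {
    // WHEN MATCHED: both sides exist
    when(
        col("__source_exists").and(col("__target_exists")),
        matched_value,
    )
    // WHEN NOT MATCHED: only the source side exists
    .when(
        col("__source_exists").and(col("__target_exists").is_null()),
        insert_value,
    )
    // unmatched target rows pass through unchanged
    .otherwise(col("target.value"))
}
```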

Planner:

  • [x] Create EmbucketExtensionPlanner as ExtensionPlanner
  • [x] Extend EmbucketQueryPlanner with EmbucketExtensionPlanner
  • [x] Implement planning the MergeInto extension (see the sketch after this list)
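
The planner wiring could look roughly like the following sketch. `MergeIntoNode` and `MergeIntoExec` are the extension node and physical operator this epic introduces, so they are not defined here, and the exact trait signature may differ between DataFusion versions. The planner would then be registered via `DefaultPhysicalPlanner::with_extension_planners` inside the EmbucketQueryPlanner:

```rust
use std::sync::Arc;
use async_trait::async_trait;
use datafusion::common::Result;
use datafusion::execution::context::SessionState;
use datafusion::logical_expr::{LogicalPlan, UserDefinedLogicalNode};
use datafusion::physical_plan::ExecutionPlan;
use datafusion::physical_planner::{ExtensionPlanner, PhysicalPlanner};

struct EmbucketExtensionPlanner;

#[async_trait]
impl ExtensionPlanner for EmbucketExtensionPlanner {
    async fn plan_extension(
        &self,
        _planner: &dyn PhysicalPlanner,
        node: &dyn UserDefinedLogicalNode,
        _logical_inputs: &[&LogicalPlan],
        physical_inputs: &[Arc<dyn ExecutionPlan>],
        _session_state: &SessionState,
    ) -> Result<Option<Arc<dyn ExecutionPlan>>> {
        // When the node is the MergeInto extension, emit the custom exec
        // (MergeIntoNode and MergeIntoExec are the types from this issue).
        Ok(node.as_any().downcast_ref::<MergeIntoNode>().map(|_| {
            Arc::new(MergeIntoExec::new(physical_inputs[0].clone()))
                as Arc<dyn ExecutionPlan>
        }))
    }
}
```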

PhysicalPlan:

  • [x] Create MergeIntoExec PhysicalPlan
    • [ ] Create stream impl that filters out rows whose files don't need to be overwritten
    • [ ] Compute files that need to be overwritten
    • [x] Create Projection that removes auxiliary columns

Optimization:

  • [ ] Prune target table scan

JanKaul · May 28 '25 16:05