Implement `insert_into` for `IcebergTableProvider`
Is your feature request related to a problem or challenge?
As part of #1382, we need to implement `insert_into` for `IcebergTableProvider` to support the INSERT INTO query in DataFusion:
insert into t values (1, 'a');
Physical Plans
Within `insert_into`, we will need to add a few nodes/DataFusion physical plans to complete the write process. The entire write process can be described by the flowchart below, and a rough sketch of the repartition and sort steps follows the node list:
flowchart TD
A(["Input Node"]) --> F["Project Node"]
F --> B["Repartition Node"]
B --> C["Sort Node"]
C --> D["Writer Node"]
D --> E["Commit Node"]
- Input Node: Input physical plan that represents the input data
- Project Node: Calculate partition values
- Repartition Node: Decide the partitioning mode for the best parallelism
- Sort Node: Sort the input data
- Writer Node: Spawn Iceberg writers and write the input data
- Commit Node: Commit the data written using Iceberg Tx API
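To make the middle of this chain a bit more concrete, here is a minimal sketch of the Repartition and Sort nodes built from plain DataFusion physical plans. Constructor signatures (especially `SortExec`'s ordering argument) vary across DataFusion versions, and the column and parallelism choices here are placeholders, not what the final implementation uses:

```rust
use std::sync::Arc;

use datafusion::arrow::compute::SortOptions;
use datafusion::error::Result;
use datafusion::physical_expr::expressions::col;
use datafusion::physical_expr::PhysicalSortExpr;
use datafusion::physical_plan::repartition::RepartitionExec;
use datafusion::physical_plan::sorts::sort::SortExec;
use datafusion::physical_plan::{ExecutionPlan, Partitioning};

/// Sketch of the middle of the write path: repartition the input for write
/// parallelism, then sort each partition so a writer sees its rows grouped.
/// The projection, writer, and commit nodes are Iceberg-specific and omitted.
fn repartition_and_sort(
    input: Arc<dyn ExecutionPlan>,
    write_parallelism: usize,
    partition_column: &str,
) -> Result<Arc<dyn ExecutionPlan>> {
    let schema = input.schema();

    // Repartition Node: hash-partition on the partition column so rows with
    // the same partition value land on the same writer task.
    let partitioning =
        Partitioning::Hash(vec![col(partition_column, &schema)?], write_parallelism);
    let repartitioned = Arc::new(RepartitionExec::try_new(input, partitioning)?);

    // Sort Node: sort within each partition on the partition column.
    let sort_expr = PhysicalSortExpr {
        expr: col(partition_column, &schema)?,
        options: SortOptions::default(),
    };
    let sorted = SortExec::new(vec![sort_expr].into(), repartitioned)
        .with_preserve_partitioning(true);

    Ok(Arc::new(sorted))
}
```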
Writer Extension
Besides the writers mentioned in the writer path of #1382, there are other writers, like `RollingFileWriter`, that can be useful for splitting incoming data into multiple files.
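For illustration only, the rolling idea boils down to closing the current file and starting a new one once a size threshold is crossed. The `FileWriter` trait and field names below are hypothetical stand-ins for this sketch, not the iceberg-rust writer API:

```rust
/// Hypothetical minimal file-writer interface used only for this sketch.
trait FileWriter {
    fn write(&mut self, batch_bytes: usize);
    /// Finish the current file and return its path (stand-in for a `DataFile`).
    fn close(&mut self) -> String;
}

/// Sketch of a rolling writer: split incoming data into multiple files once
/// the current file grows past `target_file_size` bytes.
struct RollingWriterSketch<W: FileWriter> {
    inner: W,
    target_file_size: usize,
    written_bytes: usize,
    completed_files: Vec<String>,
}

impl<W: FileWriter> RollingWriterSketch<W> {
    fn write(&mut self, batch_bytes: usize) {
        // Roll over to a new file before the current one exceeds the target size.
        if self.written_bytes > 0 && self.written_bytes + batch_bytes > self.target_file_size {
            self.completed_files.push(self.inner.close());
            self.written_bytes = 0;
        }
        self.inner.write(batch_bytes);
        self.written_bytes += batch_bytes;
    }

    fn close(mut self) -> Vec<String> {
        // Flush the last partially filled file before returning all files.
        if self.written_bytes > 0 {
            self.completed_files.push(self.inner.close());
        }
        self.completed_files
    }
}
```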
Task List
- [x] #1541
- [x] #1542
- [x] #1543
- [ ] #1544
- [x] #1545
- [x] #1546
Hi @CTTY, I'm curious how the write node and commit node interact in this path 🤔? Actually, I have a previous draft that implements the integration via the `DataSink` trait: #833
I am interested in working on this.
Hi @ZENOTME ,
> how write node and commit node interact in this path
The write node will need to serialize `Vec<DataFile>` and send it to the commit node in a stream, and the commit node will deserialize it (a small sketch of this idea follows below). My draft here will probably make more sense than me explaining it in text :)
I've discussed the `DataSink` trait with @liurenjie1024 offline before, and we are not sure about some design details in `DataSink`:
- The repartitioning/demuxing and the subsequent writing in `DataSinkExec` are done on a single node using multiple threads (link), and demuxing only makes sense when you don't know the parallelism beforehand, which is not the case here because parallelism should be configurable
- `DataSinkExec` enforces the input to be single-partitioned (link)
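For context, one way the write node could hand `DataFile`s to the commit node is to ship them as rows of a single-column `RecordBatch` over the ordinary stream. A minimal sketch of that idea, assuming the files have already been serialized to strings (the actual serialization format and plumbing in the linked draft may differ):

```rust
use std::sync::Arc;

use datafusion::arrow::array::{Array, StringArray};
use datafusion::arrow::datatypes::{DataType, Field, Schema};
use datafusion::arrow::error::ArrowError;
use datafusion::arrow::record_batch::RecordBatch;

/// Write node side: pack already-serialized `DataFile`s (e.g. JSON strings;
/// the exact format is an open design question) into a single-column batch
/// so they can flow through the stream to the commit node.
fn data_files_to_batch(serialized_files: Vec<String>) -> Result<RecordBatch, ArrowError> {
    let schema = Arc::new(Schema::new(vec![Field::new(
        "data_file",
        DataType::Utf8,
        false,
    )]));
    let array = StringArray::from(serialized_files);
    RecordBatch::try_new(schema, vec![Arc::new(array)])
}

/// Commit node side: unpack the serialized `DataFile`s from the batch before
/// deserializing them and committing via the Iceberg transaction API.
fn batch_to_data_files(batch: &RecordBatch) -> Vec<String> {
    let column = batch
        .column(0)
        .as_any()
        .downcast_ref::<StringArray>()
        .expect("commit node expects a single UTF-8 column");
    (0..column.len()).map(|i| column.value(i).to_string()).collect()
}
```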
I see. Thanks for your explanation @CTTY! This design LGTM. Let's move forward!
It looks like all of the tasks listed for this issue are complete. Is the topic still open?
I'm asking because I'm looking at testing some features with Iceberg, and I'm wondering if there is support for only append operations or if we can also do overwrite/replace. In `insert_into` it looks like the insert mode is ignored. I know the Parquet table provider only has append support, so I'm wondering if that trickles down to Iceberg as well.
There are still some items left to fully wrap this up:
- https://github.com/apache/iceberg-rust/pull/1872
- https://github.com/apache/iceberg-rust/pull/1887
- The sort node is never applied
But insert should work for simple append operations; you can refer to the slt tests here for example usage: https://github.com/apache/iceberg-rust/pull/1887/files#diff-1d3fc383dbd1622cbdeefd9e0b55e2d319d9a54b841ff5978fc56292f8e696fa
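As a quick orientation, an append through DataFusion currently looks roughly like this, assuming an already-constructed `IcebergTableProvider` from the `iceberg-datafusion` crate (provider construction is omitted; see the slt tests above for full examples):

```rust
use std::sync::Arc;

use datafusion::error::Result;
use datafusion::prelude::SessionContext;
use iceberg_datafusion::IcebergTableProvider;

/// Register an Iceberg table with DataFusion and append rows via INSERT INTO.
async fn append_rows(provider: IcebergTableProvider) -> Result<()> {
    let ctx = SessionContext::new();
    ctx.register_table("t", Arc::new(provider))?;

    // Only append is supported today; overwrite/replace modes are not wired up yet.
    ctx.sql("INSERT INTO t VALUES (1, 'a')").await?.collect().await?;
    Ok(())
}
```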
> I'm wondering if there is support for only append operations
Yes, currently we only support append operations with DataFusion.