
[EPIC] Support for appending data to iceberg table.

Open liurenjie1024 opened this issue 7 months ago • 16 comments

What's the feature you are trying to implement?

This issue is used to track implementing appending data to an iceberg table. In summary, after we complete all subtasks in this issue, we should be able to execute the following insert SQL statement in datafusion to insert data into an iceberg table:

insert into t values (1, 'a');

For more details about insert statement, please refer to datafusion doc.

Commit path

  • [x] #1386
  • [x] #1387
  • [x] #1388
  • [ ] #1389

Writer path

  • [x] #1572
  • [ ] #1573
  • [ ] Implement data writer for non-partitioned tables.
  • [x] https://github.com/apache/iceberg-rust/issues/1541

Datafusion integration

Willingness to contribute

I would be willing to contribute to this feature with guidance from the Iceberg Rust community

liurenjie1024 avatar May 27 '25 08:05 liurenjie1024

Thanks for creating this @liurenjie1024!

jonathanc-n avatar May 27 '25 08:05 jonathanc-n

Thanks @liurenjie1024 for this! Let's move!

Add a method in TableCommit to apply changes to TableMetadata to create a new TableMetadata

I don't get why we need this. Is this used by the catalog implementation, e.g. SqlCatalog::update_table? Could you elaborate more on the commit process and where we need this?

Add retry in Transaction::commit method

Maybe we can track this sub task in #964.

ZENOTME avatar May 27 '25 13:05 ZENOTME

Hi @liurenjie1024 , thanks for having this!

Would it be ok if I take up the commit path work? I can start working on a POC tomorrow

CTTY avatar May 28 '25 06:05 CTTY

I don't get why we need this. Is this used by the catalog implementation, e.g. SqlCatalog::update_table? Could you elaborate more the commit process and where we need this?

Yes. Catalogs use this new method to generate the new table metadata, and then point the table to that new metadata.

#964 duplicates this issue.

liurenjie1024 avatar May 28 '25 10:05 liurenjie1024

Hi @liurenjie1024 , thanks for having this!

Would it be ok if I take up the commit path work? I can start working on a POC tomorrow

Cool, thanks!

liurenjie1024 avatar May 28 '25 10:05 liurenjie1024

@CTTY @liurenjie1024 I'm wondering about the commit path proposed for the memory catalog in #1381 , which does #1388 and #1389 , but not the other two things. I think the retries aren't needed because the memory catalog uses an explicit lock rather than optimistic concurrency, and it doesn't seem necessary to me to have TableCommit provide a create_new_metadata function rather than applying the updates in the catalog. Thoughts on this?

hsingh574 avatar May 28 '25 17:05 hsingh574

@CTTY @liurenjie1024 I'm wondering about the commit path proposed for the memory catalog in #1381 , which does #1388 and #1389 , but not the other two things. I think the retries aren't needed because the memory catalog uses an explicit lock rather than optimistic concurrency, and it doesn't seem necessary to me to have TableCommit provide a create_new_metadata function rather than applying the updates in the catalog. Thoughts on this?

Hi, @hsingh574 Using a lock doesn't mean it can't do optimistic concurrency control. The retry is used to validate that all table requirements are met.

liurenjie1024 avatar May 29 '25 09:05 liurenjie1024

Using a lock doesn't mean it can't do optimistic concurrency control

@liurenjie1024 I don't think it makes sense for multiple MemoryCatalogs to be working on the same table. If that's the case, then all updates to a given table should be serialized by the lock in the MemoryCatalog, so there should be no need to retry because the table metadata cannot have changed after the requirements have been checked once under the lock.

hsingh574 avatar May 29 '25 15:05 hsingh574

Using a lock doesn't mean it can't do optimistic concurrency control

@liurenjie1024 I don't think it makes sense for multiple MemoryCatalogs to be working on the same table. If that's the case, then all updates to a given table should be serialized by the lock in the MemoryCatalog, so there should be no need to retry because the table metadata cannot have changed after the requirements have been checked once under the lock.

Hi, @hsingh574 What do you mean by multiple MemoryCatalogs working on the same table? Concurrency control is used to detect conflicts on the same table in the same catalog. Here is a case which may happen concurrently:

// thread 1
let table = catalog.load_table("a.b");
// do updates to table
catalog.update_table();

// thread 2
let table = catalog.load_table("a.b");
// do updates to table
catalog.update_table();

Here, if thread 1 and thread 2 load the table at the same version, a conflict may happen, and we need conflict detection.

liurenjie1024 avatar Jun 04 '25 10:06 liurenjie1024

Hi @liurenjie1024 , this might be undesirable since it's not how stateful catalogs will work, but from how the MemoryCatalog looks today it seems possible to never have a commit conflict, only requirement failures. This is because of the mutex in namespace_state, which we can acquire during update_table to ensure all updates commit serially and never conflict.

So the update_table implementation roughly becomes:

  • acquire mutex
  • load table
  • assert requirements
  • apply updates
  • write new file
  • update metadata location
  • release mutex

So in the example above, if thread 1 and thread 2 try to commit concurrently, the loser will wait for the winner to commit, then acquire the lock and reload the table to see the winner's commit, on top of which it can apply its own requirements and updates.

Again, this may not be desirable because this pessimistic concurrency is the opposite of Iceberg's usual optimistic concurrency, and isn't how the majority of distributed catalogs work.

Anyways, I'd like to contribute to this functionality if possible. Which piece would you suggest that has the least conflict with the other open PRs?

hsingh574 avatar Jun 05 '25 22:06 hsingh574

I don't think this is called pessimistic concurrency. In true pessimistic concurrency, if a table is under modification, it should be locked, and other writers should acquire a write lock before doing any update.

As for improving the memory catalog, I currently don't have much bandwidth to think about the design. But the Catalog trait is open, and you are free to develop and maintain your own version.

liurenjie1024 avatar Jun 06 '25 10:06 liurenjie1024

@liurenjie1024 @CTTY

If you haven't started, could I start working on the "Implement fanout partitioned data writer" task, as I see the rolling file writer is in progress?

For the first step, I am going to introduce two traits, PartitionFileWriter and PartitionFileWriterBuilder, similar to FileWriter and FileWriterBuilder.

Let me know your thoughts, thanks!

yingjianwu98 avatar Jul 29 '25 00:07 yingjianwu98

Hi @stevie9868 ,

Thanks for being willing to work on this! However, I don't think we need a new trait like PartitionFileWriter to achieve partition-aware writing. Some of my thoughts:

  • For the datafusion integration, we should have a repartition node (#1543) to split the incoming data by their Iceberg partition values, so each datafusion partition will only need to handle data from a certain iceberg partition, and a partition-aware writer won't be necessary in this case
  • For the more general case, my rough idea now is to borrow/reimplement some logic from the repartition node --- split the incoming data by their iceberg partitions, and then use the inner writers to handle each partition's data individually. The inner writer here also won't need to be partition-aware. Basically, PartitionFileWriter can just be a decorator, like RollingFileWriter, and doesn't need to be a trait. The real problem would be #1543 or the data partitioning abstraction

I'll let @liurenjie1024 comment with his thoughts as well

CTTY avatar Jul 29 '25 02:07 CTTY

If you haven't started, could I start working on the "Implement fanout partitioned data writer" task, as I see the rolling file writer is in progress?

Hi @stevie9868, actually there is work for the partition writer here: https://github.com/apache/iceberg-rust/issues/342. And I have a PR for the first part: https://github.com/apache/iceberg-rust/pull/1040. Feel free to complete it next when the first PR has been merged.

For the first step, I am going to introduce two traits, PartitionFileWriter and PartitionFileWriterBuilder, similar to FileWriter and FileWriterBuilder.

And actually we don't need the new trait. We can just have the partition writer implement the iceberg writer trait, and other writers can be built on top of it like the following:

let data_file_builder = ...
let partition_writer_builder = PartitionWriterBuilder::new(data_file_builder) 

I think this idea is what @CTTY mentioned that "PartitionFileWriter can just be a decorator just like RollingFileWriter, and don't need to be a trait. "

ZENOTME avatar Jul 29 '25 03:07 ZENOTME

Make sense, thanks for all the detailed explanations!

yingjianwu98 avatar Jul 29 '25 03:07 yingjianwu98

Hi, @CTTY

so each datafusion partition will only need to handle data from a certain iceberg partition and partition-aware writer won't be necessary in this case

This is incorrect. Each datafusion partition may still need to handle data from different iceberg partitions even if we have a repartition node.

liurenjie1024 avatar Jul 29 '25 09:07 liurenjie1024