Implement append-only task writer.

Open liurenjie1024 opened this issue 2 years ago • 7 comments

An append-only task writer accepts an optional partitioner and a file appender factory as arguments. When it receives records, it dispatches each record to a file writer according to the partition key (generated by the partitioner) and appends the record there. When it finishes, it returns the generated data file structs.
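The dispatch described above could be sketched roughly as follows. This is only an illustration, not icelake's actual API: `Record`, `DataFile`, `FileAppender`, `Partitioner`, and `AppendOnlyTaskWriter` are hypothetical names, and the appender only buffers rows in memory instead of writing Parquet files.

```rust
use std::collections::HashMap;

/// Hypothetical row type; a real engine would pass Arrow record batches.
#[derive(Debug, Clone, PartialEq)]
pub struct Record {
    pub region: String,
    pub value: i64,
}

/// Hypothetical summary of one finished data file.
#[derive(Debug, Clone, PartialEq)]
pub struct DataFile {
    pub partition_key: Option<String>,
    pub record_count: usize,
}

/// Hypothetical per-file writer; it only buffers rows in memory.
#[derive(Default)]
pub struct FileAppender {
    rows: Vec<Record>,
}

impl FileAppender {
    fn append(&mut self, record: Record) {
        self.rows.push(record);
    }

    fn close(self, partition_key: Option<String>) -> DataFile {
        DataFile { partition_key, record_count: self.rows.len() }
    }
}

/// Assumption: a partitioner maps a record to a string partition key.
pub type Partitioner = Box<dyn Fn(&Record) -> String>;

pub struct AppendOnlyTaskWriter {
    partitioner: Option<Partitioner>,
    appender_factory: Box<dyn Fn() -> FileAppender>,
    // One open appender per partition key; `None` means unpartitioned.
    writers: HashMap<Option<String>, FileAppender>,
}

impl AppendOnlyTaskWriter {
    pub fn new(
        partitioner: Option<Partitioner>,
        appender_factory: Box<dyn Fn() -> FileAppender>,
    ) -> Self {
        Self { partitioner, appender_factory, writers: HashMap::new() }
    }

    /// Route the record to the appender for its partition key,
    /// creating that appender on first use.
    pub fn write(&mut self, record: Record) {
        let key = self.partitioner.as_ref().map(|p| p(&record));
        let factory = &self.appender_factory;
        self.writers
            .entry(key)
            .or_insert_with(|| factory())
            .append(record);
    }

    /// Close every appender and return one data file per partition,
    /// sorted by key so the result is deterministic.
    pub fn close(self) -> Vec<DataFile> {
        let mut files: Vec<DataFile> = self
            .writers
            .into_iter()
            .map(|(key, appender)| appender.close(key))
            .collect();
        files.sort_by(|a, b| a.partition_key.cmp(&b.partition_key));
        files
    }
}
```

Without a partitioner, every record in this sketch lands in a single appender keyed by `None`, which corresponds to the unpartitioned case.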

Note that this will be the API used directly by compute engines such as RisingWave and Ballista. We can refer to the following implementation as an example.

https://github.com/apache/iceberg/blob/e340ad5be04e902398c576f431810c3dfa4fe717/core/src/main/java/org/apache/iceberg/io/PartitionedFanoutWriter.java#L28

liurenjie1024 avatar Jul 05 '23 10:07 liurenjie1024

How will this API be used by compute engines? For example:

// Create a writer to write data files
let task_writer = table.data_writer();
task_writer.write();
let data_files = task_writer.close();

// Apply these data files to the table
let tx = table.transaction;
tx.apply(data_files);
tx.commit();

ZENOTME avatar Jul 06 '23 01:07 ZENOTME

How will this API be used by compute engines?

Let's prioritize making it functional for now and refine the API later.

Xuanwo avatar Jul 06 '23 02:07 Xuanwo

How will this API be used by compute engines? For example:

Let's use RisingWave's new coordinated sink as an example:

  1. Each sink will contain a task writer.
  2. When it needs to commit, it calls the task writer's commit method to get the data files. The data files are serialized and passed to the sink coordinator.
  3. The sink coordinator uses the Iceberg table APIs to create a new snapshot and commit it.
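The three steps above could look roughly like the sketch below. All names here (`Sink`, `SinkCoordinator`, `Snapshot`, `record_file`, the `path|count` wire format) are assumptions made for illustration and are not RisingWave or icelake APIs; the task writer is reduced to a list of pending data files.

```rust
/// Hypothetical data file metadata produced by a task writer.
#[derive(Debug, Clone, PartialEq)]
pub struct DataFile {
    pub path: String,
    pub record_count: u64,
}

impl DataFile {
    /// Toy wire format "path|count", chosen only for this sketch.
    pub fn encode(&self) -> String {
        format!("{}|{}", self.path, self.record_count)
    }

    /// Returns `None` if the message is not in the toy format.
    pub fn decode(message: &str) -> Option<DataFile> {
        let (path, count) = message.rsplit_once('|')?;
        Some(DataFile {
            path: path.to_string(),
            record_count: count.parse().ok()?,
        })
    }
}

/// Step 1: each sink owns a task writer, reduced here to its pending files.
pub struct Sink {
    pending: Vec<DataFile>,
}

impl Sink {
    pub fn new() -> Self {
        Sink { pending: Vec::new() }
    }

    /// Stand-in for the task writer finishing a file.
    pub fn record_file(&mut self, path: &str, record_count: u64) {
        self.pending.push(DataFile { path: path.to_string(), record_count });
    }

    /// Step 2: on commit, drain the data files and serialize them
    /// so they can be sent to the coordinator.
    pub fn commit(&mut self) -> Vec<String> {
        self.pending.drain(..).map(|f| f.encode()).collect()
    }
}

/// Hypothetical snapshot: an id plus the files it adds.
#[derive(Debug, PartialEq)]
pub struct Snapshot {
    pub id: u64,
    pub files: Vec<DataFile>,
}

pub struct SinkCoordinator {
    snapshots: Vec<Snapshot>,
}

impl SinkCoordinator {
    pub fn new() -> Self {
        SinkCoordinator { snapshots: Vec::new() }
    }

    /// Step 3: decode the messages from all sinks and create one snapshot.
    /// Nothing is committed if any message fails to decode.
    pub fn commit(&mut self, messages: &[String]) -> Option<u64> {
        let files: Option<Vec<DataFile>> =
            messages.iter().map(|m| DataFile::decode(m)).collect();
        let files = files?;
        let id = self.snapshots.len() as u64 + 1;
        self.snapshots.push(Snapshot { id, files });
        Some(id)
    }

    pub fn snapshots(&self) -> &[Snapshot] {
        &self.snapshots
    }
}
```

Collecting the files from all sinks into one coordinator call means a single snapshot covers the whole checkpoint, rather than one snapshot per sink.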

liurenjie1024 avatar Jul 06 '23 05:07 liurenjie1024

This issue can be closed now.

ZENOTME avatar Jul 10 '23 02:07 ZENOTME

We still need to add support for partition specs. cc @ZENOTME

liurenjie1024 avatar Jul 10 '23 02:07 liurenjie1024

I realize that in the future we will need to add a position delete file writer, and users could use it as follows (using the different writers separately):

// Create a writer for data files and a writer for position delete files
let append_writer = table.append_writer();
append_writer.write();
let delete_writer = table.delete_writer();
delete_writer.write();

let append_data_files = append_writer.close();
let delete_data_files = delete_writer.close();

// Apply the appended data files to the table
let tx = table.transaction;
tx.apply(append_data_files);
tx.commit();

// Apply the position delete files to the table
let tx = table.transaction;
tx.apply(delete_data_files);
tx.commit();

So maybe it would be better to name the interface append_writer()?

ZENOTME avatar Jul 13 '23 07:07 ZENOTME

In my original design, the task writer should provide two methods:

insert_record
update_record

The internal implementation of update_record needs to maintain a map from record id to file position. This way, users don't have to use the low-level file writer API.
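A minimal sketch of that idea, assuming records are identified by a `u64` id and that an update is expressed as a position delete of the old row plus an insert of the new one. `TaskWriter`, `FilePosition`, and the field names are hypothetical, and record payloads are omitted:

```rust
use std::collections::HashMap;

/// Hypothetical location of a row: data file plus row offset in that file.
#[derive(Debug, Clone, PartialEq)]
pub struct FilePosition {
    pub file: String,
    pub row: u64,
}

pub struct TaskWriter {
    current_file: String,
    next_row: u64,
    // Map from record id to where this writer last wrote that record.
    positions: HashMap<u64, FilePosition>,
    // Rows to be emitted into a position delete file on close.
    pub position_deletes: Vec<FilePosition>,
}

impl TaskWriter {
    pub fn new(current_file: &str) -> Self {
        TaskWriter {
            current_file: current_file.to_string(),
            next_row: 0,
            positions: HashMap::new(),
            position_deletes: Vec::new(),
        }
    }

    /// Append a record and remember its position.
    pub fn insert_record(&mut self, id: u64) -> FilePosition {
        let pos = FilePosition {
            file: self.current_file.clone(),
            row: self.next_row,
        };
        self.next_row += 1;
        self.positions.insert(id, pos.clone());
        pos
    }

    /// Mark the previously written row (if any) as deleted,
    /// then append the new version of the record.
    pub fn update_record(&mut self, id: u64) -> FilePosition {
        if let Some(old) = self.positions.remove(&id) {
            self.position_deletes.push(old);
        }
        self.insert_record(id)
    }
}
```

This sketch only tracks rows written by the same writer instance; updating rows that already exist in the table would need a different mechanism, which is outside what the comment above describes.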

liurenjie1024 avatar Jul 13 '23 08:07 liurenjie1024

Closing for https://github.com/icelake-io/icelake/issues/279

Xuanwo avatar Jul 09 '24 10:07 Xuanwo