Delete Files in Table Scans
I'm looking to start work on proper handling of delete files in table scans and so I'd like to open an issue to discuss some of the design decisions.
A core tenet of our approach so far has been to ensure that the tasks produced by the file plan are small, independent and self-contained, so that they can be easily distributed in architectures where the service that generates the file plan could be on a different machine to the service(s) that perform the file reads.
The `FileScanTask` struct represents these individual units of work at present. Currently, though, its shape is focused on data files and it does not cater for including information on delete files that are produced by the scan. Here's how it looks now, for reference:
https://github.com/apache/iceberg-rust/blob/cde35ab0eefffae88c521d4e897ba86ee754861c/crates/iceberg/src/scan.rs#L859-L886
In order to properly process delete files as part of executing a scan task, executors will now need to load any applicable delete files along with the data file that they are processing. I'll outline what happens now, and then describe my proposed approach.
Current TableScan Synopsis
The current structure pushes all manifest file entries from the manifest list into a stream, which we then process concurrently in order to retrieve their associated manifests. Once retrieved, each manifest has each of its manifest entries extracted and pushed onto a channel so that they can be processed in parallel. Each is embedded inside a context object that contains the relevant information needed for processing the manifest entry. Tokio tasks listening to the channel then execute `TableScan::process_manifest_entry` on these objects, where we filter out any entries that do not match the scan filter predicate.
At this point, a `FileScanTask` is created for each of the entries that match the scan predicate. The `FileScanTask`s are then pushed into a channel that produces the stream of `FileScanTask`s returned to the original caller of `plan_files`.
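For readers unfamiliar with the code, here is a minimal sketch of that fan-out shape. It is a simplification: `ManifestEntryContext`, `matches_scan_predicate`, and `build_file_scan_task` are illustrative stand-ins rather than the crate's actual items, and `Result` is the crate's result alias.

```rust
use futures::channel::mpsc;
use futures::SinkExt;

// Hypothetical simplification of the pipeline described above: each manifest
// entry context is filtered on its own Tokio task, and matching entries
// become FileScanTasks on the channel whose receiver is the stream that
// plan_files returns.
fn fan_out(contexts: Vec<ManifestEntryContext>) -> mpsc::Receiver<Result<FileScanTask>> {
    let (tx, rx) = mpsc::channel(32);
    for ctx in contexts {
        let mut tx = tx.clone();
        tokio::spawn(async move {
            // cf. TableScan::process_manifest_entry: drop entries that
            // fail the scan's filter predicate
            if matches_scan_predicate(&ctx) {
                let _ = tx.send(Ok(build_file_scan_task(ctx))).await;
            }
        });
    }
    rx
}
```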
Changes to TableScan
FileScanTask
Each `FileScanTask` represents a scan to be performed on a single data file. However, multiple delete files may need to be applied to any one data file. Additionally, a delete file's scope of applicability is any data file within the same partition as the delete file - i.e. the same delete file may need to be applied to multiple data files. Thus an executor needs to know not just the data file that it is processing, but also all of the delete files that are applicable to that data file.
The first part of the set of changes that I'm proposing is to refactor `FileScanTask` so that it represents a single data file and zero or more delete files (see the sketch after this list):
- The `data_file_content` property would be removed - each task is implicitly about a file of type `Data`.
- A new struct, `DeleteFileEntry`, would be added. It would look something like this:

  ```rust
  struct DeleteFileEntry {
      path: String,
      format: DataFileFormat,
  }
  ```

- A `delete_files` property of type `Vec<DeleteFileEntry>` would be added to `FileScanTask` to represent the delete files that are applicable to its data file.
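Putting those changes together, the task would look roughly like this - a sketch of the proposal rather than final code, with fields other than `delete_files` being illustrative:

```rust
// Sketch of the refactored FileScanTask. `data_file_content` is gone, since
// each task implicitly describes a file of type `Data`, and the new
// `delete_files` field carries the applicable delete files.
struct FileScanTask {
    data_file_path: String,
    // ... projection, predicate, schema, etc. as today
    /// Zero or more delete files to apply when reading this data file,
    /// using the `DeleteFileEntry` struct shown above.
    delete_files: Vec<DeleteFileEntry>,
}
```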
TableScan::plan_files and associated methods
We need to update this logic in order to ensure that we can properly populate this new `delete_files` property. Each `ManifestEntryContext` will need the list of delete files so that, if the manifest entry that it encapsulates passes the filtering steps, it can populate the new `delete_files` property when it constructs the `FileScanTask`.
A naive approach would be to simply build a list of all of the delete files referred to by the top-level manifest list and give references to this list to all `ManifestEntryContext`s so that, if any delete files are present, all of them are included in every `FileScanTask` (sketched below). This would be a good first step - code that works inefficiently is better than code that does not work at all! It would also permit work to proceed on the execution side.
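A minimal sketch of that naive population, assuming an `Arc` is used so the shared list is only cloned when a task is actually built (`collect_delete_files` is a hypothetical helper):

```rust
use std::sync::Arc;

// Illustrative only: gather every delete file in the snapshot once, then
// hand a cheap Arc clone to every ManifestEntryContext.
let all_deletes: Arc<Vec<DeleteFileEntry>> = Arc::new(collect_delete_files(&manifest_list)?);

// ... later, inside process_manifest_entry, for an entry that matched the
// scan predicate:
let task = FileScanTask {
    // every task gets every delete file - correct, but inefficient
    delete_files: all_deletes.as_ref().clone(),
    // ... remaining fields as today
};
```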
Improvements could then be made to refine this approach, filtering out the inapplicable delete files that would otherwise go into each `FileScanTask`'s `delete_files` property.
How does this sound so far, @liurenjie1024, @Xuanwo, @ZENOTME, @Fokko?
Hi, I've recently implemented merge-on-read in my library using iceberg-rust, and submitted a working simplified version of the code, which looks somewhat similar to the naive approach you proposed! (I have to read the same delete file on my different nodes.)
That PR is https://github.com/apache/iceberg-rust/pull/625
About this issue, I have some doubts regarding the `DeleteFileEntry` list in `FileScanTask`. As you said, delete files and data files are many-to-many, so even if the list of delete files is saved in the scan task, in the optimal case the caller still needs some special handling to make sure that related data files and delete files are dispatched to the same node, and that each delete file is not read repeatedly. Most likely, this scheduling result is consistent with the partitioning result. Given that, I'd prefer to expose the partitioning result directly in the scan task. Please correct me if there is any misunderstanding.
I'm happy to add the partitioning result to the task. This is useful to the executor node when deciding how to distribute tasks, as it enables the use of a few different strategies, the choice of which can be left to the implementer.
It is not necessarily the case that a delete file gets read repeatedly if the delete file list is added to the file scan task: we can store the parsed delete files inside the object cache, preventing them from being read repeatedly on the same node, as they'd already be in memory. If the scheduling ensures that all tasks with the same partition get sent to the same executor, then each delete file would only be read once.
Thanks @sdd for raising this. The general approach looks good to me. The challenging part of delete file processing is filtering out unnecessary delete files for each task, which we can introduce as an optimization later.
Thanks - I have some skeleton code for the required changes to reader.rs that I'm going to share over the next few days as well.
Thanks for taking a look at the above, @liurenjie1024. I've just submitted a draft PR which outlines the second part of the approach - how we extend the filtering in the arrow reader to handle delete files. https://github.com/apache/iceberg-rust/pull/652
@liurenjie1024, @Xuanwo, @ZENOTME, @xxhZs: if you could take a look at that PR too when you get a chance and let me know whether the approach seems sensible, that would be great!
Hi all. I'm resurrecting this issue now that @Fokko has kindly helped get the first part of this over the line by reviewing and merging https://github.com/apache/iceberg-rust/pull/652.
I have a branch with an earlier iteration of delete file read support that I'm intending to break up into pieces and submit as separate PRs. There are parts of it that I'm happy with and other parts that I'm less happy with; and this is also a good opportunity to revisit the higher-level structure of the approach, now that I have a better idea of the different parts of the work involved.
Outline
TableScan
- `TableScan::plan_files` creates a `DeleteFileIndex`, which accumulates an index of all of the equality delete files and positional delete files that could apply to data files within the scan.
- Each of the `FileScanTask`s in the stream returned by `plan_files` is augmented with `delete_files`, a `Vec<FileScanTaskDeleteFile>` which contains details of the delete files that could apply to the data file in the `FileScanTask`.
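A hypothetical shape for that index - the real structure may differ; the key point is that delete files are grouped so that a lookup for a given data file only returns deletes that could apply to it. `PartitionKey` and the `sequence_number` field are stand-ins here:

```rust
use std::collections::HashMap;

// Sketch only: delete files grouped by partition, so that a data file picks
// up just the delete files from its own partition that are new enough to
// apply to it.
struct DeleteFileIndex {
    by_partition: HashMap<PartitionKey, Vec<FileScanTaskDeleteFile>>,
}

impl DeleteFileIndex {
    fn deletes_for_data_file(
        &self,
        partition: &PartitionKey,
        data_sequence_number: i64,
    ) -> Vec<FileScanTaskDeleteFile> {
        self.by_partition
            .get(partition)
            .map(|candidates| {
                candidates
                    .iter()
                    // positional deletes apply to data files with an equal or
                    // lower sequence number; equality deletes only to strictly
                    // lower ones - that distinction is elided here for brevity
                    .filter(|del| del.sequence_number >= data_sequence_number)
                    .cloned()
                    .collect()
            })
            .unwrap_or_default()
    }
}
```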
ArrowReader
Right now, `ArrowReader::read` takes a stream of `FileScanTask`s and, for each task, reads record batches from that task's data file - applying filtering, projection, batching, and transformations such as schema migration / column reordering - returning a stream of the `RecordBatch`es that result from applying these operations to all file scan tasks.
In order to support delete files, the filtering logic needs to be extended so that:
- Equality delete files referenced in each task's `delete_files` list are used to filter out rows that match the predicates built from any applicable delete files;
- Positional delete files from `delete_files` are used to filter out rows whose index is recorded as deleted in any applicable delete files.
Filtering Approach
- Predicate-based filtering is already performed in the reader, by transforming the filter predicate present in the `FileScanTask` into an arrow `RowFilter` that gets passed to `ParquetRecordBatchStreamBuilder::with_row_filter`.
- Equality deletes can be handled by merging the file scan task's filter predicate with predicates built from applicable rows present in any equality delete files, forming a single larger predicate that gets passed to `ParquetRecordBatchStreamBuilder` as before.
- Positional deletes can be handled using the `RowSelection` logic - creating a `RowSelection` specific to the applicable positional deletes, merging it with any `RowSelection` created as part of the row selection filter generated from the filter predicate (if present), and passing the result to `ParquetRecordBatchStreamBuilder::with_row_selection` (see the sketch after this list).
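For the positional-delete piece, here is a minimal sketch of turning a sorted list of deleted row indices into a parquet `RowSelection`. The `RowSelector`/`RowSelection` types are the parquet crate's real API; the function itself is illustrative:

```rust
use parquet::arrow::arrow_reader::{RowSelection, RowSelector};

/// Build a RowSelection that skips the rows a positional delete file marks
/// as deleted. `deleted` must be sorted ascending with no duplicates;
/// `total_rows` is the row count of the data file being read.
fn selection_skipping_deletes(deleted: &[u64], total_rows: u64) -> RowSelection {
    let mut selectors = Vec::new();
    let mut cursor = 0u64;
    for &idx in deleted {
        if idx > cursor {
            // keep the rows between the previous deleted row and this one
            selectors.push(RowSelector::select((idx - cursor) as usize));
        }
        selectors.push(RowSelector::skip(1));
        cursor = idx + 1;
    }
    if cursor < total_rows {
        selectors.push(RowSelector::select((total_rows - cursor) as usize));
    }
    RowSelection::from(selectors)
}
```

The result can then be intersected with any selection derived from the filter predicate (`RowSelection::intersection`) before being handed to `ParquetRecordBatchStreamBuilder::with_row_selection`.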
Loading and Processing Delete Files
Whilst I was fairly happy with the approach taken to filtering in my first draft, the approach taken to loading and processing delete files felt like it could be improved.
The first draft took this approach:
```rust
// delete_file.rs

// Represents a parsed delete file that can be safely stored in the Object
// Cache. Storing this in the object cache saves us from parsing the same
// file multiple times in the case of delete files that could apply to
// multiple files within the stream. (It would be better if we also had a
// way of preventing more than one task from starting to parse the same
// delete file in parallel.)
pub(crate) enum Deletes {
    // Positional delete files are parsed into a map of
    // filename to a sorted list of row indices.
    // (I'm ignoring the stored rows that are present in positional
    // deletes for now. I think they're only used for statistics?)
    Positional(HashMap<String, Vec<u64>>),

    // Equality delete files are initially parsed solely as an
    // unprocessed list of `RecordBatch`es from the equality
    // delete files.
    // Can we do better than this by storing a `Predicate`?
    // The equality deletes refer to fields by name rather than field_id,
    // so if we cache a `Predicate` rather than just a raw `RecordBatch`,
    // then a field name change would invalidate the cached `Predicate`.
    // Similarly, I don't think we can cache these as `BoundPredicate`s,
    // as the column order could be different across different data files
    // and so the accessor in the bound predicate could be invalid.
    Equality(Vec<RecordBatch>),
}

// Processes the RecordBatches of a positional delete file into a `HashMap`
// that maps filenames to lists of row indices that are marked as deleted
// for that file
pub(crate) async fn parse_positional_delete_file(
    mut record_batch_stream: ArrowRecordBatchStream,
) -> Result<Deletes> {
    // ...
}

// Simpler - collects the RecordBatches of an equality delete file into a
// `Vec<RecordBatch>` and returns them in a `Deletes::Equality`
pub(crate) async fn parse_equality_delete_file(
    mut record_batch_stream: ArrowRecordBatchStream,
) -> Result<Deletes> {
    // ...
}

// arrow/reader.rs

impl ArrowReader {
    // ... rest of ArrowReader impl

    // Spawned at the start of `process_file_scan_task` and then awaited
    // after the record_batch_stream_builder has been created and the page
    // index loaded.
    // Responsible for loading the delete files through FileIO and parsing
    // them into a list of `Deletes` objects
    async fn get_deletes(
        delete_file_entries: Vec<FileScanTaskDeleteFile>,
        file_io: FileIO,
        concurrency_limit_data_files: usize,
    ) -> Result<Vec<Deletes>> {
        // concurrently physically loads the delete file for each
        // `FileScanTaskDeleteFile` via `FileIO` into a
        // `RecordBatchStream`, which gets mapped through
        // `parse_positional_delete_file` or `parse_equality_delete_file`
        // into a combined vec of `Deletes`
    }

    // Maps `Deletes::Positional` objects into a list of indices of rows
    // that need deleting from the data file with the specified path
    fn get_positional_delete_indexes(
        data_file_path: &str,
        deletes: &[Deletes],
    ) -> Option<Vec<usize>> {
        // trivial filter / map
    }

    // Maps applicable `Deletes::Equality` objects into an Iceberg
    // predicate that is bound to the provided snapshot schema
    fn get_equality_deletes(
        delete_files: &[Deletes],
        snapshot_schema: SchemaRef,
    ) -> Result<Option<BoundPredicate>> {
        // starts with `AlwaysFalse` and builds up a predicate by `OR`ing
        // together predicates derived from applicable rows in the
        // RecordBatches
    }
}
```
@liurenjie1024 commented that this approach could be improved by reusing the logic present in the data file reader rather than reimplementing similar logic. He also mentioned that inspiration could be taken from the Java implementation's GenericReader.
Suggestions that improve upon the approach to loading and caching the delete files above are welcome. I'll refrain from submitting any changes related to this until others have had a chance to comment, and proceed instead with the more well-defined aspects of orchestrating the filtering logic itself.
OK, I have an improved design for loading delete files in the read phase, which I'll share shortly.
We introduce a `DeleteFileManager`, constructed when the `ArrowReader` gets built and provided with a `FileIO`. The reader keeps an `Arc` of this that it clones and passes to `process_file_scan_task`, which calls an async method of `DeleteFileManager`, passing in the delete file list for its file scan task.
`DeleteFileManager` loads and processes the delete files, deduplicating between multiple file scan tasks that reference the same delete files.
`DeleteFileManager` exposes two methods that `process_file_scan_task` calls later on: one to retrieve the list of positional delete row indices that apply to a specified data file, and another to get a filter predicate derived from the applicable delete files.
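In code, that surface might look roughly like this - a sketch of the design described above, with illustrative method names rather than a final API, and `DeletesState` as a placeholder for whatever holds the parsed delete files:

```rust
use std::sync::{Arc, Mutex};

// Sketch of the DeleteFileManager described above.
pub(crate) struct DeleteFileManager {
    file_io: FileIO,
    // parsed delete files, keyed by delete file path, shared across tasks
    state: Arc<Mutex<DeletesState>>,
}

impl DeleteFileManager {
    // Called by process_file_scan_task: loads and parses every delete file
    // in the task's list, skipping any that another task has already
    // loaded (or is currently loading).
    pub(crate) async fn load_deletes(
        &self,
        delete_files: &[FileScanTaskDeleteFile],
    ) -> Result<()> {
        // ...
    }

    // Positional delete row indices applicable to the given data file.
    pub(crate) fn get_positional_deletes(&self, data_file_path: &str) -> Option<Vec<u64>> {
        // ...
    }

    // Filter predicate derived from the applicable equality delete files.
    pub(crate) fn get_equality_delete_predicate(&self) -> Option<BoundPredicate> {
        // ...
    }
}
```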
Thanks for this great work, @sdd! Should we also consider the case where the `Deletes` enum occupies too much space, so that we need to support spilling it to disk?
@ZENOTME I think that we'll want to do that at some point, but it feels more like a day-two task. We're not touching the disk anywhere in the library so far, as far as I know, so it would need some careful consideration.
I've worked on an improved design for loading and parsing of delete files by the DeleteFileManager. The code for this can be seen in https://github.com/apache/iceberg-rust/pull/982.
- Create a single stream of all delete file tasks, irrespective of type, so that we can respect the combined concurrency limit.
- We then process each task in two phases: load and parse.
- For positional deletes, the load phase instantiates an `ArrowRecordBatchStream` to stream the file contents out.
- For equality deletes, we first check if the delete file is already loaded, or is being loaded by another concurrently processing data file scan task. If it is, we return a future from this phase. If not, we create such a future for the equality delete file and return an `ArrowRecordBatchStream` from the load phase as per the positional deletes - only this time it is accompanied by a one-shot channel sender that we will eventually use to resolve the shared future that we stored in the state.
- In a future update to the `DeleteFileManager` that adds support for delete vectors, the load phase will return a `PuffinReader` for them.
- The parse phase parses each record batch stream according to its associated data type. The result of this is a map of data file paths to delete vectors for the positional delete tasks (and, in future, for the delete vector tasks). For equality delete file tasks, this results in an unbound `Predicate`.
- The unbound `Predicate`s resulting from equality deletes are sent to their associated one-shot channels to store them in the right place in the delete file manager's state.
- The results of all of these futures are awaited in parallel, with the specified level of concurrency, and collected into a `Vec`. We then combine all of the delete vector maps that resulted from any positional delete or delete vector files into a single map and persist it in the state.

Conceptually, the data flow is like this (a sketch of the deduplication pattern follows the diagram):
```
                          FileScanTaskDeleteFile
                                    |
       Already-loading EQ Delete    |    Everything Else
    +---------------------------------------------------+
    |                                                   |
[get existing future]          [load recordbatch stream / puffin]
DeleteFileContext::InProgEqDel         DeleteFileContext
    |                                                   |
    |                                                   |
    |      +-----------------------------+--------------------------+
    |   Pos Del        Del Vec (Not yet Implemented)              EQ Del
    |      |                             |                          |
    |  [parse pos del stream]   [parse del vec puffin]       [parse eq del]
    |  HashMap<String, RoaringTreeMap>   HashMap<String, RoaringTreeMap>   (Predicate, Sender)
    |      |                             |                          |
    |      |                             |                 [persist to state]
    |      |                             |                          ()
    |      |                             |                          |
    |      +-----------------------------+--------------------------+
    |                                    |
    |                           [buffer unordered]
    |                                    |
    |                          [combine del vectors]
    |                     HashMap<String, RoaringTreeMap>
    |                                    |
    |                   [persist del vectors to state]
    |                                    ()
    |                                    |
    +-------------------------+-------------------------+
                              |
                           [join!]
```
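As a concrete illustration of the "already-loading EQ delete" branch above, here is a hedged sketch of the shared-future pattern, assuming `Predicate` (standing in for the parsed result) is `Clone`; the names and error handling are illustrative, not the final code:

```rust
use std::collections::HashMap;
use std::sync::{Arc, Mutex};

use futures::future::{BoxFuture, FutureExt, Shared};
use tokio::sync::oneshot;

type EqDelFuture = Shared<BoxFuture<'static, Result<Predicate, String>>>;

#[derive(Default)]
struct EqDelState {
    // equality delete file path -> shared future for its parsed Predicate
    in_progress: HashMap<String, EqDelFuture>,
}

// The first task to see an equality delete file registers a shared future
// keyed by file path and receives the sender with which to resolve it;
// later tasks clone and await that future instead of re-reading the file.
fn get_or_start_eq_del(
    state: &Arc<Mutex<EqDelState>>,
    path: &str,
) -> (EqDelFuture, Option<oneshot::Sender<Result<Predicate, String>>>) {
    let mut guard = state.lock().unwrap();
    if let Some(fut) = guard.in_progress.get(path) {
        // DeleteFileContext::InProgEqDel: share the in-flight result
        (fut.clone(), None)
    } else {
        // First encounter: the receiver half becomes the shared future; the
        // caller keeps the sender and resolves it once the file has been
        // loaded and parsed into a Predicate.
        let (tx, rx) = oneshot::channel();
        let fut = async move { rx.await.map_err(|_| "loading task dropped".to_string())? }
            .boxed()
            .shared();
        guard.in_progress.insert(path.to_string(), fut.clone());
        (fut, Some(tx))
    }
}
```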
Update: The implementation is ready for review, split across a series of PRs:
- https://github.com/apache/iceberg-rust/pull/652
- https://github.com/apache/iceberg-rust/pull/950
- https://github.com/apache/iceberg-rust/pull/951
- https://github.com/apache/iceberg-rust/pull/982
- https://github.com/apache/iceberg-rust/pull/1011
- https://github.com/apache/iceberg-rust/pull/1017
This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.
I think we could close this now? cc @sdd
Yes! We can close now