[feat] add missing metadata tables
Feature Request / Improvement
Looks like there are a few more metadata tables currently missing in PyIceberg.
Source of truth for metadata tables: https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/MetadataTableType.html
Done: https://py.iceberg.apache.org/api/#inspecting-tables
- [x] SNAPSHOTS
- [x] PARTITIONS
- [x] ENTRIES
- [x] REFS
- [x] MANIFESTS
- [x] METADATA_LOG_ENTRIES
- [x] HISTORY
- [x] FILES
Missing:
- [ ] ALL_DATA_FILES
- [ ] ALL_DELETE_FILES
- [ ] ALL_ENTRIES
- [ ] ALL_FILES
- [x] ALL_MANIFESTS
- [x] DATA_FILES
- [x] DELETE_FILES
- [ ] POSITION_DELETES
@kevinjqliu I would like to work on this one.
Hey @soumya-ghosh - if you want to split the workload between us, I would love to also give this a try.
Sure @amitgilad3, most likely there will be separate PRs for each of above metadata tables. I can work on data_files, all_data_files and all_manifests
Thanks for volunteering to contribute! I was thinking we could do something similar to #511 where each metadata table can be assigned at a time. And feel free to work on another after the first is done!
@kevinjqliu we can group the tasks in the following way:
- `data_files` and `delete_files` - they are subsets of `files`, just a filter condition on the content field, hence can be addressed in the same PR
- `all_files`, `all_data_files` and `all_delete_files` - once `all_files` is implemented, the other tables are again subsets of `all_files`, can be addressed in a single PR
- `all_entries`
- `all_manifests`
- `position_deletes`
What do you think?
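For reference, a rough sketch of the filter idea, assuming `tbl.inspect.files()` returns a PyArrow table with a `content` column as per the Iceberg spec (0 = data, 1 = position deletes, 2 = equality deletes); this is an illustration, not the proposed implementation:

```python
import pyarrow.compute as pc

# Rough sketch: data_files / delete_files derived from the files table by
# filtering on the Iceberg "content" field
# (spec: 0 = DATA, 1 = POSITION_DELETES, 2 = EQUALITY_DELETES).
files = tbl.inspect.files()  # pyarrow.Table; tbl is an already-loaded PyIceberg table
data_files = files.filter(pc.equal(files["content"], 0))
delete_files = files.filter(pc.greater(files["content"], 0))
```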
That makes sense to me, thanks @soumya-ghosh
@kevinjqliu added PR #1066 for data_files and delete_files.
Hey @kevinjqliu, any thoughts how to implement all_files table?
I initially thought that all_files returns files from all snapshots referenced in the current table metadata, hence the repetitions in the output.
I tested this logic and compared the output against the all_files metadata table through Spark.
I observed that although there were duplicates for several file_path values, the number of files returned by Spark is much smaller than this hypothesis would produce.
What is the difference between your implementation's output vs Spark's?
From the Spark docs, "To show all files, data files and delete files across all tracked snapshots, query prod.db.table.all_files"
> I initially thought that all_files returns files from all snapshots referenced in the current table metadata, hence the repetitions in the output.
This sounds right to me. Maybe Spark gets rid of duplicate rows?
From the Spark docs:
> These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots. The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.
So, here's my approach (pseudo-code):
    metadata = load_table_metadata()
    for snapshot in metadata["snapshots"]:
        manifest_list = read manifest list from snapshot
        for manifest_file in manifest_list:
            manifest = read manifest file
            for file in manifest:
                process file (data_file or delete_file)
With this approach, the number of files in the output is much higher than the corresponding output of the all_files table in Spark.
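For concreteness, a minimal PyIceberg-flavored version of that traversal might look like the following. It is a sketch only, assuming `Snapshot.manifests()` and `ManifestFile.fetch_manifest_entry()` behave as in the current internals; it is not the reviewed implementation:

```python
from pyiceberg.manifest import DataFileContent

io = tbl.io
for snapshot in tbl.metadata.snapshots:
    # Each snapshot points to a manifest list; read the manifest files it references.
    for manifest_file in snapshot.manifests(io):
        # Read the entries of each manifest (data files and delete files).
        for entry in manifest_file.fetch_manifest_entry(io, discard_deleted=True):
            data_file = entry.data_file
            if data_file.content == DataFileContent.DATA:
                ...  # process data file
            else:
                ...  # process position/equality delete file
```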
I see. So if I have a new table and append to it 5 times, I expect 5 snapshots and 5 manifest list files. I think each manifest list file will repeatedly refer to the same underlying manifest file, which will be read over and over causing duplicates.
What if you just return all unique (data+delete) files?
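A hypothetical way to do that on top of the sketch above would be to skip manifests that were already read, keyed on the manifest path, so each physical manifest contributes rows only once:

```python
# Hypothetical dedup variation: read each physical manifest file only once
# across all snapshots, keyed by its manifest_path.
seen_manifests: set[str] = set()
for snapshot in tbl.metadata.snapshots:
    for manifest_file in snapshot.manifests(io):
        if manifest_file.manifest_path in seen_manifests:
            continue
        seen_manifests.add(manifest_file.manifest_path)
        ...  # read entries as before
```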
> What if you just return all unique (data+delete) files?
In this case, the output will not match Spark. Will that be okay?
Also found this PR from Iceberg,
> These tables may contain duplicate rows. Deduplication can't be done through the current scan interface unless all of the work is done during scan planning on a single node. Duplicates are the trade-off for being able to process the metadata in parallel for large tables.
@soumya-ghosh I wonder if that's still the case today, that PR is from 2020. Do you have a WIP PR I can take a look at? We can also bring this to the devlist to double-check the correct behavior
@kevinjqliu added PR - https://github.com/apache/iceberg-python/pull/1241 for all_manifests.
Will get on with all_files, all_data_files and all_delete_files next.
Thanks for your contribution here @soumya-ghosh. I just merged #1241 for all_manifests. Are you still interested in adding all_files, all_data_files and all_delete_files?
Yes I will start working on that soon, have been busy last few weeks so couldn't make any progress.
Hey @soumya-ghosh & @kevinjqliu, would love to contribute. I don't want to step on your work, so I was wondering what I can take from this list: positional_deletes, all_files, all_data_files and all_delete_files?
Sure @amitgilad3. You can work on positional_deletes and all_entries.
all_files, all_data_files and all_delete_files will use the same base implementation and I've an approach in mind so let me give it a shot. If I'm unable to make progress, will let you know.
If you want to work on all_files, I can swap it with you.
@soumya-ghosh, I'll start with positional_deletes and see how fast I can finish it. Once I'm done we can see about the rest.
@kevinjqliu added PR - https://github.com/apache/iceberg-python/pull/1626 for all_files, all_data_files and all_delete_files.
Have implemented them in a single PR as the data and delete files are subsets of all_files.
Awesome work!! @soumya-ghosh - if all goes well, the next release will have all metadata tables accessible from pyiceberg 🚀
@amitgilad3 Right back at you! I see you've raised PRs for the remaining ones, will take a look.
Thanks for the contribution!! Appreciate it. Before we close out this issue, I want to double-check a few things:
- Documentation, all tables are documented at https://py.iceberg.apache.org/api/#inspecting-tables
- Optional time travel, for any non-`all_*` metadata tables, let's expose an optional parameter `snapshot_id` to provide the ability to time travel. This is already available in some metadata tables. Let's make sure it's consistent across all metadata tables (see the usage sketch below)
- When time traveling, we should take into account the state of the table at the particular snapshot_id. Things like schema and partition evolution. In some places, we just use `tbl.metadata.schema()`, which is the current table schema and might be incorrect when time traveling
- Similar to above, for `all_*` metadata tables, when querying other snapshots, make sure we're using the correct table state
- (edit) double check all metadata tables. For example, the partitions metadata table does not respect partition evolution currently #1120
Other than those, I think we're good to include this in the next release! 🥳
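As a usage sketch for point 2 (the `snapshot_id` parameter name is taken from the checklist above; whether each metadata table already accepts it is exactly what needs checking):

```python
# Hedged usage sketch: time travel on non-all_* metadata tables via an
# optional snapshot_id, falling back to the current snapshot when omitted.
snapshot_id = tbl.current_snapshot().snapshot_id
tbl.inspect.entries(snapshot_id=snapshot_id)
tbl.inspect.partitions(snapshot_id=snapshot_id)
tbl.inspect.files(snapshot_id=snapshot_id)  # availability per table to be verified
```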
For point 1 - will raise a separate PR covering documentation updates for these metadata tables.
For points 2, 3, 4 -
Is time travel through snapshot_id or timestamp supported for all_* metadata tables?
I tried and got the below error in Spark:
Query - `spark.sql(f"SELECT count(1) FROM {identifier}.all_files for version as of {snapshot_id}").show()`
Error - `pyspark.errors.exceptions.captured.UnsupportedOperationException: Cannot select snapshot in table: ALL_FILES`
As per the current Iceberg code, such operations are not supported on all_* metadata tables.
> Is time travel through snapshot_id or timestamp supported for all_* metadata tables?
What I mean is that for all_* metadata tables, we're essentially doing something like `[inspect.files(snapshot.snapshot_id) for snapshot in all_snapshots]`, and we should make sure that we're not just referring to the current schema, for example.
I guess this can also occur for the rest of the metadata tables too. For example, there's a bug in the partitions metadata table right now for partition evolution #1120
I just want to double check these things before calling this done :)
I understand that the files table by snapshot and all_files (and its derivatives) should respect schema evolution.
The keys in the readable_metrics column are derived from the schema, which is the source of the inconsistency.
I did a test to see the behavior in Spark; observations are in a gist. It appears that Spark constructs the readable_metrics column using the current schema (which may be a bug).
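A hedged sketch of what schema-aware construction could look like, assuming `Table.schemas()` maps schema ids to schemas and `Snapshot.schema_id` is populated; this is an illustration of the idea, not a proposed patch:

```python
# Hedged sketch: resolve the table schema as of each snapshot instead of
# always using the current schema when building columns like readable_metrics.
schemas = tbl.schemas()  # {schema_id: Schema}
for snapshot in tbl.metadata.snapshots:
    schema_at_snapshot = (
        schemas[snapshot.schema_id]
        if snapshot.schema_id is not None
        else tbl.schema()
    )
    ...  # derive readable_metrics keys from schema_at_snapshot
```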
Thoughts @kevinjqliu ?
Hey @soumya-ghosh @kevinjqliu - just so I understand: since I already implemented support for a specific snapshot in all_entries and in position_deletes, do we want to support this or not?
@amitgilad3 were you able to test all_entries against Spark in the integration tests?
As per the Iceberg code, it should throw an exception if one tries to query all_* for a specific snapshot.
Will check all_entries PR.
@soumya-ghosh - when I run
`spark.sql(f"SELECT count(1) FROM {identifier}.all_entries for version as of {snapshot_id}").show()`
I get the following error:
`pyspark.errors.exceptions.captured.UnsupportedOperationException: Cannot select snapshot in table: ALL_ENTRIES`
So I guess we should not support it for all_entries, but for position_deletes it works with Spark, so I'll keep it.
@kevinjqliu awaiting your thoughts on the above 4 comments.