
[feat] add missing metadata tables

Open kevinjqliu opened this issue 1 year ago • 32 comments

Feature Request / Improvement

Looks like there are a few more metadata tables currently missing in PyIceberg.

Source of truth for metadata tables: https://iceberg.apache.org/javadoc/latest/org/apache/iceberg/MetadataTableType.html

Done: https://py.iceberg.apache.org/api/#inspecting-tables

  • [x] SNAPSHOTS
  • [x] PARTITIONS
  • [x] ENTRIES
  • [x] REFS
  • [x] MANIFESTS
  • [x] METADATA_LOG_ENTRIES
  • [x] HISTORY
  • [x] FILES

Missing:

  • [ ] ALL_DATA_FILES
  • [ ] ALL_DELETE_FILES
  • [ ] ALL_ENTRIES
  • [ ] ALL_FILES
  • [x] ALL_MANIFESTS
  • [x] DATA_FILES
  • [x] DELETE_FILES
  • [ ] POSITION_DELETES

kevinjqliu avatar Aug 13 '24 06:08 kevinjqliu

@kevinjqliu I would like to work on this one.

soumya-ghosh avatar Aug 13 '24 08:08 soumya-ghosh

Hey @soumya-ghosh - if you want to split the workload between us, I would love to also give this a try.

amitgilad3 avatar Aug 13 '24 08:08 amitgilad3

Sure @amitgilad3, most likely there will be separate PRs for each of the above metadata tables. I can work on data_files, all_data_files and all_manifests.

soumya-ghosh avatar Aug 13 '24 09:08 soumya-ghosh

Thanks for volunteering to contribute! I was thinking we could do something similar to #511, where metadata tables are assigned one at a time. And feel free to work on another after the first is done!

kevinjqliu avatar Aug 13 '24 11:08 kevinjqliu

@kevinjqliu we can group the tasks in following way:

  • data_files and delete_files - these are subsets of files, just a filter condition on the content field, so they can be addressed in the same PR
  • all_files, all_data_files and all_delete_files - once all_files is implemented, the other two are again subsets of all_files and can be addressed in a single PR
  • all_entries
  • all_manifests
  • position_deletes

What do you think?
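The content-based grouping can be sketched with toy rows (the dicts below are illustrative stand-ins, not real PyIceberg output). The Iceberg spec encodes the file kind in a content code, so data_files and delete_files fall out of one files scan:

```python
# Simulated rows from a files-style metadata scan. Per the Iceberg spec,
# content 0 = data, 1 = position deletes, 2 = equality deletes.
files = [
    {"file_path": "s3://bucket/data-a.parquet", "content": 0},
    {"file_path": "s3://bucket/data-b.parquet", "content": 0},
    {"file_path": "s3://bucket/pos-del.parquet", "content": 1},
    {"file_path": "s3://bucket/eq-del.parquet", "content": 2},
]

# data_files and delete_files are just filters over the same scan
data_files = [f for f in files if f["content"] == 0]
delete_files = [f for f in files if f["content"] in (1, 2)]

print(len(data_files), len(delete_files))  # 2 2
```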

soumya-ghosh avatar Aug 13 '24 19:08 soumya-ghosh

That makes sense to me, thanks @soumya-ghosh

kevinjqliu avatar Aug 13 '24 21:08 kevinjqliu

@kevinjqliu added PR #1066 for data_files and delete_files.

soumya-ghosh avatar Aug 15 '24 22:08 soumya-ghosh

Hey @kevinjqliu, any thoughts on how to implement the all_files table? I initially thought that all_files returns files from all snapshots referenced in the current table metadata, hence the repetitions in the output. I tested this logic and compared the output against the all_files metadata table through Spark. Although there were duplicates for several file_path values, the number of files returned by Spark was much lower than this hypothesis would suggest.

soumya-ghosh avatar Sep 12 '24 19:09 soumya-ghosh

What is the difference between your implementation's output vs Spark's?

From the spark docs, "To show all files, data files and delete files across all tracked snapshots, query prod.db.table.all_files"

I initially thought that all_files returns files from all snapshots referenced in the current table metadata, hence the repetitions in the output.

This sounds right to me. Maybe Spark gets rid of duplicate rows?

kevinjqliu avatar Sep 13 '24 00:09 kevinjqliu

From spark docs,

These tables are unions of the metadata tables specific to the current snapshot, and return metadata across all snapshots. The "all" metadata tables may produce more than one row per data file or manifest file because metadata files may be part of more than one table snapshot.

So, here's my approach (pseudo-code):

metadata = load_table_metadata()
for snapshot in metadata["snapshots"]:
    manifest_list = read_manifest_list(snapshot)
    for manifest_file in manifest_list:
        manifest = read_manifest(manifest_file)
        for file in manifest:
            process(file)  # data_file or delete_file

With this approach, the number of files in the output is much higher than the corresponding output of the all_files table in Spark.

soumya-ghosh avatar Sep 13 '24 20:09 soumya-ghosh

I see. So if I have a new table and append to it 5 times, I expect 5 snapshots and 5 manifest list files. I think each manifest list file will repeatedly refer to the same underlying manifest file, which will be read over and over causing duplicates.
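This duplication can be shown with a toy model (simulated manifest paths, not real metadata): after five appends, each snapshot's manifest list re-references all earlier manifests plus one new one, so a naive scan of every manifest list reads far more manifests than actually exist:

```python
# Snapshot n's manifest list holds manifests 0..n, mimicking 5 appends
# to the same table (simulated paths, not real Iceberg metadata).
snapshots = [[f"manifest-{i}.avro" for i in range(n + 1)] for n in range(5)]

# Naive approach: read every manifest referenced by every manifest list
naive_reads = sum(len(manifest_list) for manifest_list in snapshots)

# Only 5 distinct manifest files actually exist
unique_manifests = {path for manifest_list in snapshots for path in manifest_list}

print(naive_reads, len(unique_manifests))  # 15 5
```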

What if you just return all unique (data+delete) files?

kevinjqliu avatar Sep 13 '24 21:09 kevinjqliu

What if you just return all unique (data+delete) files?

In this case, output will not match with Spark. Will that be okay?

Also found this PR from Iceberg,

These tables may contain duplicate rows. Deduplication can't be done through the current scan interface unless all of the work is done during scan planning on a single node. Duplicates are the trade-off for being able to process the metadata in parallel for large tables.
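The trade-off described in that quote can be mimicked with a toy scan (simulated manifests, not Iceberg's actual planner): workers can emit each manifest's rows independently and in parallel, but deduplication only becomes possible once all rows are collected in one place:

```python
from concurrent.futures import ThreadPoolExecutor

# Toy illustration: rows "b" and "c" appear in two manifests, so
# per-manifest workers emit duplicates; removing them requires a
# single shared collection point.
manifests = [["a", "b"], ["b", "c"], ["c", "d"]]

with ThreadPoolExecutor(max_workers=3) as pool:
    # each worker processes one manifest independently
    parallel_rows = [row for rows in pool.map(list, manifests) for row in rows]

deduped = set(parallel_rows)  # only possible after gathering on one node
print(len(parallel_rows), len(deduped))  # 6 4
```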

soumya-ghosh avatar Sep 14 '24 10:09 soumya-ghosh

@soumya-ghosh I wonder if that's still the case today, that PR is from 2020. Do you have a WIP PR I can take a look at? We can also bring this to the devlist to double-check the correct behavior

kevinjqliu avatar Oct 03 '24 20:10 kevinjqliu

@kevinjqliu added PR - https://github.com/apache/iceberg-python/pull/1241 for all_manifests.

Will get on with all_files, all_data_files and all_delete_files next.

soumya-ghosh avatar Oct 20 '24 18:10 soumya-ghosh

Thanks for your contribution here @soumya-ghosh. I just merged #1241 for all_manifests. Are you still interested in adding all_files, all_data_files and all_delete_files?

kevinjqliu avatar Jan 11 '25 01:01 kevinjqliu

Yes, I will start working on that soon; I have been busy the last few weeks so couldn't make any progress.

soumya-ghosh avatar Jan 11 '25 05:01 soumya-ghosh

Hey @soumya-ghosh & @kevinjqliu, would love to contribute. I don't want to step on your work, so I was wondering what I can take from this list: positional_deletes, all_files, all_data_files and all_delete_files?

amitgilad3 avatar Jan 11 '25 12:01 amitgilad3

Sure @amitgilad3, you can work on positional_deletes and all_entries. all_files, all_data_files and all_delete_files will use the same base implementation, and I have an approach in mind, so let me give it a shot. If I'm unable to make progress, I will let you know.

If you want to work on all_files, I can swap it with you.

soumya-ghosh avatar Jan 11 '25 13:01 soumya-ghosh

@soumya-ghosh, I'll start with positional_deletes and see how fast I can finish it; once I'm done we can see about the rest.

amitgilad3 avatar Jan 12 '25 10:01 amitgilad3

@kevinjqliu added PR - https://github.com/apache/iceberg-python/pull/1626 for all_files, all_data_files and all_delete_files. Implemented them in a single PR since the data and delete files are subsets of all_files.

soumya-ghosh avatar Feb 08 '25 10:02 soumya-ghosh

Awesome work!! @soumya-ghosh - if all goes well, the next release will have all metadata tables accessible from PyIceberg 🚀

amitgilad3 avatar Feb 08 '25 11:02 amitgilad3

@amitgilad3 Right back at you! I see you've raised PRs for the remaining ones, will take a look.

soumya-ghosh avatar Feb 08 '25 11:02 soumya-ghosh

Thanks for the contribution!! Appreciate it. Before we close out this issue, I want to double-check a few things:

  1. Documentation: all tables are documented at https://py.iceberg.apache.org/api/#inspecting-tables
  2. Optional time travel: for any non-all_* metadata table, let's expose an optional snapshot_id parameter to provide the ability to time travel. This is already available in some metadata tables; let's make sure it's consistent across all of them
  3. When time traveling, we should take into account the state of the table at that particular snapshot_id, including schema and partition evolution. In some places we just use tbl.metadata.schema(), which is the current table schema and might be incorrect when time traveling
  4. Similar to the above, for all_* metadata tables, when querying other snapshots, make sure we're using the correct table state
  5. (edit) Double-check all metadata tables. For example, the partitions metadata table does not respect partition evolution currently #1120
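Point 2 (an optional snapshot_id that defaults to the current snapshot) could look roughly like this; the metadata dict shape and names below are simplified stand-ins, not the real PyIceberg API:

```python
# Hypothetical sketch of a snapshot_id-aware inspect helper; metadata is
# a simplified dict, not PyIceberg's TableMetadata.
def inspect_snapshot(metadata, snapshot_id=None):
    if snapshot_id is None:
        # no argument means "no time travel": use the current snapshot
        snapshot_id = metadata["current-snapshot-id"]
    by_id = {s["snapshot-id"]: s for s in metadata["snapshots"]}
    if snapshot_id not in by_id:
        raise ValueError(f"Cannot find snapshot with ID {snapshot_id}")
    return by_id[snapshot_id]

metadata = {
    "current-snapshot-id": 2,
    "snapshots": [{"snapshot-id": 1}, {"snapshot-id": 2}],
}
print(inspect_snapshot(metadata)["snapshot-id"])     # 2
print(inspect_snapshot(metadata, 1)["snapshot-id"])  # 1
```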

Other than those, I think we're good to include this in the next release! 🥳

kevinjqliu avatar Feb 08 '25 18:02 kevinjqliu

For point 1 - will raise a separate PR covering documentation updates for these metadata tables.

For points 2, 3, 4 - is time travel through snapshot_id or timestamp supported for all_* metadata tables? I tried the following in Spark:

    spark.sql(f"SELECT count(1) FROM {identifier}.all_files for version as of {snapshot_id}").show()

and got this error:

    pyspark.errors.exceptions.captured.UnsupportedOperationException: Cannot select snapshot in table: ALL_FILES

As per the current Iceberg code, such operations are not supported on all_* metadata tables.

soumya-ghosh avatar Feb 08 '25 22:02 soumya-ghosh

Is time travel through snapshot_id or timestamp supported for all_* metadata tables?

What I mean is that for all_* metadata tables, we're essentially doing something like [inspect.files(snapshot.snapshot_id) for snapshot in all_snapshots], and we should make sure that we're not just referring to the current schema, for example.

I guess this can occur for the rest of the metadata tables too. For example, there's a bug in the partitions metadata table right now for partition evolution #1120

I just want to double check these things before calling this done :)

kevinjqliu avatar Feb 09 '25 01:02 kevinjqliu

I understand that the files table for a given snapshot, and all_files (and its derivatives), should respect schema evolution. The keys in the readable_metrics column are derived from the schema, which is the source of the inconsistency.

I did a test to see the behavior in Spark; observations are in the gist. It appears that Spark constructs the readable_metrics column using the current schema (which may be a bug).
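The schema dependence can be shown with a toy model (field-id-keyed metrics and dict schemas are simplifications, not PyIceberg internals): decoding the same raw metrics with the snapshot-time schema versus the current schema yields different readable_metrics keys:

```python
# Column-level metrics are stored keyed by field id; the readable form
# maps them to column names via whichever schema is used for decoding.
# Toy model only, not how PyIceberg stores metrics.
raw_metrics = {1: 10, 2: 20}  # field-id -> metric value

def readable_metrics(schema, metrics):
    # schema: field-id -> column name
    return {name: metrics[field_id] for field_id, name in schema.items()}

schema_at_snapshot = {1: "id", 2: "price"}      # column later renamed
current_schema = {1: "id", 2: "price_usd"}

print(readable_metrics(schema_at_snapshot, raw_metrics))  # {'id': 10, 'price': 20}
print(readable_metrics(current_schema, raw_metrics))      # {'id': 10, 'price_usd': 20}
```

Same underlying values, different keys, which is exactly the kind of mismatch a time-traveling files table would surface.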

Thoughts @kevinjqliu ?

soumya-ghosh avatar Feb 09 '25 22:02 soumya-ghosh

Hey @soumya-ghosh @kevinjqliu - just so I understand: since I already implemented support for a specific snapshot in all_entries and in position_deletes, do we want to support this or not?

amitgilad3 avatar Feb 10 '25 14:02 amitgilad3

@amitgilad3 were you able to test all_entries against Spark in the integration tests? As per the Iceberg code, it should throw an exception if one tries to query an all_* table for a specific snapshot. Will check the all_entries PR.

soumya-ghosh avatar Feb 10 '25 16:02 soumya-ghosh

@soumya-ghosh - when I run

    spark.sql(f"SELECT count(1) FROM {identifier}.all_entries for version as of {snapshot_id}").show()

I get the following error:

pyspark.errors.exceptions.captured.UnsupportedOperationException: Cannot select snapshot in table: ALL_ENTRIES

so I guess we should not support it for all_entries, but position_deletes works with Spark, so I'll keep it there.

amitgilad3 avatar Feb 10 '25 17:02 amitgilad3

@kevinjqliu awaiting your thoughts on the four comments above.

soumya-ghosh avatar Feb 14 '25 22:02 soumya-ghosh