
Support setting a snapshot property in same commit as spark.sql

brianfromoregon opened this issue 1 year ago • 17 comments

Feature Request / Improvement

This cookbook has a Java snippet that updates a snapshot property atomically with a SQL MERGE INTO.

import java.util.Map;
import org.apache.iceberg.spark.CommitMetadata;

// Update the target table and set the watermark in the same commit
CommitMetadata.withCommitProperties(
    Map.of("watermark:raw.log_source", endSnapshotId),
    () -> {
        spark.sql("MERGE INTO ...");
        return 0;
    },
    RuntimeException.class);

It would be great if pyiceberg allowed me to similarly wrap my spark.sql call and add snapshot properties.

brianfromoregon avatar Feb 05 '24 13:02 brianfromoregon

Thanks for raising this @brianfromoregon!

I think it would be a great addition. We need to extend the .append and .overwrite APIs to accept a map of properties, which then needs to be passed in when constructing the Summary:

https://github.com/apache/iceberg-python/blob/622adb799e1c5acb48be95ba14a670bafef54a61/pyiceberg/table/__init__.py#L2427
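
A minimal sketch of the idea, assuming a snapshot_properties argument (names here are illustrative, not the actual pyiceberg internals): the user-supplied map is merged into the Summary that is written with the commit.

# Illustrative sketch only -- not the actual pyiceberg code.
# User-supplied properties are merged into the snapshot Summary.
from typing import Dict

def build_summary(operation: str, snapshot_properties: Dict[str, str]) -> Dict[str, str]:
    summary = dict(snapshot_properties)  # user-supplied keys first
    summary["operation"] = operation     # then the standard fields
    return summary

print(build_summary("append", {"watermark:raw.log_source": "12345"}))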

Are you interested in contributing this? :)

Fokko avatar Feb 05 '24 20:02 Fokko

Beyond writing snapshot summary fields, this issue also requests the ability to write those fields in the same snapshot as one created by spark.sql. That would take changes beyond what you describe, right @Fokko? Ideally I'd have a single transaction to (1) read a summary field, (2) run spark.sql, and (3) write a summary field.

brianfromoregon avatar Feb 06 '24 18:02 brianfromoregon

@brianfromoregon That's not possible. Spark will create the snapshot, and those are immutable. So you cannot update those afterward in PyIceberg.

Fokko avatar Feb 06 '24 18:02 Fokko

@Fokko Interesting, that makes sense, so what does the linked cookbook code mean when it says "in the same commit"?

brianfromoregon avatar Feb 06 '24 18:02 brianfromoregon

@brianfromoregon In the cookbook example it will be in the same commit, which will result in a single snapshot. I was under the impression that you also want to replicate this on the Python side :)

Fokko avatar Feb 06 '24 18:02 Fokko

@Fokko Yes, I am using Python. So this is possible from Java but not from Python; interesting, I wonder why.

brianfromoregon avatar Feb 06 '24 18:02 brianfromoregon

Because it is not part of the API, so we need to extend it :) In Python, you would append an Arrow table to the Iceberg table and set the properties in the same commit (snapshot).
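
For illustration, a sketch of what that could look like, assuming a hypothetical snapshot_properties argument on the write API (not available at the time of this comment):

# Hedged sketch; assumes the write APIs grow a snapshot_properties
# argument. Catalog and table names are placeholders.
import pyarrow as pa
from pyiceberg.catalog import load_catalog

catalog = load_catalog("default")
tbl = catalog.load_table("raw.logs")

df = pa.table({"log_source": ["a", "b"], "value": [1, 2]})

# The appended data and the custom property land in one commit,
# i.e. a single new snapshot.
tbl.append(df, snapshot_properties={"watermark:raw.log_source": "12345"})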

Fokko avatar Feb 06 '24 18:02 Fokko

Ok, agreed. So my intention was to have this issue represent extending the API to allow same-commit semantics like the Java cookbook, and then issue #367 represent the (simpler) change to allow setting snapshot properties in general.

brianfromoregon avatar Feb 06 '24 18:02 brianfromoregon

As @brianfromoregon has mentioned, I also understood the issue raised to "represent extending the API to allow same-commit semantics like the Java cookbook".

ajosephides avatar Feb 08 '24 09:02 ajosephides

I would love that, and this is what I suggested in https://github.com/apache/iceberg-python/issues/368#issuecomment-1928020308

Fokko avatar Feb 08 '24 11:02 Fokko

@brianfromoregon @Fokko can I take a stab at this?

Gowthami03B avatar Feb 13 '24 02:02 Gowthami03B

https://github.com/apache/iceberg-python/pull/419

Gowthami03B avatar Feb 14 '24 17:02 Gowthami03B

I think there's still some confusion here, since there are two possible interpretations of "extending the API to allow same-commit semantics like the Java cookbook":

  • Interpretation 1: allow pyiceberg to write both snapshot properties and table updates (append/overwrite) in the same transaction (pyiceberg is doing both updates); this is the interpretation implemented in #419.
  • Interpretation 2: allow pyiceberg and pyspark to participate in the same transaction, with pyiceberg writing snapshot properties and pyspark executing SQL to update the table. This is what the Java cookbook example shows, using the Java Iceberg API with the Java Spark API. The difference between interpretation 2 and interpretation 1 is that you could do everything supported in PySpark (MERGE INTO, distributed processing of table updates, etc.) that isn't possible (yet) in pyiceberg.

I think interpretation 2 is what @brianfromoregon was getting at, and I don't know how feasible it is... but both capabilities are nice, so it's great to have #419, and if interpretation 2 is possible, that would also be really useful. An alternative to interpretation 2 would be some other way to set snapshot properties in PySpark without using pyiceberg, and I don't think that exists either.

corleyma avatar Feb 29 '24 19:02 corleyma

Hi @corleyma, my thinking was that issue #367 is meant to represent "Interpretation 1" and this issue (#368) is meant to represent "Interpretation 2". Fully agreed that both features are useful!

brianfromoregon avatar Mar 11 '24 16:03 brianfromoregon

Hi @brianfromoregon and @corleyma, from my understanding of PyIceberg and PySpark Iceberg, I'm not sure allowing the two separate clients to participate in the same transaction will be possible any time soon. Currently, Transactions are designed as classes, and they are available only to the specific client that's building them.

This feature request implies that the transaction should be shared between the two separate clients, which would require either:

  1. the Transaction class to be exchanged in a way that can be understood by both Spark and Python within the same machine (presumably the Spark driver), or
  2. a Transaction that is sent to an intelligent Catalog backend that doesn't commit it immediately but stages the transaction, so that it can be looked up by a unique identifier and built upon by separate clients until it is committed (sketched below).
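
Purely as a thought experiment, option 2 might look something like the following; every class and method here is hypothetical, as no such staged-transaction catalog API exists today.

# Entirely hypothetical sketch of a staged-transaction catalog backend
# (option 2 above); none of these APIs exist in Iceberg today.
import uuid
from typing import Any, Dict, List

class StagingCatalog:
    def __init__(self) -> None:
        self._staged: Dict[str, List[Any]] = {}  # txn_id -> pending updates

    def begin(self) -> str:
        # Hand out an identifier that separate clients can share.
        txn_id = str(uuid.uuid4())
        self._staged[txn_id] = []
        return txn_id

    def add_update(self, txn_id: str, update: Any) -> None:
        # A PySpark write and a PyIceberg property change would both
        # attach their metadata updates to the same staged transaction.
        self._staged[txn_id].append(update)

    def commit(self, txn_id: str) -> None:
        # Apply all staged updates as one commit -> one snapshot:
        # validate against the current table state, then swap metadata.
        updates = self._staged.pop(txn_id)
        ...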

Is there a specific use case you are thinking of that requires both PySpark-Iceberg and PyIceberg? We know PyIceberg is still evolving, but it is growing fast and we will reach something close to feature parity in the near future. After that, the choice of client would really depend on the use case: does it require the built-in distributed capabilities of Spark, or do we want to perform simpler transactions through PyIceberg?

@Fokko - do you have any thoughts on this topic?

sungwy avatar Mar 26 '24 15:03 sungwy

Hi @syun64, thanks for chiming in!

My batch app stores historical data; there is always a date column. It runs for each date and inserts data for that date. Sometimes there is legitimately no data available for a particular date: no matter how many times it runs, there will never be data. Other times the app has an error or fails to run and needs to be re-run for a date. I'm trying to let my app differentiate between missing dates and present-but-empty dates so it does not constantly re-run for dates that will never produce data. When I was using raw Parquet files, I would simply write an empty file for a date to represent present-but-empty. Asking in Slack, I learned that Iceberg does not support this concept (for example, no empty partitions are allowed), so instead I am aiming to use metadata (snapshot properties) to store the date ranges that are reflected in the stored data.

In order to implement this with snapshot properties I want my writer to do the following transactionally:

  1. Fetch the current snapshot's dateranges property.
  2. Modify that dateranges value to include the dates which are about to be written.
  3. Merge the new data and update the dateranges snapshot property, in the same new snapshot.

If another concurrent writer were to write its own new snapshot between steps 1 and 3, I would want my writer to throw an exception; then I'll retry from step 1, starting from the latest snapshot.

Today I use PySpark Iceberg for writing because PyIceberg does not yet support partitioned writes. PyIceberg is getting partitioned writes soon, and I am excited to try them! But until then I'm using PySpark for writing and want some way to accomplish steps 1-3 from a Python client. I hope this explains my goal and motivation.

Another approach I had in mind was to be able to read and write snapshot properties from a PySpark SQL query. That is appealing because it would be a single-client solution, which would also allow my non-Python clients to perform writes that honor this dateranges property.
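
(For the read half, Iceberg's snapshots metadata table already exposes the summary from Spark SQL; a small sketch below, with db.events as a placeholder table name. Writing summary properties from pure SQL is the part that has no equivalent.)

# Read the summary (including custom properties) of recent snapshots.
# Assumes `spark` is a SparkSession; "db.events" is a placeholder.
spark.sql("""
    SELECT committed_at, snapshot_id, summary
    FROM db.events.snapshots
    ORDER BY committed_at DESC
""").show(truncate=False)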

brianfromoregon avatar Mar 26 '24 16:03 brianfromoregon

> In order to implement this with snapshot properties I want my writer to do the following transactionally:
>
>   1. Fetch the current snapshot's dateranges property.
>   2. Modify that dateranges value to include the dates which are about to be written.
>   3. Merge the new data and update the dateranges snapshot property, in the same new snapshot.
>
> If another concurrent writer were to write its own new snapshot between steps 1 and 3, I would want my writer to throw an exception; then I'll retry from step 1, starting from the latest snapshot.
>
> Another approach I had in mind was to be able to read and write snapshot properties from a PySpark SQL query. That is appealing because it would be a single-client solution, which would also allow my non-Python clients to perform writes that honor this dateranges property.
I think you should be able to do this today: keep track of the snapshot you read from in step (1), then write with a snapshot property plus an isolation check pinned to that starting snapshot, so that your commit fails if a concurrent commit has been made since then.

https://iceberg.apache.org/docs/1.5.0/spark-configuration/#write-options

"isolation-level", "validate-from-snapshot-id" and "snapshot-property" are probably the write options you want to use to achieve your goal in PySpark. Let me know if that works for you!

sungwy avatar Mar 26 '24 21:03 sungwy

This issue has been automatically marked as stale because it has been open for 180 days with no activity. It will be closed in next 14 days if no further activity occurs. To permanently prevent this issue from being considered stale, add the label 'not-stale', but commenting on the issue is preferred when possible.

github-actions[bot] avatar Sep 23 '24 00:09 github-actions[bot]

Snapshot properties can now be specified in the PyIceberg Table APIs:

https://github.com/apache/iceberg-python/pull/419

sungwy avatar Sep 23 '24 12:09 sungwy