
feat(api): Support time travel query

mfatihaktas opened this issue on Feb 02 '24 • 5 comments

Is your feature request related to a problem?

Flink supports specifying a point in time and querying the corresponding data in a given table, aka time travel. As a use case, time travel is useful when query results need to be reproducible. To support such use cases in Ibis/Flink, we would need to add time travel support to the Flink backend.

Describe the solution you'd like

The API for time travel would look like the following:

table = con.create_table(
    ...,
    watermark=ibis.watermark(...),
)

expr = table.at_time(<Timestamp expression>).select(...)
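
A more concrete sketch of the proposed usage (hypothetical: at_time does not exist in ibis yet, and the table and column names are made up):

from pyflink.table import EnvironmentSettings, TableEnvironment

import ibis

table_env = TableEnvironment.create(EnvironmentSettings.in_streaming_mode())
con = ibis.flink.connect(table_env)

table = con.table("orders")  # assumes a pre-registered table named "orders"

# Hypothetical: pin the scan to a point in time, then build the query as usual.
expr = table.at_time(ibis.timestamp("2024-02-01 00:00:00")).select(
    "order_id", "trans_amount"
)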

Backends

Supporting time travel

  • DeltaLake
  • Spark SQL
    • https://github.com/delta-io/delta/issues/128
    • https://github.com/delta-io/delta/pull/1288
    • https://issues.apache.org/jira/browse/SPARK-37219
    • https://iceberg.apache.org/docs/latest/spark-queries/#time-travel
  • BigQuery
  • Flink
  • MsSQL
  • Trino

Notes

  • Time travel in Flink is somewhat similar to asof in pandas.
    • Flink time travel allows for accessing the snapshot of a table that corresponds to a given time expression.
    • Pandas asof retrieves the last non-NaN row in a dataframe whose value in a given column is less than or equal to a given value (e.g., the column might contain datetimes); see the short pandas sketch after this list.
  • Time travel support in Flink is tied to the catalog: the catalog that the Flink table belongs to has to implement getTable(ObjectPath tablePath, long timestamp). More details are given in the Flink Catalog documentation.
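
For comparison, a minimal illustration of pandas' asof behavior:

import pandas as pd

# Series.asof returns the last non-NaN value whose index is <= the key.
s = pd.Series(
    [10.0, 20.0, float("nan"), 40.0],
    index=pd.to_datetime(["2024-01-01", "2024-01-02", "2024-01-03", "2024-01-04"]),
)
s.asof(pd.Timestamp("2024-01-03"))  # 20.0: the NaN at 2024-01-03 is skipped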

What version of ibis are you running?

7.2.0

What backend(s) are you using, if any?

Flink

Code of Conduct

  • [X] I agree to follow this project's Code of Conduct

mfatihaktas • Feb 02 '24

Supported in

  • Snowflake
  • Deltalake read_delta (see the sketch after these lists)
  • ~Iceberg~ No ibis API for this (yet)
  • Flink (Perhaps only when using Paimon?)
  • BigQuery

Not supported in

  • Postgres
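
For Delta Lake, the existing entry point would presumably be read_delta. A sketch, assuming extra keyword arguments are forwarded to the underlying Delta reader, which can pin a snapshot by version:

import ibis

con = ibis.duckdb.connect()

# Assumption: `version` is passed through to the Delta reader
# (deltalake.DeltaTable accepts version=... to read an older snapshot).
events_v3 = con.read_delta("/data/events", version=3)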

We might want this as a method on a special leaf table expression type, like:

class TimeTravelTable:
    def at_time(self, **kwargs):
        ...
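
Whatever the expression type ends up being, the SQL it has to compile to is fairly well established per backend. A sketch of the target syntax (the helper is illustrative only, not ibis API):

def at_time_sql(table: str, ts: str, backend: str) -> str:
    """Illustrative only: the point-in-time SELECT each backend expects."""
    if backend in ("flink", "bigquery"):
        # SQL:2011-style syntax, shared by Flink and BigQuery.
        return f"SELECT * FROM {table} FOR SYSTEM_TIME AS OF TIMESTAMP '{ts}'"
    if backend == "snowflake":
        return f"SELECT * FROM {table} AT (TIMESTAMP => '{ts}'::TIMESTAMP)"
    raise NotImplementedError(f"time travel syntax unknown for {backend!r}")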

cpcloud • Feb 05 '24

@cpcloud Thanks for the note. Would the "sqlglot refactoring" be a prerequisite for, or otherwise impact, the work on this? Asking because of your review on feat(flink): implement support for temporal join. As far as I can tell, the sqlglot work should not be a prerequisite for moving forward with this issue.

mfatihaktas • Feb 05 '24

I think it will, but only because the sqlglot refactor involves a lot of changes to the backend, and it will make development more difficult to have two different branches going for a very long time.

Since 8.0.0 was just released, I need to wrap up the flink port and then we can merge the-epic-split to main.

cpcloud • Feb 05 '24

  • Flink (Perhaps only when using Paimon?)

There are only two requirements explicitly mentioned in the Flink doc:

  1. On the catalog:

Currently, time travel requires the corresponding catalog that the table belongs to implementing the getTable(ObjectPath tablePath, long timestamp) method. See more details in Catalog.

  2. On the table:

FOR SYSTEM_TIME AS OF timestamp_expression: Used to query data at a specific point in time, the timestamp_expression represents the historical time point you want to query. The timestamp_expression can be a specific timestamp or a time-related expression that can be reduced to a constant, and this expression can only apply to physical tables and not to views or sub-queries.
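
Putting the two requirements together, a minimal pyflink sketch of the SQL the backend would ultimately need to emit (assuming the registered catalog implements the getTable(ObjectPath, long) overload, and that a table named orders exists):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
# ... register a catalog/table that supports getTable(tablePath, timestamp) ...

# Query the snapshot of `orders` as of a constant point in time.
result = t_env.execute_sql(
    "SELECT * FROM orders FOR SYSTEM_TIME AS OF TIMESTAMP '2024-02-01 00:00:00'"
)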

My understanding is that tables need to be versioned to be able to support time travel.

Versioned tables are defined implicitly for any tables whose underlying sources or formats directly define changelogs. [https://nightlies.apache.org/flink/flink-docs-master/docs/dev/table/concepts/versioned_tables/#versioned-table-sources] This means that time travel queries should be supported on any table backed by a source with versioning support.

Coming to Paimon, my understanding is that Paimon (previously Table Store) was initially proposed to support querying intermediate Flink tables. More information on this is given in the Motivation section of FLIP-188: Introduce Built-in Dynamic Table Storage. In particular, the following example is helpful:

CREATE TEMPORARY VIEW intermediate_table AS
SELECT A.order_id, A.auction_id, B.category_id, A.trans_amount, A.create_time
FROM orders A LEFT JOIN category_dim B ON A.auction_id = B.auction_id;

To me, this implies that intermediate tables do not support time travel out of the box, since they are not versioned by default; time travel on intermediate tables is supported only with Paimon. I could not find any doc or article that verifies my understanding, only this bit: https://paimon.apache.org/docs/master/how-to/querying-tables/#batch-time-travel
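
For reference, the Paimon doc linked above does batch time travel through Flink's dynamic table options rather than FOR SYSTEM_TIME. A sketch (option name per that doc; the catalog and table setup is elided):

from pyflink.table import EnvironmentSettings, TableEnvironment

t_env = TableEnvironment.create(EnvironmentSettings.in_batch_mode())
# ... create/register a Paimon catalog and a table `t` first ...

# Read `t` as of a past snapshot, addressed by epoch milliseconds.
t_env.execute_sql(
    "SELECT * FROM t /*+ OPTIONS('scan.timestamp-millis' = '1678883047356') */"
)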

I will share updates here as I collect more information and detail.

mfatihaktas • Feb 07 '24

For a temporal join with an Iceberg table, I believe there is an unresolved dependency on watermark support: https://github.com/apache/iceberg/pull/6253

zhenzhongxu • Feb 08 '24