
CREATE [OR REPLACE] TABLE support for Iceberg and Delta

Open wrb2 opened this issue 3 years ago • 15 comments

Both Iceberg and Delta Lake on Spark support the OR REPLACE clause in the CREATE TABLE statement. It allows a table to be replaced atomically (using Delta's or Iceberg's transactional capabilities).

Having this option available in Trino would enable tools that use a CTAS kill-and-fill strategy for filling tables (like dbt) to do so without a period of data unavailability.
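For illustration, the statement would look something like this (syntax as supported by Spark and Snowflake; the schema and table names here are hypothetical):

```sql
-- Atomically replaces the table if it exists, creates it otherwise.
-- Concurrent readers see either the old or the new table contents,
-- never a missing table.
CREATE OR REPLACE TABLE lakehouse.reports.daily_rides AS
SELECT ride_date, count(*) AS rides
FROM lakehouse.core.rides
GROUP BY ride_date;
```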

wrb2 avatar Jul 14 '22 14:07 wrb2

Note that Spark's CREATE OR REPLACE TABLE an_existing_delta_table doesn't replace the table. Time travel over an_existing_delta_table shows old data. I know it has practical benefits (e.g. doesn't break transactionality), but at least to me it was initially quite unexpected behavior.
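On Spark with Delta, for example (table and column names hypothetical), the pre-replace data stays reachable:

```sql
-- Replace the table contents and schema; the old snapshots are
-- retained in the Delta log rather than removed.
CREATE OR REPLACE TABLE an_existing_delta_table AS
SELECT 1 AS id;

-- Delta time travel still returns the data from before the replace.
SELECT * FROM an_existing_delta_table VERSION AS OF 0;
```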

cc @claudiusli @martint @alexjo2144 @losipiuk

findepi avatar Jul 20 '22 13:07 findepi

Snowflake also supports this:

See https://docs.snowflake.com/en/sql-reference/sql/create-table.html#usage-notes

Using OR REPLACE is the equivalent of using DROP TABLE on the existing table and then creating a new table with the same name; however, the dropped table is not permanently removed from the system. Instead, it is retained in Time Travel. This is important to note because dropped tables in Time Travel can be recovered, but they also contribute to data storage for your account. For more information, see Storage Costs for Time Travel and Fail-safe.

In addition, note that the drop and create actions occur in a single atomic operation. This means that any queries concurrent with the CREATE OR REPLACE TABLE operation use either the old or new table version.

mdesmet avatar Jul 23 '22 06:07 mdesmet

In addition, note that the drop and create actions occur in a single atomic operation. This means that any queries concurrent with the CREATE OR REPLACE TABLE operation use either the old or new table version.

That is basically the point of this feature. Right now, when someone is reading a table that is being refreshed by drop+CTAS, there might be several minutes of failing queries. That is probably fine for a "daily refresh" use case, but if you need the table refreshed every hour, having it gone for five minutes of that hour is problematic.

wrb2 avatar Aug 04 '22 06:08 wrb2

I know it has practical benefits (e.g. doesn't break transactionality), but at least to me it was initially quite unexpected behavior.

That is indeed confusing. Giving it different semantics than the following sequence feels arbitrary and introduces more cognitive load for users:

START TRANSACTION;
DROP TABLE IF EXISTS ... ;
CREATE TABLE ... ;
COMMIT;

martint avatar Aug 23 '22 22:08 martint

That is indeed confusing. Giving it different semantics than the following sequence feels arbitrary and introduces more cognitive load for users:

START TRANSACTION;
DROP TABLE IF EXISTS ... ;
CREATE TABLE ... ;
COMMIT;
start transaction;
drop table if exists;
-- table disappears
create table as select;
-- table appears once create finishes, which might be 10 minutes or two hours later
commit;

If someone runs select between the table disappearing and CTAS finishing on something like Postgres, that's fine because Postgres transactions are actual transactions and nobody would see the intermediate state.

But how would this behave with Trino and Hive, Iceberg or Delta catalogs? As I understand it, the "transactions" on those aren't really transactions but rather "single-statement transactions". So if someone else runs a select between the drop and the CTAS finishing, they will get an error; dashboards or applications might be impacted, etc.

So the different semantics are the point of the feature. There is no other way to implement zero-downtime table refresh on Trino with Hive, Delta or Iceberg without it.

Sorry for the possibly dramatic statement, but the ability to refresh a table/report/dashboard without downtime is pretty important for the viability of the "Trino Lakehouse" architecture.

wrb2 avatar Aug 24 '22 06:08 wrb2

But how would this behave with Trino and Hive, Iceberg or Delta catalogs? As I understand it, the "transactions" on those aren't really transactions but rather "single-statement transactions". So if someone else runs a select between the drop and the CTAS finishing, they will get an error; dashboards or applications might be impacted, etc.

Currently, Iceberg and Delta do not support Trino transactions, i.e. they support auto-commit only.

cc @electrum

There is no other way to implement zero-downtime table refresh on Trino with Hive, Delta or Iceberg without it.

Two alternatives

  • create a new table; use CREATE OR REPLACE VIEW on top of the table to stage it in (this option has existed for years)
  • use materialized views, which are basically awesome for queries you want to refresh periodically
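The first alternative could be sketched like this (all names hypothetical; users always query the view, never the backing tables):

```sql
-- 1. Build the new data under a versioned table name.
CREATE TABLE reports.rides_v2 AS
SELECT ride_date, count(*) AS rides
FROM core.rides
GROUP BY ride_date;

-- 2. Atomically repoint the view that users actually query.
CREATE OR REPLACE VIEW reports.rides AS
SELECT * FROM reports.rides_v2;

-- 3. Drop the previous backing table.
DROP TABLE IF EXISTS reports.rides_v1;
```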

cc @alexjo2144 @sopel39 @raunaqmorarka

findepi avatar Aug 24 '22 08:08 findepi

Currently, Iceberg and Delta do not support Trino transactions, i.e. they support auto-commit only.

Is it possible for that to be supported?

As per my understanding, these cloud-storage lakehouse layers like Delta or Iceberg only ever do single-statement, single-object transactions by design, so it might not be possible. But maybe that has changed since I last looked.

Thinking of the alternatives:

The first one would work, but has the issue of basically duplicating all your objects, either into a different namespace or into the same namespace under different names. This either requires hiding the duplicate objects from users with access control, or you have to explain to people which objects are the right ones to use. This adds a lot of extra complexity for end users.

As for materialized view, I'm not sure.

The documentation for Iceberg says:

There is a small time window between the commit of the delete and insert, when the materialized view is empty. If the commit operation for the insert fails, the materialized view remains empty.

So I'm not sure if I'm reading it right, but it seems there is data unavailability period as well.

The second issue is that C[OR]TAS would handle schema changes as well, while materialized views handle only refresh - that means if there's a schema change, you have to drop and recreate the view. That is not a very common occurrence, I think, and a schema change implies some release / deploy / change control, so people would be more willing to tolerate downtime at that time. But it would need extra logic on the client side (e.g. the dbt-trino people would have to implement "did the schema change? if yes, do one thing; if no, do the other").

The third issue I see is that the semantics of MV refresh are kind of unfortunate for data products, since the first refresh is asynchronous.

Let me unpack that:

Imagine a data product with three tables: a staging table, a core table, and a report table. The staging table is computed by bringing data in from other schemas, the core table by transforming data from the staging table, and the report table by transforming data from the core table. (Realistically this would have a lot more tables, of course.)

To execute this, you would run 1) CTAS/CMV for the staging table, 2) CTAS/CMV for the core table, 3) CTAS/CMV for the report table. If you build this using CTAS, each CTAS is synchronous, so you can run them in order.

For MVs though, CREATE MATERIALIZED VIEW returns immediately; the refresh happens in the background, and until that refresh is complete the MV behaves like a view.

So you run CMV for the staging table, and it returns immediately and schedules a refresh of the data. Then you run CMV for the core table, which also schedules a refresh - but unless the staging table's background refresh has completed, it will treat the staging table as a view, so in the background it will run not just the transformation between staging and core, but also the one between source and staging. Then you run CMV for the report table, and unless the core table's refresh is done, it will again see the core table behave like a view when its background refresh starts.

This means the transformation between source and staging is executed three times in total - once for the refresh of the staging table, once for the refresh of the core table, and once for the refresh of the report table. The transformation from staging to core is executed twice, and the transformation from core to report once. And of course, with more tables (or when users query the "refreshed" data), you would see a lot more amplification.
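The cascade could be sketched as follows, under the assumption (made in this comment) that CREATE MATERIALIZED VIEW schedules a background refresh; table names are hypothetical:

```sql
CREATE MATERIALIZED VIEW staging AS SELECT * FROM source;
-- returns immediately; source -> staging refresh runs in background

CREATE MATERIALIZED VIEW core AS SELECT * FROM staging;
-- if staging's refresh hasn't finished, staging acts as a plain view,
-- so this refresh re-runs source -> staging as well

CREATE MATERIALIZED VIEW report AS SELECT * FROM core;
-- likewise may re-run source -> staging and staging -> core

-- Net effect: source -> staging executed 3x, staging -> core 2x,
-- core -> report 1x.
```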

So my take is that materialized views as they work now aren't really usable for anything with "layers".

(This could be improved by adding something like CREATE/REFRESH MATERIALIZED VIEW SYNCHRONOUSLY, which would give you CTAS-like behavior, but that is a discussion for a different time, I'd say.)

wrb2 avatar Aug 24 '22 15:08 wrb2

There is a small time window between the commit of the delete and insert, when the materialized view is empty. If the commit operation for the insert fails, the materialized view remains empty.

We should fix that. cc @claudiusli @raunaqmorarka @alexjo2144

The second issue is that C[OR]TAS would handle schema changes as well, while materialized views handle only refresh - that means if there's a schema change, you have to drop and recreate the view.

For MVs, you have the CREATE [OR REPLACE] MATERIALIZED VIEW syntax.

The third issue I see is that the semantics of MV refresh are kind of unfortunate for data products, since the first refresh is asynchronous.

This I don't understand. We definitely want to have a statement that forces an MV to become fresh (be refreshed) and is synchronous.

For MVs though, CREATE MATERIALIZED VIEW returns immediately; the refresh happens in the background, and until that refresh is complete the MV behaves like a view.

So you run CMV for the staging table, and it returns immediately and schedules a refresh of the data.

In Trino, there is no background work happening after CREATE MATERIALIZED VIEW completes. REFRESH needs to be explicit. There is no scheduler, no queries running implicitly, and no query can run without being "overseen" by a calling application, otherwise it's considered "abandoned" and killed.

There is also a proposal to let CMV do an immediate REFRESH, see https://github.com/trinodb/trino/pull/12570 (still up for debate).

So my take is that materialized views as they work now aren't really usable for anything with "layers".

That would mean they are not usable at all. I hope it's not that bad, but I am sure there is room for improvement. I think we're making awesome progress identifying the necessary improvements.

findepi avatar Aug 25 '22 12:08 findepi

So my take is that materialized views as they work now aren't really usable for anything with "layers".

What do you think it will take to make MVs more usable?

I think you would want some lineage, but IMO an MV scheduler should live outside of the core engine.

sopel39 avatar Aug 25 '22 13:08 sopel39

In Trino, there is no background work happening after CREATE MATERIALIZED VIEW completes. REFRESH needs to be explicit. There is no scheduler, no queries running implicitly, and no query can run without being "overseen" by a calling application, otherwise it's considered "abandoned" and killed.

Oh! I might be confusing this with how SEP materialized views work. Apologies if that's the case.

wrb2 avatar Aug 25 '22 14:08 wrb2

If a CREATE OR REPLACE MV does an immediate refresh and also does an atomic swap (the view is never hit by actual users, and the existing storage table is replaced, rather than deleted and inserted, in a single transaction, as demonstrated in #13681, with a potentially totally new column layout), it would effectively be the same as CREATE OR REPLACE TABLE.

There are also potential performance benefits of running REFRESH MV if we can somehow support incremental refreshes.

It might still be a little more complicated, without any (current) benefits compared to a simple CREATE OR REPLACE TABLE, as the user needs to distinguish between an initial load (the table layout has changed) and a refresh (just loading new data into the same table layout).

mdesmet avatar Aug 25 '22 14:08 mdesmet

cc @romanvainb

findepi avatar Aug 26 '22 11:08 findepi

I'm not convinced that the OR REPLACE has to mean DROP TABLE. It's a new keyword REPLACE which we can define however we want. If we say that it means "replace the table schema and contents" (and probably table properties), then the history-preserving behavior makes sense.

electrum avatar Aug 31 '22 00:08 electrum

Do we have the necessary alignment to move forward and start reviewing the PR?

mdesmet avatar Sep 07 '22 11:09 mdesmet

There is a small time window between the commit of the delete and insert, when the materialized view is empty. If the commit operation for the insert fails, the materialized view remains empty.

We should fix that.

A fix for that is coming in the 397 release: https://github.com/trinodb/trino/pull/14145

alexjo2144 avatar Sep 21 '22 16:09 alexjo2144

Hi everyone,

I'm not sure in what way this is stuck. What can we do to make it happen?

wrb2 avatar Sep 23 '22 08:09 wrb2

A fix for that is coming in the 397 release #14145

@mdesmet please see that; it's an alternative feature you could use.

findepi avatar Sep 23 '22 10:09 findepi

Back to the main topic, I think there are merits to being able to atomically replace a regular table. CREATE OR REPLACE is the syntax some other engines use for that. I am not a fan of this syntax, but I don't have any alternatives either.

I don't think modeling this as BEGIN TRANSACTION; DROP TABLE; CREATE TABLE is a viable option. It's neither convenient (quite verbose) nor really in line with our design (we would rather get rid of transactions).

If no new options show up, I think we should provide the CREATE OR REPLACE TABLE syntax and use it for atomic updates of tables, e.g. in Delta or Iceberg.

findepi avatar Sep 23 '22 10:09 findepi

If there are reasonable needs for both behaviors, would it make sense to pick one as a default but allow the user to override it?

claudiusli avatar Sep 23 '22 15:09 claudiusli

@martint, I would argue that since there are systems out there implementing C[OR]TAS already, it is choosing different semantics in Trino that would introduce additional cognitive load for users. And, as @electrum pointed out, the proposed semantics are logically consistent.

aalbu avatar Sep 27 '22 01:09 aalbu

I agree with @electrum and @aalbu . I think we should allow CREATE OR REPLACE TABLE [AS ...] syntax, and let the connector handle the replacement atomically.

@trinodb/maintainers I think we're converging on an agreement here.

findepi avatar Sep 27 '22 10:09 findepi

If we’re going that route, we still need to sort out some syntactic issues.

  1. The “if not exists” clause doesn’t make sense in the context of REPLACE. A “create or replace” already implies that behavior
  2. Is REPLACE a first class command? Would we also consider adding syntax for REPLACE TABLE as a top level command?

martint avatar Sep 27 '22 15:09 martint

It's neither convenient (quite verbose) nor really in line with our design (we would rather get rid of transactions).

I agree that it’s less convenient, but on the contrary, we need to make transactions work better in the long run. They are going to be increasingly important to support more complex transformation pipelines that may involve intermediate temporary tables and functions.

Also, having better support for transactions can allow the engine to decompose queries into smaller independent steps to exploit some optimization opportunities, and still make it appear as a single operation to the user.

martint avatar Sep 27 '22 15:09 martint

  • The “if not exists” clause doesn’t make sense in the context of REPLACE. A “create or replace” already implies that behavior

Good point, the IF NOT EXISTS clause should not be allowed for CREATE OR REPLACE TABLE, since it doesn't make sense.
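To spell out the combinations (hypothetical table name):

```sql
CREATE TABLE t (x int);                 -- fails if t already exists
CREATE TABLE IF NOT EXISTS t (x int);   -- no-op if t already exists
CREATE OR REPLACE TABLE t (x int);      -- replaces t if it already exists

-- rejected: IF NOT EXISTS contradicts OR REPLACE
CREATE OR REPLACE TABLE IF NOT EXISTS t (x int);
```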

Is REPLACE a first class command? Would we also consider adding syntax for REPLACE TABLE as a top level command?

I wouldn't add that, but we don't need to make this decision now.

findepi avatar Sep 27 '22 17:09 findepi

Seems there is no disagreement, so I removed the syntax-needs-review label. Thanks to everyone involved in the discussion!

findepi avatar Oct 03 '22 08:10 findepi

@findepi, thanks for the update. Since the syntax has been confirmed, can we expect the change as designed by @mdesmet to be merged?

bohyn avatar Oct 04 '22 15:10 bohyn

As @bohyn requested, when can this feature be merged?

prgx-aeveri01 avatar Nov 09 '22 11:11 prgx-aeveri01

@bohyn @prgx-aeveri01 we will push the PR forward and report back to you. https://github.com/trinodb/trino/pull/11763

leniartek avatar Nov 16 '22 15:11 leniartek

It would be awesome to support CREATE OR REPLACE. For dbt-trino this would mean that we can replace:

create table sandbox.dbt_trino.rides__dbt_tmp
alter table "sandbox"."dbt_trino"."rides" rename to sandbox.dbt_trino.rides__dbt_backup
alter table sandbox.dbt_trino.rides__dbt_tmp rename to sandbox.dbt_trino.rides
drop table if exists sandbox.dbt_trino.rides__dbt_backup

with:

create or replace table sandbox.dbt_trino.rides

And we'll end up in a more atomic state as well.

Fokko avatar Jun 30 '23 10:06 Fokko

@martint can we close this as of 421?

bitsondatadev avatar Jul 10 '23 23:07 bitsondatadev