dbt-athena
Create a new materialization insert_by_period
Create a new materialisation, insert_by_period, inspired by https://gist.github.com/jessedobbelaere/6fdb593f9e2cc732e9f142c56c9bac87
Requirements
- Support table_type=hive
- Support table_type=iceberg
- To make the implementation idempotent, add delete from table period={period_to_process} to avoid duplicates in case an error occurs
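The idempotency requirement above can be illustrated with a small sketch. It uses an in-memory SQLite table as a stand-in for the target table, so it says nothing about how the delete would actually be issued on Athena for either table type. The table name `target`, its columns, and the helper `load_period` are hypothetical.

```python
import sqlite3


def load_period(conn, period, values):
    """Replace all rows of one period: delete first, then insert."""
    # `with conn` wraps both statements in one SQLite transaction.
    # This is a property of the stand-in database, not of Athena.
    with conn:
        conn.execute("DELETE FROM target WHERE period = ?", (period,))
        conn.executemany(
            "INSERT INTO target (period, value) VALUES (?, ?)",
            [(period, v) for v in values],
        )


conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE target (period TEXT, value INTEGER)")

load_period(conn, "2024-01", [1, 2, 3])
load_period(conn, "2024-01", [1, 2, 3])  # simulated retry after a failed run
load_period(conn, "2024-02", [4])

row_count = conn.execute("SELECT COUNT(*) FROM target").fetchone()[0]
```

Because each period is deleted before it is inserted, re-running a period that already loaded leaves the row count unchanged.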
@nicor88 WDYT, is this materialisation still needed? I believe this commit resolves the processing of huge source datasets.
@svdimchenko yes, I believe it could still be needed. In the current adapter setup we have a first step where the whole table is saved as a non-partitioned dataset. If the amount of data is huge, the user might want to add filters to the dataset to break it down, and that's where insert_by_period comes into play.
@svdimchenko for more context on my last message see this one https://github.com/dbt-athena/dbt-athena/issues/471
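To make "add filters to the dataset to break it down" concrete, here is a minimal sketch of splitting a date range into fixed-size periods and rendering one predicate per period. The function names, the `created_at` column, and the 7-day period are assumptions for illustration; this is not the gist's code.

```python
from datetime import date, timedelta


def period_windows(start, stop, days):
    """Yield half-open [lo, hi) date windows that cover [start, stop)."""
    lo = start
    while lo < stop:
        hi = min(lo + timedelta(days=days), stop)
        yield lo, hi
        lo = hi


def period_filter(column, lo, hi):
    """Render the predicate that restricts the model query to one window."""
    return f"{column} >= DATE '{lo.isoformat()}' AND {column} < DATE '{hi.isoformat()}'"


windows = list(period_windows(date(2024, 1, 1), date(2024, 1, 20), days=7))
filters = [period_filter("created_at", lo, hi) for lo, hi in windows]
```

Each predicate would be applied to the model query in turn, so no single query has to scan the full source.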
You should also support unique_key, I don't think the gist code has any way to deal with that.
I am thinking of cases where you need to deal with rows that get updated and the developer wants to use this config:
config(materialized='sp_insert_by_period', timestamp_field = 'updated_at', ...)
@mycaule why should unique_key be included in insert_by_period? Insert by period by definition acts on time: it deletes and inserts batches of periods. If you need to upsert by unique_key, maybe consider using an incremental materialization with the merge strategy and, of course, unique_key.
Using incremental fails for large tables as you discussed in https://github.com/dbt-athena/dbt-athena/issues/471 and the snippet from redshift provided in the discussion was the only quick way to do it in my case.
And force_batch (config(materialized='incremental', unique_key='id', force_batch=True)) doesn't show progress, nor can we resume if the dbt run fails.
Which snippet for Redshift? Do you mean this https://github.com/dbt-athena/dbt-athena/issues/471#issuecomment-1790572382 ? If so, I don't see any reference to unique_key. Could you please add the reference to the snippet that you are talking about?
Also:
config(materialized='incremental', unique_key='id', force_batch=True)
that is incomplete: what is your default incremental_strategy? I assume merge if you use unique_key.
Also, a few challenges:
- Adding unique_key to insert_by_period is, in my opinion, not ideal: rows with those unique_key values will be updated naturally when you delete/insert by period.
To understand what you want to achieve, I need more context on what you plan to build.
Also, force_batch doesn't help you if you are not using partitioned datasets, and if your initial query is big it will still fail, because it will try to process the whole dataset. That's where insert_by_period comes in handy.
Thank you for the details, I hadn't had this in mind.
This snippet talks about Redshift and adapting to Athena: https://gist.github.com/jessedobbelaere/6fdb593f9e2cc732e9f142c56c9bac87
For the moment I use the default incremental_strategy which should be append, using Hive format for the moment because of legacy. For the merge strategy to work I would have to move to Iceberg.
I am trying to materialize a very large table through a federated query to RDS, and I want the Athena table to be refreshed daily in my datalake. Existing rows in RDS can change over time; I have id (primary key), created_at and updated_at columns at my disposal.
Yes, insert_by_period is adapted from another adapter (Redshift in this case), so it seems you were referring to the gist, not directly to https://github.com/dbt-athena/dbt-athena/issues/471
Existing rows in RDS can change over time, I have id (primary key), created_at and updated_at columns at disposal.
Do you expect updated_at to change? What you generally do with an incremental strategy is pick only the records that have changed (meaning updated_at MUST change for this to work), using the is_incremental macro.
Now the challenge is that:
- on the first run of the model, because of the big dataset, you would like behavior like insert_by_period,
- for incremental runs (that is, on smaller datasets, where you must apply a filter such as
WHERE updated_at >= (select max(updated_at) from {{ this }})) you want a "normal" incremental materialization with merge.
materialized='incremental', table_type='iceberg', unique_key='id', incremental_strategy='merge'
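The two cases above can be sketched as a single filter-selection function: a period window on the first run, and a "newer than what is already loaded" filter on incremental runs. This is a hypothetical helper written in Python for illustration. In a real model the branch would be driven by the is_incremental macro, and `model_filter` and its parameters are assumed names.

```python
def model_filter(is_incremental, window=None, column="updated_at", relation="{{ this }}"):
    """Return the WHERE predicate for the current run of the model."""
    if is_incremental:
        # Later runs: only rows changed since the latest row already loaded.
        return f"{column} >= (select max({column}) from {relation})"
    if window is None:
        raise ValueError("a first run needs a period window")
    # First run: process the big dataset one period at a time.
    lo, hi = window
    return f"{column} >= timestamp '{lo}' and {column} < timestamp '{hi}'"


first_run = model_filter(False, window=("2024-01-01 00:00:00", "2024-02-01 00:00:00"))
later_run = model_filter(True)
```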
When I used Iceberg + merge strategy, I didn't notice any improvement in duration on the second iteration, even though the dbt logs said only a few rows were updated.
materialized='incremental', incremental_strategy='insert_overwrite'
Let's imagine I stay with Hive. I understand that to be able to use insert_overwrite, the partitioned_by field is mandatory.
I also had different finite status columns describing the lifecycle of the rows: status_1, status_2, status_3, and partitioned_by = ['status_1', 'status_2', 'status_3'] was faster using Hive than Iceberg.
To deal with tables without lifecycle columns, I also tried to create artificial partitions using YEAR(created_at) AS year, MONTH(created_at) AS month, DAY(created_at) AS day and then partitioned_by = ['year', 'month', 'day'].
Do you think I can use partitioned_by=['bucket(id, 100)'] to both stay within the 100-partition limit and only rewrite on average $\mathcal{O}(n/100)$ of my data? Or is it only supported by Iceberg, as shown in this example from the docs?
💡 Last night, dbt made this announcement about a new incremental strategy for time-series event data. It introduces incremental_strategy='microbatch' to enable micro-batching and replace the custom insert-by-period macros in the wild. It also seems to simplify things a lot, with less config and sampling for dev runs included. Let's focus on integrating with this feature when it comes out?
@jessedobbelaere we should indeed consider supporting microbatch in dbt-athena. We should create another issue to track incremental_strategy microbatch, and eventually close this one.
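A rough way to see why bucketing keeps the partition count bounded while year/month/day partitions keep growing is to count the distinct partition values each scheme produces. The sketch uses plain modulo as a stand-in for a bucket function (an assumption; Iceberg's bucket transform is hash-based), and it does not answer whether Hive tables accept such a transform.

```python
from datetime import date, timedelta


def daily_partitions(start, stop):
    """Distinct (year, month, day) partition values for dates in [start, stop)."""
    days = (stop - start).days
    return {
        (d.year, d.month, d.day)
        for d in (start + timedelta(days=i) for i in range(days))
    }


def bucket_partitions(ids, buckets=100):
    """Distinct bucket values; modulo is an illustrative stand-in for a hash."""
    return {i % buckets for i in ids}


daily = daily_partitions(date(2024, 1, 1), date(2025, 1, 1))
bucketed = bucket_partitions(range(1, 1_000_001))
```

One year of daily partitions already exceeds 100 distinct values, while the bucketed scheme never produces more than `buckets` values, however many ids there are.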
Exactly 👍 I've created #715 as placeholder
Hi folks - our team is already looking at supporting our new Microbatch strategy on this adapter, so for now we are going to go ahead and close out this issue, since that will address it :)