dbt-fal
Enable updates to a model from fal scripts
Initial proposal
The idea of this function is to be able to update a model's table after the model has run, from an after-hook script.
An example scenario would be:
-- models/tickets/tickets_with_sentiment.sql
SELECT
    *,
    -- NOTE: will be filled by fal in sentiment_analysis.py
    NULL AS label,
    NULL AS score
FROM {{ ref('tickets') }}
Then, the after-hook:
# models/tickets/sentiment_analysis.py
import numpy as np
import pandas as pd
from transformers import pipeline

ticket_data = ref(context.current_model)
ticket_descriptions = list(ticket_data.description)
classifier = pipeline("sentiment-analysis")
description_sentiment_analysis = classifier(ticket_descriptions)

rows = []
for id, sentiment in zip(ticket_data.id, description_sentiment_analysis):
    rows.append((int(id), sentiment["label"], sentiment["score"]))

records = np.array(rows, dtype=[("id", int), ("label", "U8"), ("score", float)])
sentiment_df = pd.DataFrame.from_records(records)
print("Uploading\n", sentiment_df)

write_to_model(
    dataframe=sentiment_df,
    # needed because the function has no context of where it is being called from;
    # we just have to document this very well
    # (btw, what would happen if people used it "wrong"?)
    ref=context.current_model,
    id_column='id',  # must be the same in the dataframe and the table; used to know WHICH row to update
    columns=['label', 'score']  # defaults to ALL columns in the dataframe?
)
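To make the `id_column`/`columns` semantics concrete, here is a rough sketch of how such a call could be translated into the UPDATE statement discussed below. Note that `build_update_sql`, its parameters, and the generated statement shape are all part of this proposal, not an existing fal API:

```python
def build_update_sql(table, rows, id_column, columns):
    """Build an UPDATE ... JOIN statement that fills `columns` on the rows
    of `table` whose `id_column` matches a row in `rows` (a list of dicts).

    Hypothetical helper for illustration; not part of fal."""
    selects = []
    for row in rows:
        parts = [f"{row[id_column]!r} AS {id_column}"]
        parts += [f"{row[c]!r} AS {c}" for c in columns]
        selects.append("SELECT " + ", ".join(parts))
    subquery = "\nUNION ALL\n".join(selects)
    sets = ",\n    ".join(f"_table.{c} = _insert.{c}" for c in columns)
    return (
        f"UPDATE {table} _table\n"
        f"JOIN (\n{subquery}\n) _insert\n"
        f"ON _insert.{id_column} = _table.{id_column}\n"
        f"SET\n    {sets};"
    )

sql = build_update_sql(
    "tickets_with_sentiment",  # illustrative table name
    [{"id": 1, "label": "positive", "score": 0.8},
     {"id": 2, "label": "negative", "score": 0.6}],
    id_column="id",
    columns=["label", "score"],
)
print(sql)
```

Only the columns listed in `columns` appear in the SET clause, so any other columns of the target table are left untouched.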
How would the actual SQL statement look?
SQL is not a great fit for this kind of operation, writing new values into already-existing rows: updates are usually driven by other data already in the database, and rarely done in batches as large as ours.
The following SQL statement should work (note the UPDATE ... JOIN form is MySQL-flavored; other warehouses use UPDATE ... FROM or MERGE). However, more ideas may come up.
UPDATE {{ ref('tickets') }} _table
JOIN (
    SELECT 1 AS id, 'positive' AS label, 0.8 AS score
    UNION ALL
    SELECT 2 AS id, 'negative' AS label, 0.6 AS score
    UNION ALL
    SELECT 3 AS id, 'neutral' AS label, 0.9 AS score
) _insert
ON _insert.id = _table.id
SET
    _table.label = _insert.label,
    _table.score = _insert.score;
I have a potentially interesting idea:
what if we leverage the existing dbt macros to do the merge?
so the operation would be:
- upload the dataframe to a staging table
- leverage the adapter specific “merge” macros to merge the two tables
i am thinking this could help: https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql
and for dbt-bigquery, maybe we could use this: https://github.com/dbt-labs/dbt-bigquery/blob/main/dbt/include/bigquery/macros/materializations/snapshot.sql
and so on
Curious to see if this would lead anywhere.
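The staging-then-merge flow above can be sketched end to end with SQLite's INSERT ... ON CONFLICT upsert standing in for the adapter-specific merge macro (table and column names are illustrative, and a real adapter would use its own MERGE syntax):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
# target table, as materialized by dbt with the NULL placeholder columns
conn.execute(
    "CREATE TABLE tickets_with_sentiment (id INTEGER PRIMARY KEY, label TEXT, score REAL)"
)
conn.executemany(
    "INSERT INTO tickets_with_sentiment (id) VALUES (?)", [(1,), (2,), (3,)]
)

# step 1: upload the dataframe rows into a staging table
conn.execute(
    "CREATE TEMP TABLE _fal_staging (id INTEGER PRIMARY KEY, label TEXT, score REAL)"
)
conn.executemany(
    "INSERT INTO _fal_staging VALUES (?, ?, ?)",
    [(1, "positive", 0.8), (2, "negative", 0.6), (3, "neutral", 0.9)],
)

# step 2: merge the staging table into the target
# (SQLite upsert in place of the adapter's MERGE macro;
# "WHERE true" disambiguates the ON CONFLICT parse after SELECT)
conn.execute("""
    INSERT INTO tickets_with_sentiment (id, label, score)
    SELECT id, label, score FROM _fal_staging WHERE true
    ON CONFLICT(id) DO UPDATE SET
        label = excluded.label,
        score = excluded.score
""")

rows = conn.execute(
    "SELECT id, label, score FROM tickets_with_sentiment ORDER BY id"
).fetchall()
print(rows)  # → [(1, 'positive', 0.8), (2, 'negative', 0.6), (3, 'neutral', 0.9)]
```

The upside of this shape is that the update itself happens in a single set-based statement inside the warehouse, instead of row-by-row from Python.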
We are avoiding updates for now and only supporting append and overwrite behavior in the new write_to_model function (#249).
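Append and overwrite are simpler than update because they never have to match existing rows. A rough sketch of both behaviors, again with SQLite and a local stand-in function (this `write_to_model` is illustrative only, not fal's actual implementation):

```python
import sqlite3

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE model (id INTEGER, label TEXT)")
conn.executemany("INSERT INTO model VALUES (?, ?)", [(1, "old"), (2, "old")])

def write_to_model(rows, mode="overwrite"):
    # overwrite: replace the whole table's contents; append: add rows at the end
    if mode == "overwrite":
        conn.execute("DELETE FROM model")
    conn.executemany("INSERT INTO model VALUES (?, ?)", rows)

write_to_model([(3, "new")], mode="append")
print(conn.execute("SELECT COUNT(*) FROM model").fetchone()[0])  # → 3

write_to_model([(4, "new")], mode="overwrite")
print(conn.execute("SELECT COUNT(*) FROM model").fetchone()[0])  # → 1
```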