
Enable updates to a model from fal scripts

chamini2 opened this issue 2 years ago • 2 comments

Initial proposal

The idea of this function is to let a fal script update a model's table after the model has been run (as an after-hook).

An example scenario would be:

-- models/tickets/tickets_with_sentiment.sql
SELECT
  *,
  -- NOTE: will be filled by fal in sentiment_analysis.py
  NULL AS label, 
  NULL AS score
FROM {{ ref('tickets') }}

Then, the after-hook:

# models/tickets/sentiment_analysis.py
import numpy as np
import pandas as pd
from transformers import pipeline

# `ref`, `context` and `write_to_model` are provided by fal's script runtime
ticket_data = ref(context.current_model)
ticket_descriptions = list(ticket_data.description)
classifier = pipeline("sentiment-analysis")
description_sentiment_analysis = classifier(ticket_descriptions)

rows = []
for id, sentiment in zip(ticket_data.id, description_sentiment_analysis):
    rows.append((int(id), sentiment["label"], sentiment["score"]))

records = np.array(rows, dtype=[("id", int), ("label", "U8"), ("score", float)])

sentiment_df = pd.DataFrame.from_records(records)

print("Uploading\n", sentiment_df)
write_to_model(
    dataframe=sentiment_df,
    # needed because the function has no context of where it is being called from;
    # we just have to document it very well
    # (btw, what would happen if people used it "wrong"?)
    ref=context.current_model,
    id_column='id',  # must be the same in df and table, used for knowing WHICH row to update
    columns=['label', 'score'],  # defaults to ALL columns in the dataframe?
)

How would the actual SQL statement look?

SQL is not a great fit for this kind of operation, filling in columns of already-existing rows: updates are usually driven by other data already in the database, not by large batches uploaded from outside, as we will do here.

The following SQL statement should work, though other ideas may come up.

UPDATE {{ ref('tickets') }} _table
JOIN (
    SELECT 1 AS id, 'positive' AS label, 0.8 AS score
    UNION ALL
    SELECT 2 AS id, 'negative' AS label, 0.6 AS score
    UNION ALL
    SELECT 3 AS id, 'neutral' AS label, 0.9 AS score
) _insert
ON _insert.id = _table.id
SET
    _table.label = _insert.label,
    _table.score = _insert.score;
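As a rough sketch, `write_to_model` could render such a statement from the uploaded dataframe. Everything below is illustrative, not fal's actual API: `build_update_sql` is a hypothetical helper, and the naive literal rendering would need to be replaced with bound parameters in a real implementation.

```python
import pandas as pd

def build_update_sql(table, df, id_column, columns):
    """Render an UPDATE ... JOIN statement that fills `columns` of `table`
    from the rows of `df`, matching on `id_column` (sketch only)."""
    def literal(value):
        # naive literal rendering for the sketch; real code must escape/bind
        return f"'{value}'" if isinstance(value, str) else str(value)

    selects = [
        "SELECT " + ", ".join(
            f"{literal(row[col])} AS {col}" for col in [id_column, *columns]
        )
        for _, row in df.iterrows()
    ]
    inline_table = "\n    UNION ALL\n    ".join(selects)
    assignments = ",\n    ".join(f"_table.{c} = _insert.{c}" for c in columns)
    return (
        f"UPDATE {table} _table\n"
        f"JOIN (\n    {inline_table}\n) _insert\n"
        f"ON _insert.{id_column} = _table.{id_column}\n"
        f"SET\n    {assignments};"
    )

df = pd.DataFrame({"id": [1, 2], "label": ["positive", "negative"], "score": [0.8, 0.6]})
sql = build_update_sql("tickets_with_sentiment", df, "id", ["label", "score"])
print(sql)
```

The inline `SELECT ... UNION ALL` table gets unwieldy for large dataframes, which is part of why a staging-table approach (below in the thread) may scale better.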

chamini2 avatar Mar 18 '22 00:03 chamini2

I have a potentially interesting idea:

What if we leverage the existing dbt macros to do the merge?

So the operation would be:

  1. upload the dataframe to a staging table
  2. leverage the adapter-specific "merge" macros to merge the two tables

I am thinking this could help: https://github.com/dbt-labs/dbt-core/blob/main/core/dbt/include/global_project/macros/materializations/snapshots/snapshot_merge.sql

and for dbt-bigquery, maybe we could use this: https://github.com/dbt-labs/dbt-bigquery/blob/main/dbt/include/bigquery/macros/materializations/snapshot.sql

and so on
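For illustration only, here is the shape of that two-step flow using sqlite3 as a stand-in warehouse. In fal this would go through the dbt adapter and its merge macros rather than hand-written SQL; the table names and the merge statement here are assumptions for the sketch.

```python
import sqlite3
import pandas as pd

con = sqlite3.connect(":memory:")
con.execute(
    "CREATE TABLE tickets_with_sentiment "
    "(id INTEGER PRIMARY KEY, description TEXT, label TEXT, score REAL)"
)
con.execute(
    "INSERT INTO tickets_with_sentiment VALUES "
    "(1, 'great!', NULL, NULL), (2, 'awful', NULL, NULL)"
)

# 1. upload the dataframe to a staging table
df = pd.DataFrame({"id": [1, 2], "label": ["positive", "negative"], "score": [0.98, 0.95]})
df.to_sql("sentiment_staging", con, index=False)

# 2. merge the staging table into the target (stand-in for what an
#    adapter-specific merge macro would emit)
con.execute("""
    UPDATE tickets_with_sentiment
    SET label = (SELECT label FROM sentiment_staging s
                 WHERE s.id = tickets_with_sentiment.id),
        score = (SELECT score FROM sentiment_staging s
                 WHERE s.id = tickets_with_sentiment.id)
    WHERE id IN (SELECT id FROM sentiment_staging)
""")
```

One nice property of this route is that the adapter macros already know each warehouse's merge dialect, so fal would not have to maintain per-database UPDATE syntax itself.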

Curious to see if this would lead anywhere.

burkaygur avatar Mar 18 '22 01:03 burkaygur

We are avoiding updates for now; the new write_to_model function (#249) just supports append and overwrite behavior.
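Those two behaviors can be pictured with pandas' `to_sql` as an analogy, where `if_exists="replace"` plays the role of overwrite and `if_exists="append"` of append. This is just an illustration of the semantics, not fal's implementation.

```python
import sqlite3
import pandas as pd

con = sqlite3.connect(":memory:")
df = pd.DataFrame({"id": [1], "label": ["positive"], "score": [0.9]})

# overwrite: drop whatever was there and write the dataframe
df.to_sql("sentiment", con, index=False, if_exists="replace")
# append: keep existing rows and add the dataframe's rows
df.to_sql("sentiment", con, index=False, if_exists="append")

count = con.execute("SELECT COUNT(*) FROM sentiment").fetchone()[0]
```

Neither mode touches individual existing rows, which is exactly the UPDATE capability this issue asked for and that was deferred.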

chamini2 avatar Apr 22 '22 11:04 chamini2