let BigQueryGetData operator take a list of fields for the "order by" clause

Open • lopezvit opened this issue • 5 comments

Description

Sometimes you just need the latest value of a field (e.g. updatedAt) so that downstream operators can use that value in their own queries. This can be done with SELECT MAX(updatedAt) [...], but that would require a lot of rewriting, whereas simply adding a new parameter, ordering_fields, would solve the same problem by allowing a query such as: SELECT updatedAt FROM [...] ORDER BY updatedAt DESC LIMIT 1

Example implementation (not tested):

```python
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


def generate_query(self, hook: BigQueryHook) -> str:
    """Generate a SELECT query for the given dataset and table ID."""
    query = "select "
    if self.selected_fields:
        query += self.selected_fields
    else:
        query += "*"
    query += (
        f" from `{self.table_project_id or hook.project_id}.{self.dataset_id}"
        f".{self.table_id}`"
    )
    # ORDER BY must come before LIMIT, otherwise BigQuery rejects the query.
    if self.ordering_fields:
        query += f" ORDER BY {self.ordering_fields}"
    query += f" limit {self.max_results}"
    return query
```
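The issue title asks for a *list* of fields, while the method sketched above takes a single string. A minimal standalone sketch of the same query builder, assuming a hypothetical free function `build_get_data_query` (not part of Airflow) that accepts either a comma-separated string or a list, and that places ORDER BY before LIMIT as BigQuery SQL requires:

```python
from typing import Optional, Sequence, Union

Fields = Union[str, Sequence[str], None]


def _as_csv(fields: Fields) -> Optional[str]:
    """Normalize a comma-separated string or a list of field names to one string."""
    if not fields:
        return None
    if isinstance(fields, str):
        return fields
    return ", ".join(fields)


def build_get_data_query(
    project_id: str,
    dataset_id: str,
    table_id: str,
    max_results: int,
    selected_fields: Fields = None,
    ordering_fields: Fields = None,
) -> str:
    """Build a SELECT query (hypothetical helper, not Airflow's API)."""
    query = f"select {_as_csv(selected_fields) or '*'}"
    query += f" from `{project_id}.{dataset_id}.{table_id}`"
    order_by = _as_csv(ordering_fields)
    if order_by:
        # ORDER BY has to precede LIMIT in BigQuery SQL.
        query += f" order by {order_by}"
    query += f" limit {max_results}"
    return query


print(build_get_data_query("my-project", "my_dataset", "events", 1, "updatedAt", ["updatedAt DESC"]))
# select updatedAt from `my-project.my_dataset.events` order by updatedAt DESC limit 1
```

As in the original sketch, field names are interpolated into the SQL unescaped, so they should only come from trusted DAG code, never from user input.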

Use case/motivation

The BigQueryGetData operator should accept one more parameter, ordering_fields, so that the generated query also includes an ORDER BY clause.

Related issues

https://github.com/apache/airflow/issues/24460

Are you willing to submit a PR?

  • [ ] Yes I am willing to submit a PR!

Code of Conduct

lopezvit, Apr 19 '24

@eladkal Could you please assign me?

shahar1, Apr 20 '24

BigQueryGetData's generate_query() method is only used when the operator submits a job in deferrable mode; in non-deferrable mode, the operator calls the hook's list_rows method instead. list_rows reads the table through BigQuery's API directly, and that API does not support ordering. Making the feature available only for the queries generated in deferrable mode is discouraged, since the operator would then behave differently depending on the mode.

There are two options left:

  1. Running the explicit query (including the ORDER BY clause) via BigQueryInsertJobOperator.
  2. Performing the ordering in Python as part of the operator logic, but I'm not sure there is a real advantage compared to the alternative.
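Option 2 only works if the operator reads every row before truncating to max_results: sorting rows that were already limited just reorders whatever subset the API happened to return, in an order the caller does not control. A minimal pure-Python sketch of that trade-off, with rows modeled as plain dicts and a hypothetical `top_rows` helper (not an Airflow function):

```python
import heapq
from typing import Any, Iterable, List, Mapping

Row = Mapping[str, Any]


def top_rows(rows: Iterable[Row], field: str, max_results: int, descending: bool = True) -> List[Row]:
    """Return the first max_results rows ordered by `field`, scanning all rows once."""
    pick = heapq.nlargest if descending else heapq.nsmallest
    return pick(max_results, rows, key=lambda row: row[field])


rows = [{"id": 1, "updatedAt": 3}, {"id": 2, "updatedAt": 9}, {"id": 3, "updatedAt": 5}]

# Correct: order the full result set, then keep one row.
print(top_rows(rows, "updatedAt", 1))      # [{'id': 2, 'updatedAt': 9}]

# Wrong: truncate first (as a max_results-limited read would), then sort.
print(top_rows(rows[:1], "updatedAt", 1))  # [{'id': 1, 'updatedAt': 3}]
```

Because the correct variant needs the whole table on the worker, it pays for a full download, which is likely why ordering inside BigQuery (option 1) performs better.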

@eladkal What do you think?

shahar1, Apr 23 '24

> 1. Running the explicit query (including the ORDER BY clause) via `BigQueryInsertJobOperator`.

I'd prefer this one; I think its performance would be better than option 2.

Lee-W, Apr 24 '24

But how do you get the result when using BigQueryInsertJobOperator? As far as I understood it, it returns the job_id, not the result 🤔

lopezvit, Apr 25 '24

> But how do you get the result when using BigQueryInsertJobOperator? As far as I understood it, it returns the job_id, not the result 🤔

You got me on that one: indeed, it only returns the job_id. With BigQueryInsertJobOperator, the only option is to write the results into a BigQuery table, so it might make sense to create an alternative that outputs the results into XCom instead. One possible implementation would be adding a flag to the current operator, but then the operator would also have to wait until the job actually completes. A better option, IMO, would be to create a dedicated operator that will return the results into XCom given a job_id, to keep operations atomic.
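The flag-based option means the operator itself must block until the query job finishes before it can read any results. A minimal sketch of that polling loop, assuming a hypothetical `get_state` callable that returns BigQuery job states (`PENDING`, `RUNNING`, `DONE`); in real code the state would come from the BigQuery job resource, and the injectable `sleep` exists only to make the sketch testable:

```python
import time
from typing import Callable


def wait_for_job(
    get_state: Callable[[], str],
    poll_interval: float = 5.0,
    timeout: float = 600.0,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll until the job reports DONE; return how many polls it took."""
    waited = 0.0  # counts slept time only, so it approximates wall-clock time
    polls = 0
    while True:
        polls += 1
        if get_state() == "DONE":
            return polls
        if waited >= timeout:
            raise TimeoutError(f"job not DONE after ~{timeout} seconds")
        sleep(poll_interval)
        waited += poll_interval


states = iter(["PENDING", "RUNNING", "DONE"])
print(wait_for_job(lambda: next(states), sleep=lambda _: None))  # 3
```

A dedicated job_id-based operator avoids embedding this loop: BigQueryInsertJobOperator owns running the job, and the downstream task only reads results from a job that already finished.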

@Lee-W, what are your thoughts about this one?

shahar1, Apr 26 '24

> A better option, IMO, would be to create a dedicated operator that will return the results into XCom given a job_id, to keep operations atomic.

This sounds like a better solution 👍

Lee-W, Apr 29 '24

> > A better option, IMO, would be to create a dedicated operator that will return the results into XCom given a job_id, to keep operations atomic.
>
> This sounds like a better solution 👍

Thanks :) I've been trying out different configurations, and after discussing with @eladkal we concluded that querying data by job_id could be merged into the existing BigQueryGetData operator, as the logic is very similar (in deferrable mode as well). I'll create a PR and tag you when I'm done.
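One way to read the "logic is very similar" remark, offered as an assumption about the design rather than a description of the merged PR: a finished BigQuery query job stores its results in a destination table, so reading by job_id reduces to choosing which table to read rows from. A minimal sketch with a hypothetical `resolve_source_table` helper and a hypothetical `destination_of` lookup standing in for fetching the job's metadata:

```python
from typing import Callable, Optional, Tuple

TableRef = Tuple[str, str, str]  # (project, dataset, table)


def resolve_source_table(
    project_id: str,
    dataset_id: Optional[str] = None,
    table_id: Optional[str] = None,
    job_id: Optional[str] = None,
    destination_of: Optional[Callable[[str], TableRef]] = None,
) -> TableRef:
    """Pick the table to read rows from: a job's destination table or an explicit one."""
    if job_id is not None:
        if destination_of is None:
            raise ValueError("destination_of is required when job_id is given")
        # The query results live in the job's destination table, so the rest
        # of the read path is the same as for an explicitly named table.
        return destination_of(job_id)
    if not (dataset_id and table_id):
        raise ValueError("dataset_id and table_id are required when job_id is not given")
    return (project_id, dataset_id, table_id)


fake_jobs = {"job_123": ("my-project", "tmp_dataset", "tmp_table")}
print(resolve_source_table("my-project", job_id="job_123", destination_of=fake_jobs.__getitem__))
# ('my-project', 'tmp_dataset', 'tmp_table')
print(resolve_source_table("my-project", "my_dataset", "events"))
# ('my-project', 'my_dataset', 'events')
```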

shahar1, Apr 29 '24

Fixed in https://github.com/apache/airflow/pull/39315

eladkal, May 17 '24