airflow
let BigQueryGetData operator take a list of fields for the "order by" clause
Description
Sometimes you just need the latest value of a field (e.g. updatedAt) so that further operators downstream can use that value in their own queries.
This can be done with SELECT MAX(updatedAt) [...], but that would require a lot of rewriting, whereas simply adding a new parameter ordering_fields would solve the same issue, allowing a query similar to:
SELECT updatedAt FROM [...] ORDER BY updatedAt DESC LIMIT 1
Example implementation (not tested):
```python
def generate_query(self, hook: BigQueryHook) -> str:
    """Generate a SELECT query for the given dataset and table ID."""
    query = "select "
    if self.selected_fields:
        query += self.selected_fields
    else:
        query += "*"
    query += f" from `{self.table_project_id or hook.project_id}.{self.dataset_id}.{self.table_id}`"
    if self.ordering_fields:
        # ORDER BY must come before LIMIT in BigQuery standard SQL.
        query += f" order by {', '.join(self.ordering_fields)}"
    query += f" limit {self.max_results}"
    return query
```
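As a quick sanity check on the clause ordering, the same query-building logic can be sketched as a standalone function (the function name and plain-argument signature here are illustrative, not part of the operator):

```python
def build_query(project_id, dataset_id, table_id, max_results,
                selected_fields=None, ordering_fields=None):
    """Mirror of the proposed generate_query logic, free of Airflow dependencies."""
    query = "select " + (selected_fields or "*")
    query += f" from `{project_id}.{dataset_id}.{table_id}`"
    if ordering_fields:
        # ORDER BY must precede LIMIT in BigQuery standard SQL.
        query += f" order by {', '.join(ordering_fields)}"
    query += f" limit {max_results}"
    return query


print(build_query("proj", "ds", "tbl", 1, "updatedAt", ["updatedAt DESC"]))
# select updatedAt from `proj.ds.tbl` order by updatedAt DESC limit 1
```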
Use case/motivation
The BigQueryGetData operator should have one more parameter, ordering_fields, so that the generated query also includes the ORDER BY clause.
Related issues
https://github.com/apache/airflow/issues/24460
Are you willing to submit a PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
@eladkal Could you please assign me?
BigQueryGetData's generate_query() method is only used when submitting deferred jobs; normally, the operator uses the hook's list_rows method. The latter calls BigQuery's API directly, which does not support ordering clauses. Having the feature available only for queries made by the deferrable operator is discouraged.
There are two options left:
- Running the explicit query (including the ORDER BY clause) via `BigQueryInsertJobOperator`.
- Performing the ordering in Python as part of the operator logic, but I'm not sure there is a real advantage compared to the alternative.
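The first option would look roughly like the following sketch (the project, dataset, table, and task names are hypothetical):

```python
from airflow.providers.google.cloud.operators.bigquery import BigQueryInsertJobOperator

# Hypothetical task: run the explicit query, including the ORDER BY clause.
get_latest_updated_at = BigQueryInsertJobOperator(
    task_id="get_latest_updated_at",
    configuration={
        "query": {
            "query": (
                "SELECT updatedAt FROM `my-project.my_dataset.my_table` "
                "ORDER BY updatedAt DESC LIMIT 1"
            ),
            "useLegacySql": False,
        }
    },
)
```

Note that this operator pushes the job_id, not the result rows, to XCom.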
@eladkal What do you think?
> 1. Running the explicit query (including the ORDER BY clause) via `BigQueryInsertJobOperator`.

I might prefer this one. I think the performance would be better than option 2.
But how do you get the result when using BigQueryInsertJobOperator? As far as I understood it, it returns the job_id, not the result 🤔
> But how do you get the result when using BigQueryInsertJobOperator? As far as I understood it, it returns the job_id, not the result 🤔
You got me on that one; indeed, it only returns the job_id.
Using BigQueryInsertJobOperator, the only option is to write the results into a BigQuery table, so it might make sense to create an alternative for outputting the results into an XCom instead.
One possible implementation would be adding a flag to the current operator, but then it would also have to wait until the job is actually completed.
A better option, IMO, would be to create a dedicated operator that returns the results into the XCom given a job_id, to keep operations atomic.
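A rough sketch of what such a dedicated operator could look like (the class name BigQueryGetJobResults is hypothetical, and error handling is omitted):

```python
from airflow.models import BaseOperator
from airflow.providers.google.cloud.hooks.bigquery import BigQueryHook


class BigQueryGetJobResults(BaseOperator):
    """Hypothetical operator: push the results of a finished query job to XCom."""

    def __init__(self, *, job_id: str, gcp_conn_id: str = "google_cloud_default", **kwargs):
        super().__init__(**kwargs)
        self.job_id = job_id
        self.gcp_conn_id = gcp_conn_id

    def execute(self, context):
        hook = BigQueryHook(gcp_conn_id=self.gcp_conn_id)
        # get_job returns a google.cloud.bigquery job object; result()
        # waits for completion and yields the result rows.
        job = hook.get_job(job_id=self.job_id)
        return [dict(row) for row in job.result()]
```

The return value of execute is pushed to XCom automatically (when do_xcom_push is enabled), which keeps the job-running and result-fetching steps separate and atomic.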
@Lee-W, What are your thoughts about this one?
> A better option, IMO, would be to create a dedicated operator that will return the results into the XCOM given a job_id, to keep operations atomic.

This sounds like a better solution 👍
Thanks :)
I've been trying out different configurations, and after discussing with @eladkal we came to the conclusion that querying data by job_id could be merged into the existing BigQueryGetData operator, as the logic is very similar (also in deferrable mode).
I'll create a PR and tag you when I'm done.
Fixed in https://github.com/apache/airflow/pull/39315