airflow
airflow copied to clipboard
Add deferred pagination mode to GenericTransfer
As explained in my Airflow medium blogpost, I've refactored the GenericTransfer to support deferred paginated reads.
When dealing with large datasets, not the whole dataset needs to be read into memory first before persisting it afterwards, as this could otherwise lead to out of memory errors on the worker executing the code.
I also took the opportunity to introduce an SQLExecuteQueryTrigger in the common sql provider, allowing the GenericTransfer to handle the paginated reads in deferred mode, so that the paginated reads can be decoupled from the writes, which shouldn't continuously block the worker as it can offload the reads to the triggerer while persisting the previous page in the meantime.
Once the dialects PR is done, we could improve the way how the GenericTransfer handles the paginated SQL queries across different databases. At this moment the paginated SQL query can be customized through the paginated_sql_statement_format parameter. The read size can be specified through the chunck_size parameter, maybe another (better) name could be preferred here but that I let you guy's decide how it's best named. If no chunk_size is specified, then the original implementation is used and everything is read and persisted in one go.
Last but not least, I've moved the test code to test deferrable operators out of the microsoft azure provider and put it into the common test utils, so it can be re-used across multiple modules.
^ Add meaningful description above
Read the Pull Request Guidelines for more information.
In case of fundamental code changes, an Airflow Improvement Proposal (AIP) is needed.
In case of a new dependency, check compliance with the ASF 3rd Party License Policy.
In case of backwards incompatible changes please leave a note in a newsfragment file, named {pr_number}.significant.rst
or {issue_number}.significant.rst
, in newsfragments.