
copy into snowflake from external source

Open blcksrx opened this issue 3 years ago • 8 comments

closes: #25395

blcksrx avatar Aug 04 '22 18:08 blcksrx

@potiuk In my opinion, it's better to make this operator an abstract class and implement it separately on each cloud provider (just overriding the external-location section). From a user-experience standpoint, it's not pleasant if someone has already created a cloud connection such as AWS and cannot reuse it with this operator, besides the security issue on the SQL side of having to pass credentials as args. In addition, using an externalStage requires manual effort on the user's side to first create a stage and then use it in Airflow, which reduces flexibility.

P.S.: I created this draft PR to surface this issue and get your opinion on it.

c.c: @Taragolis

blcksrx avatar Aug 04 '22 18:08 blcksrx

Yeah, I agree that it would be better to implement this as an abstract class, or just raise a NotImplementedError when the provider-specific method is called:


class BaseCopyIntoTable(BaseOperator):
    def __init__(self, somearg, some_other_arg, **kwargs):
        super().__init__(**kwargs)
        ...

    def from_location(self) -> str:
        raise NotImplementedError()

    def execute(self, context: Context) -> None:
        snowflake_hook = SnowflakeHook(...)

        sql = f"""
             COPY INTO blah.blah
             FROM {self.from_location()}
             -- other useful arguments for COPY INTO <table>
        """
        self.log.info("Executing COPY command...")
        snowflake_hook.run(sql)
        self.log.info("COPY command completed")

And this kind of BaseCopyIntoTable could be used for multiple different situations and operators (not all of them need to be implemented in one PR):

COPY INTO without transformation

  1. Specified named internal/external stage
  2. Stage for the specified table
  3. Stage for the current user
  4. Specified external location for supported cloud providers by providing a storage integration name (provider-agnostic way)
  5. Specified external location for AWS S3 by providing credentials; not the recommended way, and we also need to be sure the credentials are not leaked to logs. Credentials could be obtained via AwsHook.
  6. ~Specified external location for GCS by providing credentials~ Snowflake does not support access to GCS by providing credentials
  7. Specified external location for Azure containers by providing credentials; not the recommended way, and we also need to be sure the credentials are not leaked to logs.

COPY INTO with transformation

  1. Specified named internal/external stage
  2. Stage for the specified table
  3. Stage for the current user
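The source-location variants above differ only in the `FROM` fragment of the generated statement. A minimal sketch of that mapping (the function and argument names are illustrative, not the actual operator API):

```python
from typing import Optional

def from_clause(
    stage: Optional[str] = None,
    table_stage: Optional[str] = None,
    user_stage: bool = False,
    external_location: Optional[str] = None,
) -> str:
    """Build the FROM fragment of a COPY INTO <table> statement."""
    if stage:
        return f"FROM @{stage}"  # 1. named internal/external stage
    if table_stage:
        return f"FROM @%{table_stage}"  # 2. stage for the specified table
    if user_stage:
        return "FROM @~"  # 3. stage for the current user
    if external_location:
        return f"FROM '{external_location}'"  # 4-7. external location URL
    raise ValueError("no source location provided")
```

Each subclass of the abstract operator would effectively pin down one of these branches.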

Also it would be nice if this abstract operator supported the subset of Snowflake hook parameters already implemented in the existing S3ToSnowflakeOperator: https://github.com/apache/airflow/blob/cda40836ca55702c543391367d0829ed231c1b35/airflow/providers/snowflake/transfers/s3_to_snowflake.py#L106-L114

Taragolis avatar Aug 04 '22 19:08 Taragolis

I disagree. I think it's better to keep it in Snowflake. First of all, by definition: our rule for deciding where to add such "transfer" operators is that the "target" counts. No matter if it is GCS/S3 etc., everything should be in the Snowflake provider.

As far as I understand, you cannot even reuse the connections that you already defined in Airflow, because Airflow connections have a different format than the one Snowflake uses, and you would anyhow have to do some mapping between the ones in Connection and the ones you send to Snowflake.

potiuk avatar Aug 05 '22 15:08 potiuk

It's true that an Airflow connection is different than a Snowflake connection, but it's possible to create a Snowflake connection from Airflow. In that case I'm going to work on an ExternalStage operator, since ExternalLocation would leak credentials into the logs.
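For context, the externalStage route means the user runs a one-off DDL (e.g. via SnowflakeHook.run) that binds the cloud location to a storage integration, so no raw credentials ever appear in the COPY statement. A sketch of building that DDL; the stage, URL, and integration names are placeholders:

```python
def create_external_stage_sql(stage: str, url: str, integration: str) -> str:
    """Render one-off DDL for an external stage backed by a storage integration."""
    return (
        f"CREATE STAGE IF NOT EXISTS {stage}\n"
        f"  URL = '{url}'\n"
        f"  STORAGE_INTEGRATION = {integration}"
    )

# Example with placeholder names:
print(create_external_stage_sql("my_s3_stage", "s3://source-bucket-123/key/path/", "my_storage_int"))
```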

blcksrx avatar Aug 05 '22 16:08 blcksrx

@blcksrx My personal thought: just copy the code from S3ToSnowflakeOperator into a new operator, then have S3ToSnowflakeOperator inherit from the new operator (with deprecation warnings).

Everything except copyOptions and VALIDATION_MODE is already implemented there. And it already works with all cloud providers that have a configured externalStage for the appropriate cloud provider (current and future ones).

@potiuk When I mentioned that I agreed with an abstract class, I meant that right now the current S3ToSnowflakeOperator is basically just a query builder for Snowflake COPY INTO <table>, and only one piece of the query might change depending on the location (external stage, internal stage, external location), which could be implemented per appropriate class, same as it is done in BaseSQLToGCSOperator. I absolutely agree with the fact that it should be part of the Snowflake provider.

And last but not least, I'm not sure that this operator (the current one and the one in this PR) should be placed in transfers; it only generates a SQL statement, and everything transferred between clouds (as well as local stages) completes internally in Snowflake.

Taragolis avatar Aug 07 '22 11:08 Taragolis

@potiuk When I mentioned that I agreed with an abstract class, I meant that right now the current S3ToSnowflakeOperator is basically just a query builder for Snowflake COPY INTO <table>, and only one piece of the query might change depending on the location (external stage, internal stage, external location), which could be implemented per appropriate class, same as it is done in [BaseSQLToGCSOperator](https://github.com/apache/airflow/blob/ae7bf474109410fa838ab2728ae6d581cdd41808/airflow/providers/google/cloud/transfers/sql_to_gcs.py#L316-L326). I absolutely agree with the fact that it should be part of the Snowflake provider.

That's cool. If we keep'em all in Snowflake, then that is perfectly ok.

And last but not least, I'm not sure that this operator (the current one and the one in this PR) should be placed in transfers; it only generates a SQL statement, and everything transferred between clouds (as well as local stages) completes internally in Snowflake.

Crossed my mind too. I think it should indeed be in the regular operators package.

potiuk avatar Aug 07 '22 11:08 potiuk

Crossed my mind too. I think it should indeed be in the regular operators package.

The only case where it could count as a transfer is when the query uses credentials (instead of stages or integrations) for AWS or Azure (GCS not supported), like:

COPY INTO FOO.BAR
  FROM s3://source-bucket-123/key/path/location
  CREDENTIALS = ( AWS_KEY_ID='xxxx' AWS_SECRET_KEY='xxxxx' AWS_TOKEN='xxxxxx' )
  FILE_FORMAT = ( TYPE = PARQUET )
;
COPY INTO FOO.BAR
  FROM azure://{account}.blob.core.windows.net/source-container-123/key/path/location
  CREDENTIALS = ( AZURE_SAS_TOKEN = 'xxx' )
  FILE_FORMAT = ( TYPE = PARQUET )
;
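If statements like these were ever logged, the CREDENTIALS clause would need masking first. A minimal sketch of that idea; the regex and the replacement text are assumptions for illustration, not what the provider actually does:

```python
import re

# Matches an inline credentials clause such as
# CREDENTIALS = ( AWS_KEY_ID='xxxx' AWS_SECRET_KEY='xxxxx' )
CREDENTIALS_RE = re.compile(r"CREDENTIALS\s*=\s*\([^)]*\)", re.IGNORECASE)

def redact_credentials(sql: str) -> str:
    """Replace any inline CREDENTIALS clause before the statement is logged."""
    return CREDENTIALS_RE.sub("CREDENTIALS = ( *** )", sql)

sql = """COPY INTO FOO.BAR
  FROM s3://source-bucket-123/key/path/location
  CREDENTIALS = ( AWS_KEY_ID='xxxx' AWS_SECRET_KEY='xxxxx' )
  FILE_FORMAT = ( TYPE = PARQUET )"""
print(redact_credentials(sql))
```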

But I don't think it's a good idea to implement an operator which passes credentials directly into the SQL query (not as part of the Snowflake provider or any other). These credentials could be exposed to logs, and it's also not recommended by the Snowflake docs:

We highly recommend the use of storage integrations.
This option avoids the need to supply cloud storage credentials using the CREDENTIALS parameter when creating stages or loading data.

However, if we implement COPY INTO <table> as some kind of abstract operator, then end users might create their own custom operator to fit their requirements and provide credentials as part of the SQL query, if they don't want to follow the recommended security practice.
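To make the extension point concrete, here is a sketch of what such a user-defined subclass could look like. The base class here is a minimal stand-in (the real one would subclass Airflow's BaseOperator), and the credentials-in-SQL pattern is shown only to illustrate the override, since it is the discouraged path:

```python
class BaseCopyIntoTable:  # stand-in; the real base would be an Airflow operator
    def from_location(self) -> str:
        raise NotImplementedError()

    def build_sql(self, table: str) -> str:
        return f"COPY INTO {table}\n  {self.from_location()}"

class S3CredentialsCopyIntoTable(BaseCopyIntoTable):
    """Hypothetical user operator that injects AWS credentials inline."""

    def __init__(self, bucket_path: str, key_id: str, secret_key: str):
        self.bucket_path = bucket_path
        self.key_id = key_id
        self.secret_key = secret_key

    def from_location(self) -> str:
        return (
            f"FROM {self.bucket_path}\n"
            f"  CREDENTIALS = ( AWS_KEY_ID='{self.key_id}' "
            f"AWS_SECRET_KEY='{self.secret_key}' )"
        )
```

Only `from_location()` is overridden; the rest of the statement building stays in the base class.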

Taragolis avatar Aug 07 '22 12:08 Taragolis

But I don't think it's a good idea to implement an operator which passes credentials directly into the SQL query (not as part of the Snowflake provider or any other). These credentials could be exposed to logs, and it's also not recommended by the Snowflake docs:

Absolutely.

However, if we implement COPY INTO <table> as some kind of abstract operator, then end users might create their own custom operator to fit their requirements and provide credentials as part of the SQL query, if they don't want to follow the recommended security practice.

Yes but we should discourage that in our docs as well, pointing to SF docs.

potiuk avatar Aug 07 '22 13:08 potiuk