PyAirbyte icon indicating copy to clipboard operation
PyAirbyte copied to clipboard

Feature Request: Support key-pair authentication for Snowflake cache

Open nakamichiworks opened this issue 9 months ago • 2 comments

Snowflake plans to disable password-only authentication in the near future: https://docs.snowflake.com/en/user-guide/security-mfa-rollout

PyAirbyte currently supports only password authentication, but it should support more secure and allowed authentication methods such as key-pair auth.

nakamichiworks avatar Apr 09 '25 11:04 nakamichiworks

@nakamichiworks, would you like to try implementing this? I would be happy to provide you with some guidance.

marcosmarxm avatar Apr 10 '25 17:04 marcosmarxm

@marcosmarxm

Currently, we are using a patched SnowflakeCache as shown below. I'm ready to create a PR based on this snippet if it looks good to you.

from typing import ClassVar

from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor
from airbyte.caches.base import CacheBase
from airbyte.constants import DEBUG_MODE
from airbyte.destinations._translate_cache_to_dest import snowflake_cache_to_destination_configuration
from airbyte.secrets.base import SecretString
from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase
from airbyte_api.models import DestinationSnowflake
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from overrides import overrides
from snowflake import connector
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine


class PatchedSnowflakeConfig(SnowflakeConfig):
    password: SecretString | None = None
    private_key_file: str | None = None
    private_key_file_pwd: SecretString | None = None

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        # original source: https://github.com/airbytehq/PyAirbyte/blob/v0.24.2/airbyte/_processors/sql/snowflake.py#L63
        config = dict(
            account=self.account,
            user=self.username,
            database=self.database,
            warehouse=self.warehouse,
            schema=self.schema_name,
            role=self.role,
        )
        if self.password is not None:
            config["password"] = self.password
        return SecretString(URL(**config))

    # reference: https://github.com/snowflakedb/snowflake-sqlalchemy?tab=readme-ov-file#key-pair-authentication-support
    def _get_private_key_bytes(self, private_key_file: str, private_key_file_pwd: str | None = None) -> bytes:
        with open(private_key_file, "rb") as f:
            private_key = serialization.load_pem_private_key(
                f.read(),
                password=private_key_file_pwd.encode() if private_key_file_pwd is not None else None,
                backend=default_backend(),
            )

        private_key_bytes = private_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        return private_key_bytes

    @overrides
    def get_sql_engine(self):
        # original source: https://github.com/airbytehq/PyAirbyte/blob/v0.24.2/airbyte/shared/sql_processor.py#L131
        engine_config = dict(
            url=self.get_sql_alchemy_url(),
            connect_args={},
            echo=DEBUG_MODE,
            execution_options={
                "schema_translate_map": {None: self.schema_name},
            },
            future=True,
        )
        if self.private_key_file is not None:
            engine_config["connect_args"] = {
                "private_key": self._get_private_key_bytes(self.private_key_file, self.private_key_file_pwd)
            }
        return create_engine(**engine_config)

    @overrides
    def get_vendor_client(self) -> object:
        # original source: https://github.com/airbytehq/PyAirbyte/blob/v0.24.2/airbyte/_processors/sql/snowflake.py#L77
        return connector.connect(
            user=self.username,
            password=self.password,
            private_key_file=self.private_key_file,
            private_key_file_pwd=self.private_key_file_pwd,
            account=self.account,
            warehouse=self.warehouse,
            database=self.database,
            schema=self.schema_name,
            role=self.role,
        )


class PatchedSnowflakeCache(PatchedSnowflakeConfig, CacheBase):
    # original source: https://github.com/airbytehq/PyAirbyte/blob/v0.24.2/airbyte/caches/snowflake.py#L36

    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-bigquery"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return a dictionary of destination configuration values."""
        return snowflake_cache_to_destination_configuration(cache=self)

nakamichiworks avatar Apr 11 '25 11:04 nakamichiworks

@nakamichiworks - This is awesome. 💎 Thanks for sharing the code for your implementation!

If you do have time to create as a PR to the repo, I'm sure it would benefit others as well. If not, that's also understandable. We'll keep this issue open as a resource to anyone looking for the same.

Snowflake plans to disable password-only authentication in the near future: https://docs.snowflake.com/en/user-guide/security-mfa-rollout

Especially because of this 👆, I'm sure this will be raised again as the time gets closer.

cc @marcosmarxm

aaronsteers avatar May 30 '25 18:05 aaronsteers

@aaronsteers Thanks for your reply. I created PR #681.

nakamichiworks avatar Jun 01 '25 14:06 nakamichiworks

Resolved in:

  • https://github.com/airbytehq/PyAirbyte/pull/702

🎉

aaronsteers avatar Jun 27 '25 19:06 aaronsteers