Feature Request: Support key-pair authentication for Snowflake cache
Snowflake plans to disable password-only authentication in the near future: https://docs.snowflake.com/en/user-guide/security-mfa-rollout
PyAirbyte currently supports only password authentication for the Snowflake cache. It should also support more secure methods that will remain permitted, such as key-pair authentication.
@nakamichiworks, would you like to try implementing this? I would be happy to provide you with some guidance.
@marcosmarxm
Currently, we are using a patched SnowflakeCache as shown below.
I'm ready to create a PR based on this snippet if it looks good to you.
from typing import ClassVar
from airbyte._processors.sql.snowflake import SnowflakeConfig, SnowflakeSqlProcessor
from airbyte.caches.base import CacheBase
from airbyte.constants import DEBUG_MODE
from airbyte.destinations._translate_cache_to_dest import snowflake_cache_to_destination_configuration
from airbyte.secrets.base import SecretString
from airbyte.shared.sql_processor import RecordDedupeMode, SqlProcessorBase
from airbyte_api.models import DestinationSnowflake
from cryptography.hazmat.backends import default_backend
from cryptography.hazmat.primitives import serialization
from overrides import overrides
from snowflake import connector
from snowflake.sqlalchemy import URL
from sqlalchemy import create_engine


class PatchedSnowflakeConfig(SnowflakeConfig):
    password: SecretString | None = None
    private_key_file: str | None = None
    private_key_file_pwd: SecretString | None = None

    @overrides
    def get_sql_alchemy_url(self) -> SecretString:
        # original source: https://github.com/airbytehq/PyAirbyte/blob/v0.24.2/airbyte/_processors/sql/snowflake.py#L63
        config = dict(
            account=self.account,
            user=self.username,
            database=self.database,
            warehouse=self.warehouse,
            schema=self.schema_name,
            role=self.role,
        )
        if self.password is not None:
            config["password"] = self.password
        return SecretString(URL(**config))

    # reference: https://github.com/snowflakedb/snowflake-sqlalchemy?tab=readme-ov-file#key-pair-authentication-support
    def _get_private_key_bytes(self, private_key_file: str, private_key_file_pwd: str | None = None) -> bytes:
        with open(private_key_file, "rb") as f:
            private_key = serialization.load_pem_private_key(
                f.read(),
                password=private_key_file_pwd.encode() if private_key_file_pwd is not None else None,
                backend=default_backend(),
            )
        private_key_bytes = private_key.private_bytes(
            encoding=serialization.Encoding.DER,
            format=serialization.PrivateFormat.PKCS8,
            encryption_algorithm=serialization.NoEncryption(),
        )
        return private_key_bytes

    @overrides
    def get_sql_engine(self):
        # original source: https://github.com/airbytehq/PyAirbyte/blob/v0.24.2/airbyte/shared/sql_processor.py#L131
        engine_config = dict(
            url=self.get_sql_alchemy_url(),
            connect_args={},
            echo=DEBUG_MODE,
            execution_options={
                "schema_translate_map": {None: self.schema_name},
            },
            future=True,
        )
        if self.private_key_file is not None:
            engine_config["connect_args"] = {
                "private_key": self._get_private_key_bytes(self.private_key_file, self.private_key_file_pwd)
            }
        return create_engine(**engine_config)

    @overrides
    def get_vendor_client(self) -> object:
        # original source: https://github.com/airbytehq/PyAirbyte/blob/v0.24.2/airbyte/_processors/sql/snowflake.py#L77
        return connector.connect(
            user=self.username,
            password=self.password,
            private_key_file=self.private_key_file,
            private_key_file_pwd=self.private_key_file_pwd,
            account=self.account,
            warehouse=self.warehouse,
            database=self.database,
            schema=self.schema_name,
            role=self.role,
        )


class PatchedSnowflakeCache(PatchedSnowflakeConfig, CacheBase):
    # original source: https://github.com/airbytehq/PyAirbyte/blob/v0.24.2/airbyte/caches/snowflake.py#L36
    dedupe_mode: RecordDedupeMode = RecordDedupeMode.APPEND

    _sql_processor_class: ClassVar[type[SqlProcessorBase]] = SnowflakeSqlProcessor

    paired_destination_name: ClassVar[str | None] = "destination-snowflake"
    paired_destination_config_class: ClassVar[type | None] = DestinationSnowflake

    @property
    def paired_destination_config(self) -> DestinationSnowflake:
        """Return the paired Snowflake destination configuration."""
        return snowflake_cache_to_destination_configuration(cache=self)
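
Because the snippet above makes `password` optional, a config with neither a password nor a private key only fails once a connection is attempted. Below is a minimal sketch of an up-front check that could run before building the engine. The function name `check_snowflake_auth` and its return values are hypothetical, not part of PyAirbyte or the Snowflake connector, and reporting "key-pair" when both credentials are set is an assumption of this sketch rather than documented Snowflake behavior.

```python
from __future__ import annotations


def check_snowflake_auth(
    password: str | None = None,
    private_key_file: str | None = None,
    private_key_file_pwd: str | None = None,
) -> str:
    """Validate the credential fields and name the auth method they imply.

    Hypothetical helper: the parameter names mirror the fields on the
    patched config above, but nothing here calls Snowflake or PyAirbyte.
    """
    if password is None and private_key_file is None:
        # Neither credential is set, so no connection could succeed.
        raise ValueError("Set either `password` or `private_key_file`.")
    if private_key_file_pwd is not None and private_key_file is None:
        # A passphrase is meaningless without the key file it decrypts.
        raise ValueError("`private_key_file_pwd` requires `private_key_file`.")
    # Assumption of this sketch: report key-pair whenever a key file is given.
    return "key-pair" if private_key_file is not None else "password"
```

In a pydantic-based config, a check like this could be called from a validator so that a misconfigured cache is rejected at construction time.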
@nakamichiworks - This is awesome. 💎 Thanks for sharing the code for your implementation!
If you have time to open a PR against the repo, I'm sure it would benefit others as well. If not, that's also understandable. We'll keep this issue open as a resource for anyone looking for the same thing.
> Snowflake plans to disable password-only authentication in the near future: https://docs.snowflake.com/en/user-guide/security-mfa-rollout
Especially because of this 👆, I'm sure this will be raised again as the time gets closer.
cc @marcosmarxm
@aaronsteers Thanks for your reply. I created PR #681.
Resolved in:
- https://github.com/airbytehq/PyAirbyte/pull/702
🎉