soda-core
Enable more authentication options for Databricks data source
Soda Core uses databricks.sql.connect for authentication, which offers many options, as documented:
Unfortunately, the way this is implemented by soda.data_sources.spark_data_source.databricks_connection_function limits it to personal access tokens:
https://github.com/sodadata/soda-core/blob/09262b0703ee9473240dcc0820bf213263c4d11c/soda/spark/soda/data_sources/spark_data_source.py#L136-L149
Likewise in SparkDataSource: https://github.com/sodadata/soda-core/blob/09262b0703ee9473240dcc0820bf213263c4d11c/soda/spark/soda/data_sources/spark_data_source.py#L474-L491
A solution could be to extend the signature of databricks_connection_function to match databricks.sql.connect, for example:
def databricks_connection_function(
    host: str,
    http_path: str,
    database: str,
    schema: str,
    auth_type: Literal["databricks-oauth"] | None = None,
    token: str | None = None,
    username: str | None = None,
    password: str | None = None,
    client_id: str | None = None,
    client_secret: str | None = None,
):
    ...
These could then be passed through to databricks.sql.connect (with the exception of client_id and client_secret, which require the creation of a credentials provider if defined).
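A minimal sketch of that mapping, to make the proposal concrete. The helper name build_connect_kwargs is hypothetical and not part of soda-core. The keyword names (server_hostname, catalog, access_token, auth_type, credentials_provider) are assumed to match what databricks.sql.connect accepts in the installed connector version, and the credentials provider assumes databricks-sdk is available when it is actually called.

```python
from typing import Any, Dict, Optional


def build_connect_kwargs(
    host: str,
    http_path: str,
    database: str,
    schema: str,
    auth_type: Optional[str] = None,
    token: Optional[str] = None,
    username: Optional[str] = None,
    password: Optional[str] = None,
    client_id: Optional[str] = None,
    client_secret: Optional[str] = None,
) -> Dict[str, Any]:
    """Hypothetical helper: build the keyword arguments for databricks.sql.connect."""
    kwargs: Dict[str, Any] = {
        "server_hostname": host,
        "http_path": http_path,
        "catalog": database,
        "schema": schema,
    }

    if client_id and client_secret:
        # client_id/client_secret cannot be passed through directly;
        # they are wrapped in a credentials provider instead.
        def credentials_provider():
            # Imported lazily so the helper itself does not require databricks-sdk.
            from databricks.sdk.core import Config, oauth_service_principal

            config = Config(
                host=f"https://{host}",
                client_id=client_id,
                client_secret=client_secret,
            )
            return oauth_service_principal(config)

        kwargs["credentials_provider"] = credentials_provider
        return kwargs

    # Everything else is passed through as-is when it is set.
    passthrough = {
        "auth_type": auth_type,
        "access_token": token,
        "username": username,
        "password": password,
    }
    kwargs.update({key: value for key, value in passthrough.items() if value is not None})
    return kwargs
```

The result would then be used as `sql.connect(**build_connect_kwargs(...))`. Giving the client credentials precedence over a token is a choice made for this sketch, not something the connector prescribes.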
Adding these options (in particular OAuth) would allow much more secure and robust connection alternatives!
SAS-3512
Hi, thank you for creating the ticket! I will add the request to our backlog and prioritize accordingly. If you have time, feel free to contribute, it would be greatly appreciated! https://github.com/sodadata/soda-core/blob/main/CONTRIBUTING.md.
I've tried to resolve this here:
https://github.com/sodadata/soda-core/pull/2220
Hey - I'm not sure how long the above will take, so I've made a way to inject custom connection types. This is possible because the code looks up the data source module for a connection type using importlib.
Below is an example for Databricks OAuth. It only persists for the lifetime of the Python process, so you'll have to run it before you run any Soda code.
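import importlib.util
import logging
import sys

# Import paths below are assumed to mirror those used in soda/data_sources/spark_data_source.py
from soda.__version__ import SODA_CORE_VERSION
from soda.common.exceptions import DataSourceConnectionError
from soda.data_sources.spark_data_source import SparkDataSource

custom_module_name = "soda.data_sources.dbxoauth_data_source"
custom_module = importlib.util.module_from_spec(importlib.util.spec_from_loader(custom_module_name, loader=None))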
def databricks_oauth_connection_function(host: str, http_path: str, token: str, database: str, schema: str, **kwargs):
    """
    Connection to databricks with databricks sql connector.

    Supplying a token will enforce connection via personal access token.
    host, client_id and client_secret keys can be supplied to the configuration parameter for m2m oauth.
    Setting auth_method to "databricks-oauth" will enforce a u2m oauth connection.
    Read the python-sql-connector documentation for more information.

    Parameters
    ----------
    host : str
        The databricks server host name.
    http_path : str
        The http_path to your databricks sql warehouse or cluster
    token : str
        Databricks personal access token
    database : str
        The databricks catalog
    schema : str
        The databricks schema

    Returns
    -------
    out : databricks.sql.Connection
        The databricks connection object
    """
    from databricks import sql

    user_agent_entry = f"soda-core-spark/{SODA_CORE_VERSION} (Databricks)"
    logging.getLogger("databricks.sql").setLevel(logging.INFO)
    auth_method = kwargs.get("auth_method")

    if not token and not auth_method:
        # Neither a token nor an auth method was given: fall back to m2m oauth.
        from databricks.sdk.core import Config, oauth_service_principal

        config = Config(
            **kwargs.get("configuration", {})
        )
        if not host:
            host = config.hostname

        def credential_provider():
            return oauth_service_principal(config)

        credentials_provider = credential_provider
    else:
        credentials_provider = None

    connection = sql.connect(
        server_hostname=host,
        catalog=database,
        schema=schema,
        http_path=http_path,
        access_token=token,
        credentials_provider=credentials_provider,
        auth_type=auth_method,
        _user_agent_entry=user_agent_entry,
    )
    return connection
class DBXOauthDataSource(SparkDataSource):
    def connect(self):
        connection_function = databricks_oauth_connection_function
        try:
            connection = connection_function(
                username=self.username,
                password=self.password,
                host=self.host,
                port=self.port,
                database=self.database,
                auth_method=self.auth_method,
                kerberos_service_name=self.kerberos_service_name,
                driver=self.driver,
                token=self.token,
                schema=self.schema,
                http_path=self.http_path,
                organization=self.organization,
                cluster=self.cluster,
                server_side_parameters=self.server_side_parameters,
                configuration=self.configuration,
                scheme=self.scheme,
            )
            self.connection = connection
        except Exception as e:
            raise DataSourceConnectionError(self.type, e)

setattr(custom_module, "DBXOauthDataSource", DBXOauthDataSource)
sys.modules[custom_module_name] = custom_module
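For anyone wondering why registering the module in sys.modules is enough, here is a minimal stand-alone sketch of the mechanism using only the standard library. The module name demo_custom_data_source and the class CustomDataSource are placeholders invented for this illustration and have nothing to do with soda-core.

```python
import importlib
import sys
import types

# Hypothetical module name; it does not exist on disk.
MODULE_NAME = "demo_custom_data_source"


class CustomDataSource:
    """Placeholder class standing in for a custom data source."""

    def connect(self):
        return "connected"


# Build an empty in-memory module and attach the class to it.
module = types.ModuleType(MODULE_NAME)
module.CustomDataSource = CustomDataSource

# Registering the module makes it importable for the rest of this process.
sys.modules[MODULE_NAME] = module

# importlib checks sys.modules first, so it returns the injected module
# instead of searching the file system.
resolved = importlib.import_module(MODULE_NAME)
resolved_class = getattr(resolved, "CustomDataSource")
```

Since the registration lives in sys.modules, it is gone once the process exits, which is why it has to run before any code that performs the lookup.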