[SPARK-35242][SQL] Support changing session catalog's default database
What changes were proposed in this pull request?
This PR is a follow-up to #32364, which was closed by github-actions because it had not been updated in a while. The previous PR added a new configuration parameter (spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase) for setting the session catalog's default database. This is required in use cases where the user does not have access to the default database.
I have created this new PR based on it, with the following additional changes:
- Rebased / updated the previous PR to the latest master branch version
- Deleted the DEFAULT_DATABASE static member from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala and refactored the code that used it
Why are the changes needed?
If a user has no permissions on the Hive default database in Ranger, Spark fails with the following error:
22/08/26 18:36:21 INFO metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient [email protected] (auth:KERBEROS) retries=1 delay=1 lifetime=0
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default])
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
The idea is to introduce a new configuration parameter that sets a different database name as the default database, one on which the user has sufficient permissions in Ranger.
For example:
spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other_db
Does this PR introduce any user-facing change?
Yes. There is a new configuration parameter, as described above. Its default value is "default", so the behavior is the same as before unless the parameter is set.
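The fallback behavior described above can be sketched in a few lines of plain Scala. This is only an illustrative model, not the code in this PR: a Map stands in for Spark's configuration, and the object and method names (DefaultDatabaseConf, resolve) are hypothetical. The key is the one from the description, with spark_catalog as the session catalog name.

```scala
// Illustrative model only: a plain Map stands in for Spark's configuration.
// DefaultDatabaseConf and resolve are hypothetical names, not Spark APIs.
object DefaultDatabaseConf {
  // Key from the PR description, with spark_catalog as the session catalog name.
  val Key: String = "spark.sql.catalog.spark_catalog.defaultDatabase"

  // Value used when the key is not set, as stated in the PR description.
  val Fallback: String = "default"

  // Returns the configured default database, or "default" if none is configured.
  def resolve(conf: Map[String, String]): String =
    conf.getOrElse(Key, Fallback)
}
```

With an empty configuration, resolve returns "default"; passing the key with the value other_db, as in the spark-shell example, returns other_db.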
How was this patch tested?
- With GitHub Actions (all tests passed)
https://github.com/roczei/spark/actions/runs/2934863118
- Manually tested with Ranger + Hive
Scenario a) hrt_10 does not have access to the default database in Hive:
[hrt_10@quasar-thbnqr-2 ~]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/26 18:14:18 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:14:30 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: [dispatcher-event-loop-17]: Attempted to request executors before the AM has registered!
...
scala> spark.sql("use other")
22/08/26 18:18:47 INFO conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml
22/08/26 18:18:48 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:18:48 WARN client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d
22/08/26 18:18:48 INFO SessionState: [main]: Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled.
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: Trying to connect to metastore with URI thrift://quasar-thbnqr-4.quasar-thbnqr.root.hwx.site:9083
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: Opened a connection to metastore, current connections: 1
22/08/26 18:18:50 INFO metastore.HiveMetaStoreClient: [main]: Connected to metastore.
22/08/26 18:18:50 INFO metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient [email protected] (auth:KERBEROS) retries=1 delay=1 lifetime=0
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default])
at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:179)
This is the expected behavior, because without the new parameter Spark uses "default" as the default database name.
Scenario b) Use the "other" database where the hrt_10 user has proper permissions
[hrt_10@quasar-thbnqr-2 ~]$ spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/26 18:27:03 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:27:14 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: [dispatcher-event-loop-15]: Attempted to request executors before the AM has registered!
...
scala> spark.sql("use other")
22/08/26 18:29:22 INFO conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml
22/08/26 18:29:22 WARN conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:29:22 WARN client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2
22/08/26 18:29:22 INFO SessionState: [main]: Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled.
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: Trying to connect to metastore with URI thrift://quasar-thbnqr-4.quasar-thbnqr.root.hwx.site:9083
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: Opened a connection to metastore, current connections: 1
22/08/26 18:29:24 INFO metastore.HiveMetaStoreClient: [main]: Connected to metastore.
22/08/26 18:29:24 INFO metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient [email protected] (auth:KERBEROS) retries=1 delay=1 lifetime=0
res0: org.apache.spark.sql.DataFrame = []
scala> spark.sql("select * from employee").show()
+---+----+------+-----------+
|eid|name|salary|destination|
+---+----+------+-----------+
| 12| Ram|    10|     Szeged|
| 13| Joe|    20|   Debrecen|
+---+----+------+-----------+
scala>
@cloud-fan, @hddong, @dongjoon-hyun could you please review my changes? Thanks!
Can one of the admins verify this patch?
@cloud-fan
Could you please take a look when you have some time? This PR is a follow-up PR for https://github.com/apache/spark/pull/32364. Thanks!
we should also update V2SessionCatalog.defaultNamespace
Thanks @cloud-fan, I will check these!
@cloud-fan, did you mean this change?
diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index efbc9dd755..23775c3ae0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
import org.apache.spark.sql.execution.datasources.DataSource
import org.apache.spark.sql.internal.connector.V1Function
+import org.apache.spark.sql.internal.SQLConf
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap
@@ -43,7 +44,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
import V2SessionCatalog._
- override val defaultNamespace: Array[String] = Array("default")
+ override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)
override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)
Yes
@cloud-fan Ok, I have fixed this.
Is this a common behavior in other databases?
@cloud-fan Good question. The user-specified default database cannot be dropped because this PR currently has the following if statement:
if (dbName == defaultDatabase)
and this is the latest state of master:
https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala#L286
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
val dbName = format(db)
if (dbName == DEFAULT_DATABASE) {
throw QueryCompilationErrors.cannotDropDefaultDatabaseError
}
As you can see, I am just using the same logic.
If you think we should only refuse to drop "default" and allow dropping the database named by spark.sql.catalog.spark_catalog.defaultDatabase, that is fine with me. The change is very simple:
+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -284,7 +284,7 @@ class SessionCatalog(
def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
val dbName = format(db)
- if (dbName == defaultDatabase) {
+ if (dbName == "default") {
throw QueryCompilationErrors.cannotDropDefaultDatabaseError
}
if (!ignoreIfNotExists) {
and here is the validation:
$ ./spark-shell --conf spark.sql.catalogImplementation=hive --conf spark.sql.catalog.spark_catalog.defaultDatabase=xyz
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/19 14:21:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1663590068068).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.4.0-SNAPSHOT
/_/
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_345)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|      abc|
|  default|
|      xyz|
+---------+
scala> spark.sql("use database abc")
res1: org.apache.spark.sql.DataFrame = []
scala> spark.sql("SELECT current_database() AS db").show()
+---+
| db|
+---+
|abc|
+---+
scala>
scala> spark.sql("drop database xyz")
res3: org.apache.spark.sql.DataFrame = []
scala> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|      abc|
|  default|
+---------+
scala>
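For reference, the proposed guard can be modelled outside Spark as below. This is a minimal sketch under stated assumptions: ToyCatalog is a hypothetical stand-in for SessionCatalog, it skips the name normalization done by format(db), and it throws a generic exception instead of QueryCompilationErrors.cannotDropDefaultDatabaseError. It only shows that the literal "default" stays protected while the configured default database can be dropped.

```scala
import scala.collection.mutable

// Hypothetical stand-in for SessionCatalog; only the drop guard is modelled.
final class ToyCatalog(initial: Set[String]) {
  private val databases: mutable.Set[String] = mutable.Set(initial.toSeq: _*)

  // Snapshot of the database names currently known to the catalog.
  def listDatabases: Set[String] = databases.toSet

  // Mirrors the proposed check: only the literal "default" is protected,
  // regardless of which database is configured as the default one.
  def dropDatabase(db: String): Unit = {
    if (db == "default") {
      // Assumption: a generic exception replaces Spark's own error here.
      throw new IllegalArgumentException("Default database cannot be dropped.")
    }
    databases -= db
  }
}
```

Starting from the databases abc, default and xyz, dropping xyz succeeds and leaves abc and default, which matches the session above, while dropping default still fails.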
Is this solution acceptable for you?
SGTM!
Thanks @cloud-fan, I have implemented this and all tests passed. As far as I can see, all of your feedback has now been addressed.
Hi @cloud-fan,
Thanks for the feedback, I have addressed all of it. Unfortunately I made a mistake in version management, but I have already fixed it with git rebase / git push, as you can see. This is why many new labels were added that are not relevant to this pull request. Only the SQL label is correct. Unfortunately I cannot delete the other labels.
Hi @cloud-fan,
All build issues have been fixed and all of your feedback has been addressed. Latest state:
$ bin/spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other_db
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/25 14:58:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1664110696143).
Spark session available as 'spark'.
Welcome to
____ __
/ __/__ ___ _____/ /__
_\ \/ _ \/ _ `/ __/ '_/
/___/ .__/\_,_/_/ /_/\_\ version 3.4.0-SNAPSHOT
/_/
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_345)
Type in expressions to have them evaluated.
Type :help for more information.
scala> spark.sql("show databases").show()
org.apache.spark.SparkDefaultDatabaseNotExistsException: [DEFAULT_DATABASE_NOT_EXISTS] Default database other_db does not exist, please create it first or change default database to 'default'.
at org.apache.spark.sql.errors.QueryExecutionErrors$.defaultDatabaseNotExistsError(QueryExecutionErrors.scala:1936)
at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:156)
at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:147)
at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$catalog$1(BaseSessionStateBuilder.scala:154)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:122)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:122)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listDatabases(SessionCatalog.scala:323)
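The behavior shown in this trace can be summarised with a small standalone sketch. It is not the code in SharedState: DefaultDatabaseCheck, DefaultDatabaseNotExists and ensureDefaultDatabase are hypothetical names, a Set of names stands in for the external catalog, and the branch that creates a missing "default" database is an assumption about how the unconfigured case is handled. Only the error text is taken from the output above.

```scala
// Standalone model of the startup check; all names here are hypothetical.
object DefaultDatabaseCheck {
  // Carries the same message text as the error shown in the trace above.
  final case class DefaultDatabaseNotExists(name: String) extends RuntimeException(
    s"Default database $name does not exist, please create it first " +
      "or change default database to 'default'.")

  // Returns the set of databases that exist after the check has run.
  def ensureDefaultDatabase(existing: Set[String], configured: String): Set[String] =
    if (existing.contains(configured)) {
      existing // nothing to do, the configured default database is present
    } else if (configured == "default") {
      existing + "default" // assumption: a missing "default" is created
    } else {
      throw DefaultDatabaseNotExists(configured) // user must create it first
    }
}
```

Calling ensureDefaultDatabase(Set("default"), "other_db") throws, which corresponds to the session above where other_db had not been created yet.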
LGTM except for 2 minor comments
Thanks @cloud-fan! I have implemented these two comments.
thanks, merging to master!
@cloud-fan,
Thank you very much for your help!