
[SPARK-35242][SQL] Support changing session catalog's default database

Open · roczei opened this pull request · 12 comments

What changes were proposed in this pull request?

This PR is a follow-up to #32364, which was closed by github-actions because it had not been updated in a while. The previous PR added a new configuration parameter (spark.sql.catalog.$SESSION_CATALOG_NAME.defaultDatabase) to set the session catalog's default database, which is required by use cases where the user does not have access to the default database.

Therefore I have created a new PR based on it, with the following additional changes:

  • Rebased / updated the previous PR onto the latest master branch
  • Deleted the DEFAULT_DATABASE static member from sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala and refactored the affected code (a sketch of the idea follows this list)
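
A minimal sketch of the shape of that refactor (hypothetical code, not the exact PR diff; the real change touches more call sites, but the core idea is replacing the static constant with a conf lookup):

  // Before: a hard-coded static member on SessionCatalog's companion object
  //   val DEFAULT_DATABASE = "default"
  // After (sketch): the value is read from the session conf instead
  def defaultDatabase: String = SQLConf.get.defaultDatabase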

Why are the changes needed?

If the user does not have any permissions on the Hive default database in Ranger, Spark fails with the following error:

22/08/26 18:36:21 INFO  metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient [email protected] (auth:KERBEROS) retries=1 delay=1 lifetime=0
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default])
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
  at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)

The idea is to introduce a new configuration parameter that lets us point the default database at a different one, for which the user does have sufficient permissions in Ranger.

For example:

spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other_db
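
The same parameter can also be set programmatically when building the session (a sketch; the configuration key comes from this PR, while the app and database names are illustrative):

  import org.apache.spark.sql.SparkSession

  val spark = SparkSession.builder()
    .appName("default-db-example")  // hypothetical app name
    .enableHiveSupport()
    .config("spark.sql.catalog.spark_catalog.defaultDatabase", "other_db")
    .getOrCreate()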

Does this PR introduce any user-facing change?

Yes: there is a new configuration parameter, as mentioned above, but its default value is "default", so the previous behavior is preserved.
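
For reference, one plausible shape for the conf entry itself (a sketch: the key and the "default" default value are taken from this PR, while the builder, doc text, and version placement are assumptions):

  val DEFAULT_DATABASE = buildStaticConf("spark.sql.catalog.spark_catalog.defaultDatabase")
    .doc("The default database for the session catalog.")  // illustrative doc text
    .version("3.4.0")
    .stringConf
    .createWithDefault("default")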

How was this patch tested?

  1. With GitHub Actions (all tests passed)

https://github.com/roczei/spark/actions/runs/2934863118

  2. Manually tested with Ranger + Hive

Scenario a) hrt_10 does not have access to the default database in Hive:

[hrt_10@quasar-thbnqr-2 ~]$ spark-shell
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/26 18:14:18 WARN  conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:14:30 WARN  cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: [dispatcher-event-loop-17]: Attempted to request executors before the AM has registered!


...

scala> spark.sql("use other")
22/08/26 18:18:47 INFO  conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml
22/08/26 18:18:48 WARN  conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:18:48 WARN  client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d
22/08/26 18:18:48 INFO  SessionState: [main]: Hive Session ID = 2188764e-d0dc-41b3-b714-f89b03cb3d6d
22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled.
22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: Trying to connect to metastore with URI thrift://quasar-thbnqr-4.quasar-thbnqr.root.hwx.site:9083
22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.
22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: Opened a connection to metastore, current connections: 1
22/08/26 18:18:50 INFO  metastore.HiveMetaStoreClient: [main]: Connected to metastore.
22/08/26 18:18:50 INFO  metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient [email protected] (auth:KERBEROS) retries=1 delay=1 lifetime=0
org.apache.spark.sql.AnalysisException: org.apache.hadoop.hive.ql.metadata.HiveException: MetaException(message:Permission denied: user [hrt_10] does not have [USE] privilege on [default])
  at org.apache.spark.sql.hive.HiveExternalCatalog.withClient(HiveExternalCatalog.scala:110)
  at org.apache.spark.sql.hive.HiveExternalCatalog.databaseExists(HiveExternalCatalog.scala:223)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:150)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:144)
  at org.apache.spark.sql.internal.SharedState.globalTempViewManager$lzycompute(SharedState.scala:179)

This is the expected behavior: without the new configuration, Spark still uses the "default" database name.

Scenario b) Use the "other" database, for which the hrt_10 user has proper permissions:

[hrt_10@quasar-thbnqr-2 ~]$ spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/08/26 18:27:03 WARN  conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:27:14 WARN  cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: [dispatcher-event-loop-15]: Attempted to request executors before the AM has registered!

...

scala> spark.sql("use other")
22/08/26 18:29:22 INFO  conf.HiveConf: [main]: Found configuration file file:/etc/hive/conf/hive-site.xml
22/08/26 18:29:22 WARN  conf.HiveConf: [main]: HiveConf of name hive.masking.algo does not exist
22/08/26 18:29:22 WARN  client.HiveClientImpl: [main]: Detected HiveConf hive.execution.engine is 'tez' and will be reset to 'mr' to disable useless hive logic
Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2
22/08/26 18:29:22 INFO  SessionState: [main]: Hive Session ID = 47721693-dbfe-4760-80f6-d4a76a3b37d2
22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: HMS client filtering is enabled.
22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: Trying to connect to metastore with URI thrift://quasar-thbnqr-4.quasar-thbnqr.root.hwx.site:9083
22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: HMSC::open(): Could not find delegation token. Creating KERBEROS-based thrift connection.
22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: Opened a connection to metastore, current connections: 1
22/08/26 18:29:24 INFO  metastore.HiveMetaStoreClient: [main]: Connected to metastore.
22/08/26 18:29:24 INFO  metastore.RetryingMetaStoreClient: [main]: RetryingMetaStoreClient proxy=class org.apache.hadoop.hive.ql.metadata.SessionHiveMetaStoreClient [email protected] (auth:KERBEROS) retries=1 delay=1 lifetime=0
res0: org.apache.spark.sql.DataFrame = []

scala> spark.sql("select * from employee").show()
+---+----+------+-----------+                                                   
|eid|name|salary|destination|
+---+----+------+-----------+
| 12| Ram|    10|     Szeged|
| 13| Joe|    20|   Debrecen|
+---+----+------+-----------+


scala>

roczei avatar Aug 26 '22 19:08 roczei

@cloud-fan, @hddong, @dongjoon-hyun could you please review my changes? Thanks!

roczei avatar Aug 26 '22 19:08 roczei

Can one of the admins verify this patch?

AmplabJenkins avatar Aug 27 '22 14:08 AmplabJenkins

@cloud-fan

Could you please take a look when you have some time? This PR is a follow-up PR for https://github.com/apache/spark/pull/32364. Thanks!

roczei avatar Sep 06 '22 15:09 roczei

we should also update V2SessionCatalog.defaultNamespace

cloud-fan avatar Sep 07 '22 12:09 cloud-fan

Thanks @cloud-fan, I will check this!

roczei avatar Sep 08 '22 11:09 roczei

we should also update V2SessionCatalog.defaultNamespace

@cloud-fan, did you mean this change?

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
index efbc9dd755..23775c3ae0 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/v2/V2SessionCatalog.scala
@@ -33,6 +33,7 @@ import org.apache.spark.sql.connector.expressions.Transform
 import org.apache.spark.sql.errors.{QueryCompilationErrors, QueryExecutionErrors}
 import org.apache.spark.sql.execution.datasources.DataSource
 import org.apache.spark.sql.internal.connector.V1Function
+import org.apache.spark.sql.internal.SQLConf
 import org.apache.spark.sql.types.StructType
 import org.apache.spark.sql.util.CaseInsensitiveStringMap
 
@@ -43,7 +44,7 @@ class V2SessionCatalog(catalog: SessionCatalog)
   extends TableCatalog with FunctionCatalog with SupportsNamespaces with SQLConfHelper {
   import V2SessionCatalog._
 
-  override val defaultNamespace: Array[String] = Array("default")
+  override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)
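
A quick way to sanity-check the combined effect in spark-shell (a sketch; it assumes the configured database, e.g. other, already exists and is accessible):

  // started with --conf spark.sql.catalog.spark_catalog.defaultDatabase=other
  spark.catalog.currentDatabase  // expected: "other" instead of "default"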

roczei avatar Sep 09 '22 20:09 roczei

override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)

Yes

cloud-fan avatar Sep 16 '22 14:09 cloud-fan

override val defaultNamespace: Array[String] = Array(SQLConf.get.defaultDatabase)

Yes

@cloud-fan Ok, I have fixed this.

roczei avatar Sep 18 '22 19:09 roczei

Is this a common behavior in other databases?

@cloud-fan Good question. The reason we cannot drop the user-specified default database is the following if statement in the current code:

if (dbName == defaultDatabase)

and this is the latest state on master:

https://github.com/apache/spark/blob/master/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala#L286

  def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
    val dbName = format(db)
    if (dbName == DEFAULT_DATABASE) {
      throw QueryCompilationErrors.cannotDropDefaultDatabaseError
    }

As you can see, I am just reusing the same logic.

If you think we should deny dropping only the "default" database and allow dropping the one set via spark.sql.catalog.spark_catalog.defaultDatabase, that is fine with me. The change is very simple:

+++ b/sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/catalog/SessionCatalog.scala
@@ -284,7 +284,7 @@ class SessionCatalog(
 
   def dropDatabase(db: String, ignoreIfNotExists: Boolean, cascade: Boolean): Unit = {
     val dbName = format(db)
-    if (dbName == defaultDatabase) {
+    if (dbName == "default") {
       throw QueryCompilationErrors.cannotDropDefaultDatabaseError
     }
     if (!ignoreIfNotExists) {

and here is the validation:

 $ ./spark-shell --conf spark.sql.catalogImplementation=hive --conf spark.sql.catalog.spark_catalog.defaultDatabase=xyz
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/19 14:21:07 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1663590068068).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.0-SNAPSHOT
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_345)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|      abc|
|  default|
|      xyz|
+---------+


scala> spark.sql("use database abc")
res1: org.apache.spark.sql.DataFrame = []

scala> spark.sql("SELECT current_database() AS db").show()
+---+
| db|
+---+
|abc|
+---+


scala> 

scala> spark.sql("drop database xyz")
res3: org.apache.spark.sql.DataFrame = []

scala> spark.sql("show databases").show()
+---------+
|namespace|
+---------+
|      abc|
|  default|
+---------+


scala> 

Is this solution acceptable for you?

roczei avatar Sep 19 '22 12:09 roczei

SGTM!

cloud-fan avatar Sep 19 '22 12:09 cloud-fan

Thanks @cloud-fan, I have implemented this and all tests pass. As far as I can see, we have addressed all of your feedback.

roczei avatar Sep 19 '22 18:09 roczei

Hi @cloud-fan,

Thanks for the feedback, I have addressed all of it. Unfortunately I made a mistake during version management, but I have already fixed it with a git rebase / git push, as you can see. That is why a lot of new labels were added that are not relevant to this pull request; only the SQL label is correct. Unfortunately, I cannot remove the other labels.

roczei avatar Sep 21 '22 15:09 roczei

Hi @cloud-fan,

All build issues have been fixed and all of your feedback has been addressed. Latest state:

$ bin/spark-shell --conf spark.sql.catalog.spark_catalog.defaultDatabase=other_db
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
22/09/25 14:58:15 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Spark context Web UI available at http://localhost:4040
Spark context available as 'sc' (master = local[*], app id = local-1664110696143).
Spark session available as 'spark'.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 3.4.0-SNAPSHOT
      /_/
         
Using Scala version 2.12.17 (OpenJDK 64-Bit Server VM, Java 1.8.0_345)
Type in expressions to have them evaluated.
Type :help for more information.

scala> spark.sql("show databases").show()
org.apache.spark.SparkDefaultDatabaseNotExistsException: [DEFAULT_DATABASE_NOT_EXISTS] Default database other_db does not exist, please create it first or change default database to 'default'.
  at org.apache.spark.sql.errors.QueryExecutionErrors$.defaultDatabaseNotExistsError(QueryExecutionErrors.scala:1936)
  at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:156)
  at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:147)
  at org.apache.spark.sql.internal.BaseSessionStateBuilder.$anonfun$catalog$1(BaseSessionStateBuilder.scala:154)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog$lzycompute(SessionCatalog.scala:122)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.externalCatalog(SessionCatalog.scala:122)
  at org.apache.spark.sql.catalyst.catalog.SessionCatalog.listDatabases(SessionCatalog.scala:323)
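
This is the intended guardrail: the configured default database has to exist before the session can use it. A hypothetical remedy, run as a user who is allowed to create it:

  spark.sql("CREATE DATABASE IF NOT EXISTS other_db")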

roczei avatar Sep 25 '22 12:09 roczei

LGTM except for 2 minor comments

Thanks @cloud-fan! I have addressed both comments.

roczei avatar Sep 26 '22 09:09 roczei

thanks, merging to master!

cloud-fan avatar Sep 27 '22 00:09 cloud-fan

@cloud-fan,

Thank you very much for your help!

roczei avatar Sep 27 '22 06:09 roczei