[BUG][Spark] Exception while using 3-layer-namespace on Spark
Bug
Which Delta project/connector is this regarding?
- [x] Spark
- [ ] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Describe the problem
As of Spark 3.4.0, native support for 3-layer namespaces for tables was added to the SQL API, allowing multiple catalogs to be accessed through a fully qualified table name following the <catalog>.<schema>.<table> convention. Multiple catalogs can be registered using the spark.sql.catalog.<catalog_name>=... Spark config.
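For illustration, once a second catalog is registered this way, a table in it can be addressed from SQL with the full three-part name (the catalog, schema, and table names below are placeholders):

// Placeholder names; assumes a catalog named catalog_b was registered via the
// spark.sql.catalog.catalog_b config at session build time.
spark.sql("SELECT * FROM catalog_b.here.now").show()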
This works with an equivalent Apache Iceberg setup, but does not work when using multiple Delta catalogs. The SparkSession starts up with the extra catalog present in the session, but as soon as a second catalog other than spark_catalog is referenced, the following exception is thrown.
[INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
Here is a recent StackOverflow post describing the same issue with PySpark: https://stackoverflow.com/questions/77751057/multiple-catalogs-in-spark
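For comparison, here is a sketch of the kind of multi-catalog Apache Iceberg setup that works; the settings below are assumptions based on Iceberg's documented SparkCatalog configuration, not the original example:

import org.apache.spark.sql.SparkSession

// Assumed Iceberg configuration; catalog names and warehouse paths are placeholders.
val icebergSpark = SparkSession.builder()
  .master("local")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.catalog_a", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.catalog_a.type", "hadoop")
  .config("spark.sql.catalog.catalog_a.warehouse", "/tmp/warehouse_a")
  .config("spark.sql.catalog.catalog_b", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.catalog_b.type", "hadoop")
  .config("spark.sql.catalog.catalog_b.warehouse", "/tmp/warehouse_b")
  .getOrCreate()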
Steps to reproduce
I am running this on my local machine, in client mode, using my local filesystem to host data.
Here is my SparkSession generator:
override lazy val spark = SparkSession.builder()
  .master("local")
  .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .config("spark.sql.catalog.catalog_b", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .enableHiveSupport()
  .getOrCreate()
Here is a batch of testing code:
import spark.implicits._
// Build a small test DataFrame
val df = Seq(
  ("A", 1, 2.3),
  ("B", 4, 5.6)
).toDF("X", "Y", "Z")

// Writes to the default spark_catalog succeed
spark.sql("create schema here")
df.write.mode(SaveMode.Overwrite).saveAsTable("here.now")

val df2 = df.union(Seq(("C", 7, 8.9)).toDF("X", "Y", "Z"))
df2.write.mode(SaveMode.Overwrite).saveAsTable("spark_catalog.here.now2")

// Switching to the second Delta catalog fails on the next SQL statement
spark.catalog.setCurrentCatalog("catalog_b")
spark.sql("create schema here")

val df3 = df2.union(Seq(("D", 10, 11.12)).toDF("X", "Y", "Z"))
df3.write.mode(SaveMode.Overwrite).saveAsTable("catalog_b.here.now")
Observed results
When running the example above, the spark.catalog.setCurrentCatalog("catalog_b") call succeeds, but the subsequent spark.sql("create schema here") call throws the exception below:
[INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:536)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:548)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
at com.hungryroot.ThreeLayerDeltaTest.$anonfun$new$2(ThreeLayerNamespaceTest.scala:94)
at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
at org.scalatest.Transformer.apply(Transformer.scala:22)
at org.scalatest.Transformer.apply(Transformer.scala:20)
at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1832)
at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1686)
at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1830)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1842)
at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1842)
at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1824)
at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1686)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1900)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1900)
at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1899)
at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1686)
at org.scalatest.Suite.run(Suite.scala:1114)
at org.scalatest.Suite.run$(Suite.scala:1096)
at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1686)
at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1945)
at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1945)
at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1943)
at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1686)
at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
at org.scalatest.tools.Runner$.run(Runner.scala:798)
at org.scalatest.tools.Runner.run(Runner.scala)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
Caused by: java.lang.NullPointerException
at org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension.name(DelegatingCatalogExtension.java:50)
at org.apache.spark.sql.connector.catalog.CatalogV2Util$.isSessionCatalog(CatalogV2Util.scala:374)
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$DatabaseNameInSessionCatalog$.unapply(ResolveSessionCatalog.scala:628)
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:231)
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:52)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:32)
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:52)
at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:46)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
at scala.collection.immutable.List.foldLeft(List.scala:91)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
at scala.collection.immutable.List.foreach(List.scala:431)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
... 65 more
Expected results
I would expect this to create a schema named here in the catalog_b catalog and allow me to save data to it.
Further details
This is an effort to create a local "delta lake" for testing that is compatible with Databricks' three-layer namespace used by Unity Catalog.
Environment information
- Delta Lake version: 3.0.0
- Spark version: 3.4.1
- Scala version: 2.12
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [x] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [ ] No. I cannot contribute a bug fix at this time.
+1 this makes it hard to create tests for code that uses multiple catalogs in production
+1
+1 Any workaround to be able to test 3-layer-namespace?
Hi! We're encountering the same problem. Have you made any progress or found a workaround to resolve this issue?
Any update, or does anyone have a workaround for this?
+1
Just confirming I am still being blocked by this. We have a "workaround" where we squish the catalog and database names together when running locally, but it's not pretty...
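For anyone needing a stopgap, a minimal sketch of that kind of "squish" workaround is below; the helper name and separator are hypothetical, not anything from the Delta codebase:

// Hypothetical helper: when running locally against a single spark_catalog,
// collapse the catalog and schema into one schema name so calling code can
// keep passing three-part identifiers.
def localTableName(catalog: String, schema: String, table: String): String =
  s"${catalog}__${schema}.${table}"

// e.g. localTableName("catalog_b", "here", "now") == "catalog_b__here.now"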
I am also facing this issue
I'd also like to see this addressed for unit tests.
I've confirmed that (1) using a Delta catalog and an Iceberg catalog works ✅ (2) using two Iceberg catalogs works ✅ (3) using two Delta catalogs fails ❌
WIP investigating why
+1 I'm facing the same problem while using 2 Delta catalogs
+1
+1
+1
+1
Is there any feedback on this? It seems like Iceberg is the only way to do this?
+1
+1 In PySpark I have the same issue with the test below; at the end it throws a parsing exception like:
[PARSE_SYNTAX_ERROR] Syntax error at or near '.'.(line 1, pos 20)

== SQL ==
spark_catalog.source.source_table_join
--------------------^^^

The same does not work for DeltaTable.createOrReplace if I use the fully qualified name catalog.schema.table.
import os
import shutil
import sys

import pytest
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType


@pytest.fixture(scope="session")
def spark_session():
    shutil.rmtree("spark-warehouse", ignore_errors=True)
    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
    builder = (SparkSession.builder
               .master("local[*]")
               .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
               .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
               .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
               .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
               .appName("test")
               )
    yield configure_spark_with_delta_pip(builder).getOrCreate()
    shutil.rmtree("spark-warehouse", ignore_errors=True)


def test_join_operation_with_catalog(spark_session: SparkSession):
    source_schema = StructType([
        StructField("id", StringType(), True),
        StructField("derived_column", StringType(), True),
        StructField("filter_column", StringType(), True)
    ])
    spark_session.sql("CREATE SCHEMA source")
    spark_session.sql("DROP TABLE IF EXISTS spark_catalog.source.source_table_join")
    spark_session.catalog.setCurrentCatalog("spark_catalog")
    spark_session.catalog.setCurrentDatabase("source")
    DeltaTable.createOrReplace(spark_session).tableName("source_table_join").addColumns(
        source_schema).execute()
    try:
        print(DeltaTable.forName(spark_session, "source.source_table_join").toDF().collect())
        print('SUCCESS')
    except Exception as err:
        print("FAILURE")
        print(err)
@scottsand-db I've tested this with Spark 4.0-preview2 and Delta Lake 4.0 preview, same issue. This should be fixed before shipping Delta for 4.0 at least.
Has there been any fix for this? I am still observing the issue when writing tests.
Is there any news on this? It's super important to have this aligned. There seems to have been no progress on this for over a year! I'm seriously thinking of fully switching to Iceberg, as it seems to be emerging as the only viable option now.
@sadowskik Apologies for the delay in getting to this task; we're looking into it. @rosspalmer, would you be open to submitting a fix for this? We're happy to loop in the appropriate Delta experts to help guide you as well.
@raveeram-db I'll have to refresh myself on the specifics but do have some upcoming available time. I'll follow up next week.
Thanks @rosspalmer, appreciate it! @LukasRupprecht is an expert in this area and should be able to help out with any questions
@LukasRupprecht Would it be possible to set up the situation in the OP as a test suite for spark.org.apache.spark.sql.delta.catalog.DeltaCatalog? Are there any base classes that would be helpful in setting up the test?
@rosspalmer You could add the test case to CustomCatalogSuite. That suite already has several test cases for different catalog implementations, so it should provide some examples. We can add a test that configures two DeltaCatalogs with different names.
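To make the suggestion concrete, here is a hedged sketch of what such a test case might look like; the test name and assertions are assumptions, and it relies on the test, withSQLConf, sql, and checkAnswer helpers (and org.apache.spark.sql.Row) already available to CustomCatalogSuite through its base traits:

// Hypothetical test body for CustomCatalogSuite: register a second DeltaCatalog
// under a different name and re-run the repro from the issue description.
test("3-layer namespace with a second DeltaCatalog") {
  withSQLConf(
      "spark.sql.catalog.catalog_b" ->
        classOf[org.apache.spark.sql.delta.catalog.DeltaCatalog].getName) {
    sql("CREATE SCHEMA IF NOT EXISTS catalog_b.here")
    sql("CREATE TABLE catalog_b.here.now (x STRING, y INT) USING delta")
    sql("INSERT INTO catalog_b.here.now VALUES ('A', 1)")
    checkAnswer(sql("SELECT x, y FROM catalog_b.here.now"), Row("A", 1))
  }
}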
@LukasRupprecht I've got the CustomCatalogSuite working on my machine and am working on recreating the issue in there. I have a couple of questions on communication:
1. I am able to run the suite via sbt commands but am still having trouble importing the project in IntelliJ. Would a specific Slack channel be the best place to troubleshoot my setup?
2. For questions on this bug, should I continue discussions on this GitHub thread, or would you want me to Slack you directly?
3. How do I get set up to push branches for your review?
Figured out #1 ^^ All set up in IntelliJ now, and I have added the failing test. I am working on debugging the issue now.
Thanks for working on this, @rosspalmer!
I've identified the issue but am unsure of the best avenue for a fix. @LukasRupprecht, could you advise on the situation below?
Background:
DeltaCatalog is an extension of Spark's DelegatingCatalogExtension, which allows specific methods of a base CatalogPlugin to be overridden. Any methods that are not overridden are forwarded to the delegate base catalog.
For spark.sql.catalog.spark_catalog, the default Spark V2SessionCatalog is set as the delegate automatically the first time this catalog is utilized.
For catalogs not named spark_catalog, this call to set a delegate never happens, so when catalog_b is referenced in the example above, the .name() call is forwarded to the delegate, which is still null, and a NullPointerException is thrown.
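A simplified sketch of the behavior described above (my own abstraction, not the actual Spark source) looks roughly like this:

import org.apache.spark.sql.connector.catalog.CatalogPlugin
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hand-written sketch mirroring the delegation behavior of DelegatingCatalogExtension.
abstract class DelegatingSketch extends CatalogPlugin {
  // Spark only injects a delegate for the session catalog (spark_catalog),
  // so this field stays null for a second catalog such as catalog_b.
  private var delegate: CatalogPlugin = _

  def setDelegateCatalog(d: CatalogPlugin): Unit = {
    delegate = d
  }

  // initialize is final in the real class, so a subclass like DeltaCatalog
  // cannot capture the catalog name here.
  final override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {}

  // Forwarded to the delegate: this is the NullPointerException from the stack
  // trace when the delegate was never set.
  override def name(): String = delegate.name()
}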
Issue:
The DelegatingCatalogExtension class marks the initialize method as final, which prevents us from capturing the catalog name (catalog_b in my example) when the class is instantiated (via the Catalogs.load() method). From my initial review, I don't see another way to pass the name after the catalog is initially loaded.
Question:
What would be the best way to tackle this?
- Looking at the Iceberg catalog example, they don't use the DelegatingCatalogExtension and instead pull the name, plus other configs, in the initialize call. Is this an option? (See the sketch after this list.)
- Theoretically we should be able to set a delegate like the default spark_catalog does, but I don't know how to do this while also getting the secondary name. Would it be useful to create a class like DummyCatalog in the test suite that would wrap an internal DeltaCatalog?
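For the first option, here is a hedged sketch of the Iceberg-style approach; the class name NamedCatalogSketch is hypothetical (not an existing Delta class), and the table/namespace methods are deliberately left abstract:

import org.apache.spark.sql.connector.catalog.TableCatalog
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical sketch: a catalog that does not extend DelegatingCatalogExtension
// captures its own name and options in initialize, so name() never depends on a
// delegate being injected.
abstract class NamedCatalogSketch extends TableCatalog {
  private var catalogName: String = _
  private var catalogOptions: CaseInsensitiveStringMap = _

  override def initialize(name: String, options: CaseInsensitiveStringMap): Unit = {
    catalogName = name          // e.g. "catalog_b" from spark.sql.catalog.catalog_b
    catalogOptions = options
  }

  override def name(): String = catalogName
}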