
[BUG][Spark] Exception while using 3-layer-namespace on Spark

Open rosspalmer opened this issue 1 year ago • 45 comments

Bug

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Describe the problem

As of Spark 3.4.0, native support for 3-layer namespaces for tables was added to the SQL API, allowing multiple catalogs to be accessed through full table names following the <catalog>.<schema>.<table> convention. Multiple catalogs can be configured using the spark.sql.catalog.<catalog_name>=... Spark config.

This works when using Apache Iceberg catalogs (an example setup is sketched below), but does not work when using multiple Delta catalogs. While the SparkSession is initialized with the catalog present in the session, when a second, non-spark_catalog catalog is referenced, the following exception is thrown.

[INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.

Here is a recent StackOverflow post experiencing the same issue with PySpark: https://stackoverflow.com/questions/77751057/multiple-catalogs-in-spark
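
For comparison, an equivalent multi-catalog setup with Iceberg catalogs resolves correctly. A minimal sketch, assuming the matching iceberg-spark-runtime jar is on the classpath; the catalog names and warehouse paths here are illustrative:

// Minimal sketch (assumptions: iceberg-spark-runtime on the classpath;
// catalog names and warehouse paths are illustrative).
import org.apache.spark.sql.SparkSession

val icebergSpark = SparkSession.builder()
  .master("local")
  .config("spark.sql.extensions",
    "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
  .config("spark.sql.catalog.catalog_a", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.catalog_a.type", "hadoop")
  .config("spark.sql.catalog.catalog_a.warehouse", "/tmp/warehouse_a")
  .config("spark.sql.catalog.catalog_b", "org.apache.iceberg.spark.SparkCatalog")
  .config("spark.sql.catalog.catalog_b.type", "hadoop")
  .config("spark.sql.catalog.catalog_b.warehouse", "/tmp/warehouse_b")
  .getOrCreate()

icebergSpark.sql("CREATE NAMESPACE catalog_b.here") // multi-part identifier resolves as expected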

Steps to reproduce

I am running this on my local machine, in client mode, using my local filesystem to host data.

Here is my SparkSession generator:

override lazy val spark = SparkSession.builder()
      .master("local")
      .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
      .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .config("spark.sql.catalog.catalog_b", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
      .enableHiveSupport()
      .getOrCreate()

Here is a batch of testing code:

import spark.implicits._

val df = Seq(
  ("A", 1, 2.3),
  ("B", 4, 5.6)
).toDF("X", "Y", "Z")

// Schema and tables in the default spark_catalog: these work
spark.sql("create schema here")

df.write.mode(SaveMode.Overwrite).saveAsTable("here.now")

val df2 = df.union(Seq(("C", 7, 8.9)).toDF("X", "Y", "Z"))
df2.write.mode(SaveMode.Overwrite).saveAsTable("spark_catalog.here.now2")

// Switching to the second Delta catalog: the create schema call below fails
spark.catalog.setCurrentCatalog("catalog_b")
spark.sql("create schema here")

val df3 = df2.union(Seq(("D", 10, 11.12)).toDF("X", "Y", "Z"))
df3.write.mode(SaveMode.Overwrite).saveAsTable("catalog_b.here.now")

Observed results

When running the example above, the spark.catalog.setCurrentCatalog("catalog_b") call succeeds, but the subsequent spark.sql("create schema here") call throws the exception below:

[INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
org.apache.spark.SparkException: [INTERNAL_ERROR] The Spark SQL phase analysis failed with an internal error. You hit a bug in Spark or the Spark plugins you use. Please, report this bug to the corresponding communities or vendors, and provide the full stack trace.
	at org.apache.spark.SparkException$.internalError(SparkException.scala:107)
	at org.apache.spark.sql.execution.QueryExecution$.toInternalError(QueryExecution.scala:536)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:548)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$1(QueryExecution.scala:219)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.QueryExecution.executePhase(QueryExecution.scala:218)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:77)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:74)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:66)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:97)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$4(SparkSession.scala:691)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:682)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:713)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:744)
	at com.hungryroot.ThreeLayerDeltaTest.$anonfun$new$2(ThreeLayerNamespaceTest.scala:94)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.scalatest.OutcomeOf.outcomeOf(OutcomeOf.scala:85)
	at org.scalatest.OutcomeOf.outcomeOf$(OutcomeOf.scala:83)
	at org.scalatest.OutcomeOf$.outcomeOf(OutcomeOf.scala:104)
	at org.scalatest.Transformer.apply(Transformer.scala:22)
	at org.scalatest.Transformer.apply(Transformer.scala:20)
	at org.scalatest.flatspec.AnyFlatSpecLike$$anon$5.apply(AnyFlatSpecLike.scala:1832)
	at org.scalatest.TestSuite.withFixture(TestSuite.scala:196)
	at org.scalatest.TestSuite.withFixture$(TestSuite.scala:195)
	at org.scalatest.flatspec.AnyFlatSpec.withFixture(AnyFlatSpec.scala:1686)
	at org.scalatest.flatspec.AnyFlatSpecLike.invokeWithFixture$1(AnyFlatSpecLike.scala:1830)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTest$1(AnyFlatSpecLike.scala:1842)
	at org.scalatest.SuperEngine.runTestImpl(Engine.scala:306)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTest(AnyFlatSpecLike.scala:1842)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTest$(AnyFlatSpecLike.scala:1824)
	at org.scalatest.flatspec.AnyFlatSpec.runTest(AnyFlatSpec.scala:1686)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$runTests$1(AnyFlatSpecLike.scala:1900)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:413)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:390)
	at org.scalatest.SuperEngine.$anonfun$runTestsInBranch$1(Engine.scala:427)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.SuperEngine.traverseSubNodes$1(Engine.scala:401)
	at org.scalatest.SuperEngine.runTestsInBranch(Engine.scala:396)
	at org.scalatest.SuperEngine.runTestsImpl(Engine.scala:475)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTests(AnyFlatSpecLike.scala:1900)
	at org.scalatest.flatspec.AnyFlatSpecLike.runTests$(AnyFlatSpecLike.scala:1899)
	at org.scalatest.flatspec.AnyFlatSpec.runTests(AnyFlatSpec.scala:1686)
	at org.scalatest.Suite.run(Suite.scala:1114)
	at org.scalatest.Suite.run$(Suite.scala:1096)
	at org.scalatest.flatspec.AnyFlatSpec.org$scalatest$flatspec$AnyFlatSpecLike$$super$run(AnyFlatSpec.scala:1686)
	at org.scalatest.flatspec.AnyFlatSpecLike.$anonfun$run$1(AnyFlatSpecLike.scala:1945)
	at org.scalatest.SuperEngine.runImpl(Engine.scala:535)
	at org.scalatest.flatspec.AnyFlatSpecLike.run(AnyFlatSpecLike.scala:1945)
	at org.scalatest.flatspec.AnyFlatSpecLike.run$(AnyFlatSpecLike.scala:1943)
	at org.scalatest.flatspec.AnyFlatSpec.run(AnyFlatSpec.scala:1686)
	at org.scalatest.tools.SuiteRunner.run(SuiteRunner.scala:47)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13(Runner.scala:1321)
	at org.scalatest.tools.Runner$.$anonfun$doRunRunRunDaDoRunRun$13$adapted(Runner.scala:1315)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.scalatest.tools.Runner$.doRunRunRunDaDoRunRun(Runner.scala:1315)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24(Runner.scala:992)
	at org.scalatest.tools.Runner$.$anonfun$runOptionallyWithPassFailReporter$24$adapted(Runner.scala:970)
	at org.scalatest.tools.Runner$.withClassLoaderAndDispatchReporter(Runner.scala:1481)
	at org.scalatest.tools.Runner$.runOptionallyWithPassFailReporter(Runner.scala:970)
	at org.scalatest.tools.Runner$.run(Runner.scala:798)
	at org.scalatest.tools.Runner.run(Runner.scala)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.runScalaTest2or3(ScalaTestRunner.java:43)
	at org.jetbrains.plugins.scala.testingSupport.scalaTest.ScalaTestRunner.main(ScalaTestRunner.java:26)
Caused by: java.lang.NullPointerException
	at org.apache.spark.sql.connector.catalog.DelegatingCatalogExtension.name(DelegatingCatalogExtension.java:50)
	at org.apache.spark.sql.connector.catalog.CatalogV2Util$.isSessionCatalog(CatalogV2Util.scala:374)
	at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$DatabaseNameInSessionCatalog$.unapply(ResolveSessionCatalog.scala:628)
	at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:231)
	at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog$$anonfun$apply$1.applyOrElse(ResolveSessionCatalog.scala:52)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$3(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.trees.CurrentOrigin$.withOrigin(origin.scala:76)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.$anonfun$resolveOperatorsUpWithPruning$1(AnalysisHelper.scala:138)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.allowInvokingTransformsInAnalyzer(AnalysisHelper.scala:323)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning(AnalysisHelper.scala:134)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUpWithPruning$(AnalysisHelper.scala:130)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUpWithPruning(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp(AnalysisHelper.scala:111)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper.resolveOperatorsUp$(AnalysisHelper.scala:110)
	at org.apache.spark.sql.catalyst.plans.logical.LogicalPlan.resolveOperatorsUp(LogicalPlan.scala:32)
	at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:52)
	at org.apache.spark.sql.catalyst.analysis.ResolveSessionCatalog.apply(ResolveSessionCatalog.scala:46)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$2(RuleExecutor.scala:222)
	at scala.collection.LinearSeqOptimized.foldLeft(LinearSeqOptimized.scala:126)
	at scala.collection.LinearSeqOptimized.foldLeft$(LinearSeqOptimized.scala:122)
	at scala.collection.immutable.List.foldLeft(List.scala:91)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1(RuleExecutor.scala:219)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$execute$1$adapted(RuleExecutor.scala:211)
	at scala.collection.immutable.List.foreach(List.scala:431)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.execute(RuleExecutor.scala:211)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.org$apache$spark$sql$catalyst$analysis$Analyzer$$executeSameContext(Analyzer.scala:226)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$execute$1(Analyzer.scala:222)
	at org.apache.spark.sql.catalyst.analysis.AnalysisContext$.withNewAnalysisContext(Analyzer.scala:173)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:222)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.execute(Analyzer.scala:188)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.$anonfun$executeAndTrack$1(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker$.withTracker(QueryPlanningTracker.scala:89)
	at org.apache.spark.sql.catalyst.rules.RuleExecutor.executeAndTrack(RuleExecutor.scala:182)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.$anonfun$executeAndCheck$1(Analyzer.scala:209)
	at org.apache.spark.sql.catalyst.plans.logical.AnalysisHelper$.markInAnalyzer(AnalysisHelper.scala:330)
	at org.apache.spark.sql.catalyst.analysis.Analyzer.executeAndCheck(Analyzer.scala:208)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$analyzed$1(QueryExecution.scala:77)
	at org.apache.spark.sql.catalyst.QueryPlanningTracker.measurePhase(QueryPlanningTracker.scala:138)
	at org.apache.spark.sql.execution.QueryExecution.$anonfun$executePhase$2(QueryExecution.scala:219)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	... 65 more

Expected results

I would expect this to create a schema named here in the catalog catalog_b and allow me to save data to it.

Further details

This is an effort to create a local "delta lake" for testing that is compatible with the three-layer namespace used by Databricks' Unity Catalog.

Environment information

  • Delta Lake version: 3.0.0
  • Spark version: 3.4.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [x] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [ ] No. I cannot contribute a bug fix at this time.

rosspalmer avatar Jan 04 '24 23:01 rosspalmer

+1 this makes it hard to create tests for code that uses multiple catalogs in production

jelmerk avatar Mar 22 '24 09:03 jelmerk

+1

ezu-mutt avatar Mar 27 '24 13:03 ezu-mutt

+1 Any workaround to be able to test 3-layer-namespace?

boittega avatar May 23 '24 16:05 boittega

Hi! We're encountering the same problem. Have you made any progress or found a workaround to resolve this issue?

chriotte avatar May 28 '24 14:05 chriotte

Any update or does anyone have a workaround for this ?

jackedmundson avatar May 29 '24 13:05 jackedmundson

+1

meharanjan318 avatar Jun 06 '24 05:06 meharanjan318

Just confirming I am still being blocked by this. We have a "workaround" where we squish the catalog and database names together when running locally, but it's not pretty...
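
One way such squishing might look, as a sketch only (the helper name, flag, and separator are made up):

// Sketch only: collapse "<catalog>.<schema>" into a single database name when
// running locally, so everything lives in the default spark_catalog.
def localTableName(catalog: String, schema: String, table: String, runningLocally: Boolean): String =
  if (runningLocally) s"${catalog}__$schema.$table" else s"$catalog.$schema.$table"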

rosspalmer avatar Jun 11 '24 18:06 rosspalmer

I am also facing this issue

flaviokr avatar Jun 19 '24 21:06 flaviokr

I'd also like to see this addressed for unit tests

faltesekATG avatar Jul 02 '24 17:07 faltesekATG

I've confirmed that (1) using a Delta catalog and an Iceberg catalog works ✅ (2) using two Iceberg catalogs works ✅ (3) using two Delta catalogs fails ❌

WIP investigating why

scottsand-db avatar Jul 17 '24 16:07 scottsand-db

+1 i'm facing the same problem while using 2 delta catalogs

carloshhelt avatar Aug 05 '24 11:08 carloshhelt

+1

keanuxr avatar Aug 15 '24 21:08 keanuxr

+1

daniel-vizcaino avatar Aug 17 '24 00:08 daniel-vizcaino

+1

denisoliveirac avatar Aug 18 '24 14:08 denisoliveirac

+1

abutala avatar Aug 20 '24 02:08 abutala

Is there any feedback on this? It seems like Iceberg is the only way to do this?

wysisoft avatar Aug 27 '24 11:08 wysisoft

+1

ghost avatar Sep 11 '24 22:09 ghost

+1 In PySpark I have the same issue with the test below; at the end it throws a parsing exception like:

[PARSE_SYNTAX_ERROR] Syntax error at or near '.'.(line 1, pos 20)

== SQL ==
spark_catalog.source.source_table_join
--------------------^^^

The same does not work for DeltaTable.createOrReplace if I use the fully qualified name catalog.schema.table.

import os
import shutil
import sys

import pytest
from delta import configure_spark_with_delta_pip
from delta.tables import DeltaTable
from pyspark.sql import SparkSession
from pyspark.sql.types import StringType, StructField, StructType


@pytest.fixture(scope="session")
def spark_session():
    shutil.rmtree("spark-warehouse", ignore_errors=True)

    os.environ['PYSPARK_PYTHON'] = sys.executable
    os.environ['PYSPARK_DRIVER_PYTHON'] = sys.executable
    builder = (SparkSession.builder
               .master("local[*]")
               .config("spark.jars.packages", "io.delta:delta-core_2.12:2.3.0")
               .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
               .config("spark.databricks.delta.schema.autoMerge.enabled", "true")
               .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
               .appName("test"))
    yield configure_spark_with_delta_pip(builder).getOrCreate()

    shutil.rmtree("spark-warehouse", ignore_errors=True)


# Test method from a pytest test class (the enclosing class is omitted here).
@pytest.mark.usefixtures("spark_session")
def test_join_operation_with_catalog(self, spark_session: SparkSession):
    source_schema = StructType([
        StructField("id", StringType(), True),
        StructField("derived_column", StringType(), True),
        StructField("filter_column", StringType(), True)
    ])

    spark_session.sql("CREATE SCHEMA source")
    spark_session.sql("DROP TABLE IF EXISTS spark_catalog.source.source_table_join")
    spark_session.catalog.setCurrentCatalog("spark_catalog")
    spark_session.catalog.setCurrentDatabase("source")

    DeltaTable.createOrReplace(spark_session).tableName("source_table_join").addColumns(
        source_schema).execute()

    try:
        print(DeltaTable.forName(spark_session, "source.source_table_join").toDF().collect())
        print('SUCCESS')
    except Exception as err:
        print("FAILURE")
        print(err)

nemenko avatar Oct 10 '24 15:10 nemenko

@scottsand-db I've tested this with Spark 4.0-preview2 and Delta Lake 4.0 preview, same issue. This should be fixed before shipping Delta for 4.0 at least.

mahic avatar Oct 16 '24 09:10 mahic

Has there been any fix for this? I am still observing the issue when writing tests.

moritzmeister avatar Jan 24 '25 08:01 moritzmeister

Is there any news on this? It's super important to have this aligned. There seems to have been no progress on it for over a year! I'm seriously thinking of fully switching to Iceberg, as it seems to be emerging as the only viable option now.

sadowskik avatar Feb 02 '25 03:02 sadowskik

@sadowskik apologies for the delay in getting to this task, we're looking into it. @rosspalmer would you be open to submitting a fix for this? We're happy to loop in appropriate Delta experts to help guide you as well.

raveeram-db avatar Feb 03 '25 20:02 raveeram-db

@raveeram-db I'll have to refresh myself on the specifics but do have some upcoming available time. I'll follow up next week.

rosspalmer avatar Feb 05 '25 22:02 rosspalmer

Thanks @rosspalmer, appreciate it! @LukasRupprecht is an expert in this area and should be able to help out with any questions

raveeram-db avatar Feb 05 '25 22:02 raveeram-db

@LukasRupprecht Would it be possible to set up the situation in the OP as a test suite for org.apache.spark.sql.delta.catalog.DeltaCatalog? Are there any base classes that would be helpful in setting up the test?

rosspalmer avatar Feb 17 '25 17:02 rosspalmer

@rosspalmer You could add the test case to CustomCatalogSuite. That suite already has several test cases that exercise different catalog implementations, so it should provide some examples. We can add a test that configures two DeltaCatalogs with different names.
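
A rough sketch of what such a test case could look like, assuming the suite's usual spark/sql test helpers (the second catalog name is illustrative, and the exact setup may differ):

test("create schema and table in a second DeltaCatalog") {
  // Register a second catalog backed by DeltaCatalog; "catalog_b" is an illustrative name.
  spark.conf.set(
    "spark.sql.catalog.catalog_b",
    classOf[org.apache.spark.sql.delta.catalog.DeltaCatalog].getName)
  sql("CREATE SCHEMA catalog_b.here")
  sql("CREATE TABLE catalog_b.here.now (id INT) USING delta")
}

With the current behavior, the CREATE SCHEMA statement is expected to hit the NullPointerException described above.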

LukasRupprecht avatar Feb 19 '25 01:02 LukasRupprecht

@LukasRupprecht I've got the CustomCatalogSuite working on my machine and am working on recreating the issue in there. I have a couple of questions on communication:

  1. I am able to run the suite via sbt commands but am still having trouble importing the project in IntelliJ; would a specific Slack channel be the best place to troubleshoot my setup?

  2. For questions on this bug, should I continue discussions on this GitHub thread or would you want me to slack you directly?

  3. How do I get set up to push branches for your review?

rosspalmer avatar Mar 03 '25 21:03 rosspalmer

Figured out #1 ^^ All set up now in IntelliJ, and I have added the failing test. I am working on debugging the issue now.

rosspalmer avatar Mar 04 '25 22:03 rosspalmer

Thanks for working on this @rosspalmer !

raveeram-db avatar Mar 04 '25 23:03 raveeram-db

I've identified the issue but am unsure of the best avenue for a fix. @LukasRupprecht Could you advise on the situation below?

Background:

DeltaCatalog is an extension of Spark's DelegatingCatalogExtension, which allows specific methods of a base CatalogPlugin to be overridden. Any methods that are not overridden are called on the delegate base catalog.

For spark.sql.catalog.spark_catalog, the default Spark V2SessionCatalog is set as the delegate automatically the first time this catalog is utilized.

For catalogs not named spark_catalog, this call to set a delegate never happens, so when catalog_b is referenced in the example above, the .name() call is forwarded to the delegate, which is null, and throws the NullPointerException.

Issue:

The DelegatingCatalogExtension class marks the initialize method as final, which prevents us from passing the catalog name (catalog_b in my example) to the class when it is created (via the Catalogs.load() method). From my initial review, I don't know of another way to pass the name after the catalog is initially loaded.
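
To make the shape of the problem concrete, here is a simplified Scala sketch of the delegation pattern (illustrative only, not the actual Spark source):

// Simplified sketch of the pattern, not the actual Spark source.
trait CatalogPluginSketch { def name(): String }

abstract class DelegatingCatalogExtensionSketch extends CatalogPluginSketch {
  private var delegate: CatalogPluginSketch = _   // only wired up for spark_catalog

  final def setDelegateCatalog(d: CatalogPluginSketch): Unit = { delegate = d }

  // initialize is final and empty, so a subclass cannot capture its own name here
  final def initialize(name: String): Unit = {}

  // For any catalog whose delegate was never set (e.g. catalog_b),
  // this dereferences null and produces the NullPointerException above.
  override def name(): String = delegate.name()
}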

Question:

What would be the best way to tackle this?

  1. Looking at the Iceberg catalog example: it doesn't use DelegatingCatalogExtension and instead pulls the name, plus other configs, from the initialize call. Is this an option?
  2. Theoretically we should be able to set a delegate as is done for the default spark_catalog, but I don't know how to do this while also getting the secondary name. Would it be useful to create a class like DummyCatalog in the test suite that wraps an internal DeltaCatalog?

rosspalmer avatar Mar 05 '25 19:03 rosspalmer