
[BUG][SPARK] listTables() fails after createOrReplaceTempView('abc') called with PARSE_SYNTAX_ERROR

Open richardcerny opened this issue 1 year ago • 7 comments

Bug

Describe the problem

After upgrading from Spark 3.3.2 to 3.4.1, the catalog.listTables command always fails once "createOrReplaceTempView" has been called. See the code snippet below.

Steps to reproduce

from pyspark.sql import SparkSession, Row

spark = (SparkSession
        .builder
        .appName("Python Spark SQL basic example")
        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
        .getOrCreate()
        )

SILVER_DB = 'silver_test'
spark.sql(f'CREATE DATABASE {SILVER_DB}')
view_name_fact = 'abc'

print(f"DBs before: {spark.catalog.listDatabases()}") # ok
print(f"Tables_before: {spark.catalog.listTables()}") # OK
print(f"Catalogs before: {spark.catalog.listCatalogs()}")  # ok
print(f'Current catalog before: {spark.catalog.currentCatalog()}')  # ok
print(f"Tables after silver: {spark.catalog.listTables(SILVER_DB)}") # ok

df_fact_fixture1 = spark.createDataFrame([Row('1', 'A', 'A', 100.0)])  # OK
df_fact_fixture1.createOrReplaceTempView(view_name_fact) # OK  ##### Once createOrReplaceTempView is called, every subsequent spark.catalog.listTables() call fails!

spark.sql(f"select * from {view_name_fact}").show() # OK
df = spark.sql(f"select * from {view_name_fact}") # OK
assert 1 == df.count() # OK
print(f"DBs after: {spark.catalog.listDatabases()}")  # OK
print(f"Catalogs after: {spark.catalog.listCatalogs()}")   # OK
print(f'Current catalog after: {spark.catalog.currentCatalog()}')  # OK
print(f"Tables after: {spark.catalog.listTables()}") # ERROR
print(f"Tables after silver: {spark.catalog.listTables(SILVER_DB)}")  # ERROR

Observed results

   :: resolving dependencies :: org.apache.spark#spark-submit-parent-7b10b7e4-5468-4e77-acce-1eda288f12ba;1.0
        confs: [default]
        found io.delta#delta-core_2.12;2.4.0 in central
        found io.delta#delta-storage;2.4.0 in central
        found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 144ms :: artifacts dl 6ms
        :: modules in use:
        io.delta#delta-core_2.12;2.4.0 from central in [default]
        io.delta#delta-storage;2.4.0 from central in [default]
        org.antlr#antlr4-runtime;4.9.3 from central in [default]
        ---------------------------------------------------------------------
        |                  |            modules            ||   artifacts   |
        |       conf       | number| search|dwnlded|evicted|| number|dwnlded|
        ---------------------------------------------------------------------
        |      default     |   3   |   0   |   0   |   0   ||   3   |   0   |
        ---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-7b10b7e4-5468-4e77-acce-1eda288f12ba
        confs: [default]
        0 artifacts copied, 3 already retrieved (0kB/4ms)
24/02/06 14:17:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
DBs before: [Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/workspaces/repository-pipeline/spark-warehouse'), Database(name='silver_test', catalog='spark_catalog', description='', locationUri='file:/workspaces/repository-pipeline/spark-warehouse/silver_test.db')]
Tables_before: []
Catalogs before: [CatalogMetadata(name='spark_catalog', description=None)]
Current catalog before: spark_catalog
Tables after silver: []
+---+---+---+-----+                                                             
| _1| _2| _3|   _4|
+---+---+---+-----+
|  1|  A|  A|100.0|
+---+---+---+-----+

DBs after: [Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/workspaces/repository-pipeline/spark-warehouse'), Database(name='silver_test', catalog='spark_catalog', description='', locationUri='file:/workspaces/repository-pipeline/spark-warehouse/silver_test.db')]
Catalogs after: [CatalogMetadata(name='spark_catalog', description=None)]
Current catalog after: spark_catalog

FAILED
>   print(f"Tables after: {spark.catalog.listTables()}")

libs/lakehouse/tests/test_z_catalog_2.py:133: 

/usr/local/lib/python3.10/site-packages/pyspark/sql/catalog.py:309: in listTables
    iter = self._jcatalog.listTables(dbName).toLocalIterator()
/usr/local/lib/python3.10/site-packages/py4j/java_gateway.py:1322: in __call__
    return_value = get_return_value(
a = ('xro77', <py4j.clientserver.JavaClient object at 0x7f98286213f0>, 'o36', 'listTables'), kw = {}, converted = ParseException()

    def deco(*a: Any, **kw: Any) -> Any:
        try:
            return f(*a, **kw)
        except Py4JJavaError as e:
            converted = convert_exception(e.java_exception)
            if not isinstance(converted, UnknownException):
                # Hide where the exception came from that shows a non-Pythonic
                # JVM exception message.
               raise converted from None
E               pyspark.errors.exceptions.captured.ParseException: 
E               [PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)
E               
E               == SQL ==
E               
E               ^^^

/usr/local/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:175: ParseException

Expected results

Shows a list of tables.

Further details

When the following configuration is removed from the Spark session, the code works, but the catalog extension is necessary for other features.

        .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
        .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
        .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

Environment information

  • Delta Lake version: 2.4.0
  • Spark version: 3.4.1
  • Scala version: 2.12

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [ ] No. I cannot contribute a bug fix at this time.

richardcerny avatar Feb 06 '24 14:02 richardcerny

We're hitting this as well. @richardcerny, were you able to get to a resolution?

mdrakiburrahman avatar Apr 29 '24 14:04 mdrakiburrahman

Found a workaround:

Went from:

def listTables(databaseName: String): Array[String] = {
  if (databaseExists(databaseName)) {
    return spark.catalog.listTables(databaseName).collect().map(_.name)
  }
  Array.empty[String]
}

To this:

def listTables(databaseName: String): Array[String] = {
  if (databaseExists(databaseName)) {
    // Delta 2.4.0 has a regression with Spark 3.4.1 that makes
    // spark.catalog.listTables calls fail
    //
    // >>> https://github.com/delta-io/delta/issues/2610
    //
    return spark
      .sql(s"SHOW TABLES IN $databaseName")
      .collect()
      .map(row => row.getAs[String]("tableName"))
  }
  Array.empty[String]
}
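
For anyone hitting this from PySpark, a minimal sketch of the same SHOW TABLES fallback could look like the following; the list_tables helper name is hypothetical and an existing SparkSession named spark is assumed:

def list_tables(database_name: str) -> list[str]:
    # Delta 2.4.0 with Spark 3.4.1 makes spark.catalog.listTables() raise
    # PARSE_SYNTAX_ERROR once a temp view exists, so fall back to SHOW TABLES.
    #
    # >>> https://github.com/delta-io/delta/issues/2610
    #
    if spark.catalog.databaseExists(database_name):
        rows = spark.sql(f"SHOW TABLES IN {database_name}").collect()
        return [row["tableName"] for row in rows]
    return []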

mdrakiburrahman avatar Apr 29 '24 14:04 mdrakiburrahman

Thank you @mdrakiburrahman. We have used the same workaround.

richardcerny avatar May 09 '24 11:05 richardcerny

It seems the problem is this line, val isTemp = row.getBoolean(2): https://github.com/apache/spark/blob/1eb558c3a6fbdd59e5a305bc3ab12ce748f6511f/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala#L126

It returns false when the catalog is set to DeltaCatalog, so CatalogImpl then tries to parse the temp view's empty namespace string and fails.

You can see it by starting a Spark shell with and without Delta and running:

spark.range(0,2).createOrReplaceTempView("abc")

val namespace = Seq("spark_catalog", "default")
val plan = org.apache.spark.sql.catalyst.plans.logical.ShowTables(org.apache.spark.sql.catalyst.analysis.UnresolvedNamespace(namespace), None)


val tables = spark.sessionState.executePlan(plan).toRdd.collect().map { row =>
  val tableName = row.getString(1)
  println(tableName)
  val namespaceName = row.getString(0)
  println(namespaceName)
  val isTemp = row.getBoolean(2)
  println(isTemp)
  if (isTemp) {
    // Temp views do not belong to any catalog. We shouldn't prepend the catalog name here.
    // val ns = if (namespaceName.isEmpty) Nil else Seq(namespaceName)
    // makeTable(ns :+ tableName)
  } else {
    // val ns = parseIdent(namespaceName)
    val ns = spark.sessionState.sqlParser.parseMultipartIdentifier(namespaceName)
    // makeTable(catalog.name() +: ns :+ tableName)
  }
}

@cloud-fan I have seen some contribs you did for Delta and Spark related to catalog. Any insights?

felipepessoto avatar Sep 17 '24 23:09 felipepessoto

@felipepessoto thanks for providing the repro! What was the error you hit? And can you also post the result of spark.sessionState.executePlan(plan).analyzed.treeString?

cloud-fan avatar Sep 18 '24 08:09 cloud-fan

@cloud-fan it is the same error that @richardcerny reported. In spark-shell, using my repro code:

org.apache.spark.sql.catalyst.parser.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)

== SQL ==

^^^

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:144)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:67)
  at $anonfun$tables$1(<console>:37)
  at $anonfun$tables$1$adapted(<console>:23)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  ... 64 elided

Calling spark.catalog.listTables().show():

org.apache.spark.sql.catalyst.parser.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)

== SQL ==

^^^

  at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:144)
  at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52)
  at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:67)
  at org.apache.spark.sql.internal.CatalogImpl.parseIdent(CatalogImpl.scala:49)
  at org.apache.spark.sql.internal.CatalogImpl.$anonfun$listTables$1(CatalogImpl.scala:132)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.internal.CatalogImpl.listTables(CatalogImpl.scala:123)
  at org.apache.spark.sql.internal.CatalogImpl.listTables(CatalogImpl.scala:98)
  ... 47 elided

treeString:

scala> println(spark.sessionState.executePlan(plan).analyzed.treeString)
ShowTables [namespace#2, tableName#3, isTemporary#4]
+- ResolvedNamespace org.apache.spark.sql.delta.catalog.DeltaCatalog@32855523, [default]

felipepessoto avatar Sep 18 '24 18:09 felipepessoto

One workaround is to set spark.sql.legacy.useV1Command to true. Ideally, DeltaCatalog should not return views in listTables.
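
A sketch of applying that flag from PySpark, assuming it can be set on an existing session (it can also be supplied through the session builder):

from pyspark.sql import SparkSession

# Set the suggested legacy flag on an existing session...
spark.conf.set("spark.sql.legacy.useV1Command", "true")

# ...or pass it when building the session:
spark = (SparkSession.builder
         .config("spark.sql.legacy.useV1Command", "true")
         .getOrCreate())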

cloud-fan avatar Sep 19 '24 09:09 cloud-fan