[BUG][SPARK] listTables() fails after createOrReplaceTempView('abc') called with PARSE_SYNTAX_ERROR
Bug
Describe the problem
After upgrading from Spark 3.3.2 to 3.4.1, the catalog.listTables command always fails once "createOrReplaceTempView" has been called. See the code snippet below.
Steps to reproduce
from pyspark.sql import Row, SparkSession

spark = (SparkSession
    .builder
    .appName("Python Spark SQL basic example")
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    .getOrCreate()
)

SILVER_DB = 'silver_test'
spark.sql(f'CREATE DATABASE {SILVER_DB}')
view_name_fact = 'abc'

print(f"DBs before: {spark.catalog.listDatabases()}")  # OK
print(f"Tables_before: {spark.catalog.listTables()}")  # OK
print(f"Catalogs before: {spark.catalog.listCatalogs()}")  # OK
print(f'Current catalog before: {spark.catalog.currentCatalog()}')  # OK
print(f"Tables after silver: {spark.catalog.listTables(SILVER_DB)}")  # OK

df_fact_fixture1 = spark.createDataFrame([Row('1', 'A', 'A', 100.0)])  # OK
# Once createOrReplaceTempView is called, every subsequent spark.catalog.listTables() call fails!
df_fact_fixture1.createOrReplaceTempView(view_name_fact)  # OK

spark.sql(f"select * from {view_name_fact}").show()  # OK
df = spark.sql(f"select * from {view_name_fact}")  # OK
assert 1 == df.count()  # OK

print(f"DBs after: {spark.catalog.listDatabases()}")  # OK
print(f"Catalogs after: {spark.catalog.listCatalogs()}")  # OK
print(f'Current catalog after: {spark.catalog.currentCatalog()}')  # OK
print(f"Tables after: {spark.catalog.listTables()}")  # ERROR
print(f"Tables after silver: {spark.catalog.listTables(SILVER_DB)}")  # ERROR
Observed results
:: resolving dependencies :: org.apache.spark#spark-submit-parent-7b10b7e4-5468-4e77-acce-1eda288f12ba;1.0
confs: [default]
found io.delta#delta-core_2.12;2.4.0 in central
found io.delta#delta-storage;2.4.0 in central
found org.antlr#antlr4-runtime;4.9.3 in central
:: resolution report :: resolve 144ms :: artifacts dl 6ms
:: modules in use:
io.delta#delta-core_2.12;2.4.0 from central in [default]
io.delta#delta-storage;2.4.0 from central in [default]
org.antlr#antlr4-runtime;4.9.3 from central in [default]
---------------------------------------------------------------------
| | modules || artifacts |
| conf | number| search|dwnlded|evicted|| number|dwnlded|
---------------------------------------------------------------------
| default | 3 | 0 | 0 | 0 || 3 | 0 |
---------------------------------------------------------------------
:: retrieving :: org.apache.spark#spark-submit-parent-7b10b7e4-5468-4e77-acce-1eda288f12ba
confs: [default]
0 artifacts copied, 3 already retrieved (0kB/4ms)
24/02/06 14:17:18 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
DBs before: [Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/workspaces/repository-pipeline/spark-warehouse'), Database(name='silver_test', catalog='spark_catalog', description='', locationUri='file:/workspaces/repository-pipeline/spark-warehouse/silver_test.db')]
Tables_before: []
Catalogs before: [CatalogMetadata(name='spark_catalog', description=None)]
Current catalog before: spark_catalog
Tables after silver: []
+---+---+---+-----+
| _1| _2| _3| _4|
+---+---+---+-----+
| 1| A| A|100.0|
+---+---+---+-----+
DBs after: [Database(name='default', catalog='spark_catalog', description='default database', locationUri='file:/workspaces/repository-pipeline/spark-warehouse'), Database(name='silver_test', catalog='spark_catalog', description='', locationUri='file:/workspaces/repository-pipeline/spark-warehouse/silver_test.db')]
Catalogs after: [CatalogMetadata(name='spark_catalog', description=None)]
Current catalog after: spark_catalog
FAILED
> print(f"Tables after: {spark.catalog.listTables()}")
libs/lakehouse/tests/test_z_catalog_2.py:133:
/usr/local/lib/python3.10/site-packages/pyspark/sql/catalog.py:309: in listTables
iter = self._jcatalog.listTables(dbName).toLocalIterator()
/usr/local/lib/python3.10/site-packages/py4j/java_gateway.py:1322: in __call__
return_value = get_return_value(
a = ('xro77', <py4j.clientserver.JavaClient object at 0x7f98286213f0>, 'o36', 'listTables'), kw = {}, converted = ParseException()
def deco(*a: Any, **kw: Any) -> Any:
try:
return f(*a, **kw)
except Py4JJavaError as e:
converted = convert_exception(e.java_exception)
if not isinstance(converted, UnknownException):
# Hide where the exception came from that shows a non-Pythonic
# JVM exception message.
raise converted from None
E pyspark.errors.exceptions.captured.ParseException:
E [PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)
E
E == SQL ==
E
E ^^^
/usr/local/lib/python3.10/site-packages/pyspark/errors/exceptions/captured.py:175: ParseException
Expected results
spark.catalog.listTables() returns the list of tables, including the abc temp view, without raising an error.
Further details
When the following configuration is removed from the Spark session, the code works; however, the catalog extension is necessary for other features.
.config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
.config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
.config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
Environment information
- Delta Lake version: 2.4.0
- Spark version: 3.4.1
- Scala version: 2.12
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [ ] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [ ] No. I cannot contribute a bug fix at this time.
We're hitting this as well. @richardcerny, were you able to get to a resolution?
Found a workaround:
Went from:
def listTables(databaseName: String): Array[String] = {
  if (databaseExists(databaseName)) {
    return spark.catalog.listTables(databaseName).collect().map(_.name)
  }
  Array.empty[String]
}
To this:
def listTables(databaseName: String): Array[String] = {
  if (databaseExists(databaseName)) {
    // Delta 2.4.0 has a regression with Spark 3.4.1 that makes
    // spark.catalog.listTables calls fail
    //
    // >>> https://github.com/delta-io/delta/issues/2610
    //
    return spark
      .sql(s"SHOW TABLES IN $databaseName")
      .collect()
      .map(row => row.getAs[String]("tableName"))
  }
  Array.empty[String]
}
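For anyone hitting this from PySpark, a rough equivalent of the same SHOW TABLES workaround might look like this (a sketch only; the helper name and the empty-list fallback are my own):

def list_tables(spark, database_name):
    # Avoid spark.catalog.listTables(), which raises PARSE_SYNTAX_ERROR once a
    # temp view exists with Delta 2.4.0 on Spark 3.4.1; go through SHOW TABLES instead.
    if not spark.catalog.databaseExists(database_name):
        return []
    rows = spark.sql(f"SHOW TABLES IN {database_name}").collect()
    return [row["tableName"] for row in rows]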
Thank you @mdrakiburrahman. We have used the same workaround.
It seems the problem is this line, val isTemp = row.getBoolean(2): https://github.com/apache/spark/blob/1eb558c3a6fbdd59e5a305bc3ab12ce748f6511f/sql/core/src/main/scala/org/apache/spark/sql/internal/CatalogImpl.scala#L126
It returns false for the temp view when the catalog is set to DeltaCatalog, so CatalogImpl takes the non-temp branch and tries to parse the empty namespace string, which is what produces the PARSE_SYNTAX_ERROR.
You can see it by starting a spark-shell with and without Delta and running:
spark.range(0, 2).createOrReplaceTempView("abc")

val namespace = Seq("spark_catalog", "default")
val plan = org.apache.spark.sql.catalyst.plans.logical.ShowTables(
  org.apache.spark.sql.catalyst.analysis.UnresolvedNamespace(namespace), None)

val tables = spark.sessionState.executePlan(plan).toRdd.collect().map { row =>
  val tableName = row.getString(1)
  println(tableName)
  val namespaceName = row.getString(0)
  println(namespaceName)
  val isTemp = row.getBoolean(2)
  println(isTemp)
  if (isTemp) {
    // Temp views do not belong to any catalog. We shouldn't prepend the catalog name here.
    // val ns = if (namespaceName.isEmpty) Nil else Seq(namespaceName)
    // makeTable(ns :+ tableName)
  } else {
    // val ns = parseIdent(namespaceName)
    val ns = spark.sessionState.sqlParser.parseMultipartIdentifier(namespaceName)
    // makeTable(catalog.name() +: ns :+ tableName)
  }
}
@cloud-fan I have seen some contribs you did for Delta and Spark related to catalog. Any insights?
@felipepessoto thanks for providing the repro! What was the error you hit? And can you also post the result of spark.sessionState.executePlan(plan).analyzed.treeString?
@cloud-fan it is the same error that @richardcerny reported. In spark-shell, using my repro code:
org.apache.spark.sql.catalyst.parser.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)
== SQL ==
^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:144)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:67)
at $anonfun$tables$1(<console>:37)
at $anonfun$tables$1$adapted(<console>:23)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
... 64 elided
Calling spark.catalog.listTables().show():
org.apache.spark.sql.catalyst.parser.ParseException:
[PARSE_SYNTAX_ERROR] Syntax error at or near end of input.(line 1, pos 0)
== SQL ==
^^^
at org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:306)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:144)
at org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:52)
at org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:67)
at org.apache.spark.sql.internal.CatalogImpl.parseIdent(CatalogImpl.scala:49)
at org.apache.spark.sql.internal.CatalogImpl.$anonfun$listTables$1(CatalogImpl.scala:132)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.internal.CatalogImpl.listTables(CatalogImpl.scala:123)
at org.apache.spark.sql.internal.CatalogImpl.listTables(CatalogImpl.scala:98)
... 47 elided
treeString:
scala> println(spark.sessionState.executePlan(plan).analyzed.treeString)
ShowTables [namespace#2, tableName#3, isTemporary#4]
+- ResolvedNamespace org.apache.spark.sql.delta.catalog.DeltaCatalog@32855523, [default]
One workaround is to set spark.sql.legacy.useV1Command to true. Ideally, DeltaCatalog should not return views in listTables.
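For completeness, a minimal sketch of applying that config-based workaround from PySpark (I have only confirmed the conf key exists; setting it via spark.conf.set at runtime should also work since it is a session conf, but I have not verified every Spark 3.4.x patch version):

from pyspark.sql import SparkSession

spark = (SparkSession
    .builder
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0")
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension")
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
    # Fall back to the V1 command path so listTables no longer trips over the
    # empty namespace reported for temp views under DeltaCatalog
    .config("spark.sql.legacy.useV1Command", "true")
    .getOrCreate()
)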