SQLExtensions: Iceberg compaction doesn't work for tables on a non-default reference.
A table on a non-default reference can be accessed in two ways (both shown in the sketch below):
a) using the `tableName@branch` syntax (the `TableReference` helper class), or
b) calling `USE REFERENCE refName IN nessie` and then referring to the table by its plain name.
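For illustration, a minimal sketch of both access paths in a spark-shell session with the Nessie SQL extensions enabled (the catalog name `nessie`, branch `compaction_1`, and table `db.tbl` are illustrative assumptions, not from the original report):

```scala
// a) TableReference syntax: the branch is encoded in the table name, and the
//    identifier must be backtick-quoted so Spark accepts the '@'.
spark.sql("SELECT * FROM nessie.db.`tbl@compaction_1`").show()

// b) USE REFERENCE: switch the session to the branch first, then use the
//    plain table name.
spark.sql("USE REFERENCE compaction_1 IN nessie")
spark.sql("SELECT * FROM nessie.db.tbl").show()
```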
For the Iceberg compaction call procedure, neither way works.

a) When the `tableName@branch` syntax is used, `BaseProcedure.toIdentifier()` drops the backtick quoting because the plain Spark parsers are used, so we get the error below:
```
== SQL ==
nessie.compaction.table_1@compaction_1
-------------------------^^^

at app//org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255)
at app//org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124)
at app//org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
at app//org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:61)
at app//org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser.parseMultipartIdentifier(IcebergSparkSqlExtensionsParser.scala:87)
at app//org.apache.spark.sql.catalyst.parser.extensions.NessieSparkSqlExtensionsParser.parseMultipartIdentifier(NessieSparkSqlExtensionsParser.scala:91)
at app//org.apache.iceberg.spark.Spark3Util.catalogAndIdentifier(Spark3Util.java:716)
at app//org.apache.iceberg.spark.Spark3Util.catalogAndIdentifier(Spark3Util.java:728)
... 20 more
```
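For reference, a hypothetical reproduction of the failing call (the Iceberg `rewrite_data_files` procedure is assumed to be the compaction entry point here; the namespace and branch names are illustrative):

```scala
// The backtick-quoted identifier survives the outer SQL parse, but
// BaseProcedure.toIdentifier() re-parses the string argument with the plain
// Spark parser, which rejects the '@' once the quoting is gone.
spark.sql("""
  CALL nessie.system.rewrite_data_files(
    table => 'compaction.`table_1@compaction_1`'
  )
""")
```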
I'm not sure how to fix this, since it is common Iceberg code; it may not be possible to plug in the Nessie parser there. We might have to introduce a custom procedure just to handle this case.
b) `USE REFERENCE refName IN nessie`

Even though the non-default branch is selected, compaction cannot use it, because compaction loads the table from `Spark3BinPackStrategy.rewriteFiles()`, which does not use the latest conf while building the `NessieCatalog`: the Spark code takes the conf from `SQLConf`, so we would need to update `SQLConf` to fix it. The load fails like this:
```
org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table db.tbl not found
at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:138)
at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:92)
at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:292)
at scala.Option.map(Option.scala:230)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
at org.apache.iceberg.spark.actions.Spark3BinPackStrategy.rewriteFiles(Spark3BinPackStrategy.java:68)
```
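A hypothetical end-to-end reproduction of this case (names are illustrative, and `rewrite_data_files` is again assumed as the compaction procedure):

```scala
// Switch the session to a non-default branch where db.tbl exists.
spark.sql("USE REFERENCE compaction_1 IN nessie")

// Reading the table directly works: the session-level reference is honored
// on the normal SQL path.
spark.sql("SELECT * FROM nessie.db.tbl").show()

// Compaction, however, goes through Spark3BinPackStrategy.rewriteFiles(),
// whose DataFrameReader resolves the table via a catalog built from the
// SQLConf snapshot taken at initialize() time, i.e. the default branch:
spark.sql("CALL nessie.system.rewrite_data_files(table => 'db.tbl')")
// => NoSuchTableException: Table db.tbl not found
```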
Some more info about how `SQLConf` is used instead of the active session conf:
```
at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:417)
at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
at org.apache.spark.sql.connector.catalog.CatalogManager$$Lambda$2081.1666621687.apply(Unknown Source:-1)
at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
- locked <0x2c31> (a org.apache.spark.sql.connector.catalog.CatalogManager)
at org.apache.spark.sql.execution.datasources.v2.NessieUtils$.setCurrentRefForSpark(NessieUtils.scala:219)
```
`org.apache.iceberg.spark.SparkCatalog#initialize()` calls `buildIcebergCatalog(name, options)`, where `options` is obtained from `SQLConf` in `Catalogs#load()`. When `org.apache.iceberg.spark.SparkCatalog#loadTable()` is then called from the DataFrame read flow during compaction, it uses the catalog created above and fails to find the Nessie table (because it looks on the default branch), producing the `NoSuchTableException` shown above.
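A minimal sketch of the staleness this causes, assuming a Nessie catalog named `nessie` configured through the standard `spark.sql.catalog.nessie.ref` property (the table name is illustrative):

```scala
// First use of the catalog: Catalogs.load() reads the catalog options from
// SQLConf, and SparkCatalog.initialize() builds the Nessie catalog with
// ref = main.
spark.conf.set("spark.sql.catalog.nessie.ref", "main")
spark.sql("SELECT * FROM nessie.db.tbl").show()

// Changing the conf afterwards does not re-run initialize(): the cached
// catalog instance still points at main, so subsequent loads (including the
// one inside the compaction procedure) ignore the new reference.
spark.conf.set("spark.sql.catalog.nessie.ref", "compaction_1")
spark.sql("SELECT * FROM nessie.db.tbl").show()   // still resolves on main
```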
I think it is possible to fix this on the Iceberg side as well. Let me try it and see how the community reacts.
Trying to fix it in Iceberg itself: https://github.com/apache/iceberg/pull/5284
It seems too many test cases (100+) in Iceberg set dynamic configurations and never reset them, so they break when I use the active session conf :(
Latest observations:
The above issue (the `USE REFERENCE` case) has been fixed in Iceberg 0.14.0, but only for Spark 3.3 and 3.2; Spark 3.1 still has the problem because Iceberg is not backporting all the changes.

The original cause was that the `DataFrameReader` in the compaction call procedure went through `SparkCatalog`, which built the catalog with `SQLConf` instead of the active session conf, so compaction was always applied on the default branch that was initially configured. Now compaction uses `SparkCachedTableCatalog` instead of `SparkCatalog` (after https://github.com/apache/iceberg/pull/5247/), and the new `SparkCachedTableCatalog` doesn't use `SQLConf` to build the catalog.
Another observation: with the same changes, even the `TableReference` syntax (that is, `catalogName.dbName.table@ref`) now works with compaction, because the identifier is mapped to a UUID and later mapped back to the identifier, so the Spark parser has no problem parsing the UUID.
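A minimal sketch of that UUID indirection (this is the pattern as I understand it, not the actual `SparkCachedTableCatalog` code):

```scala
import java.util.UUID
import scala.collection.concurrent.TrieMap

// The already-loaded table object is registered under a fresh UUID; the
// DataFrame read then refers to that UUID, which the Spark parser handles
// without issue, so neither the '@' in the original identifier nor the
// branch conf is involved in resolving the table again.
object TableCache {
  private val cache = TrieMap.empty[String, AnyRef]

  def register(table: AnyRef): String = {
    val key = UUID.randomUUID().toString
    cache.put(key, table)
    key
  }

  def lookup(key: String): Option[AnyRef] = cache.get(key)
}
```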
@ajantha-bhat What's the status of this issue?
Oh, I thought the related PR was merged: https://github.com/projectnessie/nessie/pull/4590. Let me rebase it today so you can take a look.