
SQLExtensions: Iceberg compaction doesn't work for table on non-default reference.

Open • ajantha-bhat opened this issue on Jun 28 '22 • 5 comments

A table on a non-default reference can be accessed in two ways: a) the tableName@branch syntax (via the TableReference helper class), or b) calling USE REFERENCE refName IN nessie and then accessing the table by its plain name. Both styles are sketched below.
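For context, here is a minimal sketch of both access styles, assuming a Spark session configured with the Nessie SQL extensions, a Nessie catalog named nessie, a branch dev, and a table db.tbl (all illustrative names):

```scala
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("nessie-refs").getOrCreate()

// a) TableReference syntax: the branch rides along in the table identifier,
//    so the last part must be backtick-quoted for Spark's parser.
spark.sql("SELECT * FROM nessie.db.`tbl@dev`").show()

// b) Switch the session to the reference, then use the plain table name.
spark.sql("USE REFERENCE dev IN nessie")
spark.sql("SELECT * FROM nessie.db.tbl").show()
```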

For the Iceberg compaction call procedure, neither way works. a) When the tableName@branch syntax is used, BaseProcedure.toIdentifier() eats the backtick quoting because Spark's parsers are used, so we get the error below:

== SQL ==
nessie.compaction.table_1@compaction_1
-------------------------^^^

	at app//org.apache.spark.sql.catalyst.parser.ParseException.withCommand(ParseDriver.scala:255)
	at app//org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parse(ParseDriver.scala:124)
	at app//org.apache.spark.sql.execution.SparkSqlParser.parse(SparkSqlParser.scala:49)
	at app//org.apache.spark.sql.catalyst.parser.AbstractSqlParser.parseMultipartIdentifier(ParseDriver.scala:61)
	at app//org.apache.spark.sql.catalyst.parser.extensions.IcebergSparkSqlExtensionsParser.parseMultipartIdentifier(IcebergSparkSqlExtensionsParser.scala:87)
	at app//org.apache.spark.sql.catalyst.parser.extensions.NessieSparkSqlExtensionsParser.parseMultipartIdentifier(NessieSparkSqlExtensionsParser.scala:91)
	at app//org.apache.iceberg.spark.Spark3Util.catalogAndIdentifier(Spark3Util.java:716)
	at app//org.apache.iceberg.spark.Spark3Util.catalogAndIdentifier(Spark3Util.java:728)
	... 20 more
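For reference, this is the kind of invocation that triggers the parse error; rewrite_data_files is Iceberg's standard compaction procedure, and the table and branch names are taken from the error output above (reusing the spark session from the earlier sketch):

```scala
// BaseProcedure.toIdentifier() re-parses the table argument with Spark's
// parser, which drops the backtick quoting and then fails on the '@'.
spark.sql(
  "CALL nessie.system.rewrite_data_files(table => 'compaction.`table_1@compaction_1`')")
```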

Not sure how to fix this, since it is common Iceberg code; it may not be possible to plug in the Nessie parser there. We might have to introduce a custom procedure just to handle this case.

b) USE REFERENCE refName IN nessie: even though a non-default branch is selected, compaction cannot use it, because compaction tries to load the table from Spark3BinPackStrategy.rewriteFiles, which doesn't use the latest conf while building the NessieCatalog.

In the Spark code the conf comes from SQLConf, so we need to update SQLConf to fix it.
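A minimal sketch of this failure mode, with illustrative names and assuming db.tbl exists only on branch dev:

```scala
spark.sql("USE REFERENCE dev IN nessie") // only updates the active session conf
spark.sql("CALL nessie.system.rewrite_data_files(table => 'db.tbl')")
// Spark3BinPackStrategy.rewriteFiles goes through a DataFrameReader whose
// catalog was built from the stale SQLConf (still pointing at the default
// branch), so it fails with the NoSuchTableException shown below.
```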

org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table db.tbl not found
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:138)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:92)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:292)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
	at org.apache.iceberg.spark.actions.Spark3BinPackStrategy.rewriteFiles(Spark3BinPackStrategy.java:68)

ajantha-bhat • Jun 28 '22

Some more info on how SQLConf is used instead of the active session conf:

at org.apache.iceberg.spark.SparkCatalog.initialize(SparkCatalog.java:417)
      at org.apache.spark.sql.connector.catalog.Catalogs$.load(Catalogs.scala:60)
      at org.apache.spark.sql.connector.catalog.CatalogManager.$anonfun$catalog$1(CatalogManager.scala:52)
      at org.apache.spark.sql.connector.catalog.CatalogManager$$Lambda$2081.1666621687.apply(Unknown Source:-1)
      at scala.collection.mutable.HashMap.getOrElseUpdate(HashMap.scala:86)
      at org.apache.spark.sql.connector.catalog.CatalogManager.catalog(CatalogManager.scala:52)
      - locked <0x2c31> (a org.apache.spark.sql.connector.catalog.CatalogManager)
      at org.apache.spark.sql.execution.datasources.v2.NessieUtils$.setCurrentRefForSpark(NessieUtils.scala:219)

org.apache.iceberg.spark.SparkCatalog#initialize() calls buildIcebergCatalog(name, options), where the options are obtained from SQLConf in Catalogs#load().
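Roughly, the option capture works like the following sketch; the method body here is made up for illustration and does not match the real Spark/Iceberg sources:

```scala
object CatalogLoadSketch {
  // Catalogs.load() reads the catalog options from SQLConf exactly once,
  // when the catalog is first resolved:
  def load(name: String, sqlConf: Map[String, String]): Map[String, String] =
    sqlConf.collect {
      case (key, value) if key.startsWith(s"spark.sql.catalog.$name.") =>
        key.stripPrefix(s"spark.sql.catalog.$name.") -> value
    }
  // SparkCatalog.initialize(name, options) then builds the NessieCatalog from
  // this one-time snapshot; a later USE REFERENCE that only mutates the active
  // session conf never reaches the already-initialized catalog.
}
```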

And when org.apache.iceberg.spark.SparkCatalog#loadTable() is called from the DataFrame read flow during compaction, it uses the previously created catalog and fails to find the Nessie table (because it looks on the default branch):

 org.apache.spark.sql.catalyst.analysis.NoSuchTableException: Table db.tbl not found
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:138)
	at org.apache.iceberg.spark.SparkCatalog.loadTable(SparkCatalog.java:92)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$1(DataFrameReader.scala:292)
	at scala.Option.map(Option.scala:230)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:265)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:239)
	at org.apache.iceberg.spark.actions.Spark3BinPackStrategy.rewriteFiles(Spark3BinPackStrategy.java:68)

ajantha-bhat • Jul 13 '22

I think it is possible to fix it on the Iceberg side also. Let me try and see how the community reacts to it.

ajantha-bhat • Jul 14 '22

Trying to fix it in Iceberg itself: https://github.com/apache/iceberg/pull/5284

ajantha-bhat • Jul 15 '22

> Trying to fix it in Iceberg itself: https://github.com/apache/iceberg/pull/5284

It turns out that quite a few test cases (100+) in Iceberg set dynamic configurations and never reset them, so those tests break when I use the active session conf :(

ajantha-bhat • Jul 15 '22

Latest observations:

The above issue (the USE REFERENCE case) has been fixed in Iceberg 0.14.0, but only for Spark 3.3 and 3.2; Spark 3.1 still has issues because Iceberg is not backporting all the changes. The original cause was that the DataFrameReader used by the compaction call procedure went through SparkCatalog, which built the catalog from SQLConf instead of the active session conf, so compaction was always applied to the default branch that was initially configured.

Compaction now uses SparkCachedTableCatalog instead of SparkCatalog (since https://github.com/apache/iceberg/pull/5247/), and SparkCachedTableCatalog doesn't use SQLConf to build the catalog.

Another observation: with the same changes, even the TableReference syntax (that is, catalogName.dbname.table@ref) now works with compaction, because the identifier is mapped to a UUID and later mapped back to the identifier, so the Spark parser only has to parse the UUID. Both working invocations are sketched below.
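Put together, both invocations from the issue description should now work on Iceberg 0.14.0 with Spark 3.2/3.3; a hedged sketch with the same illustrative names as the earlier examples:

```scala
// a) TableReference syntax: SparkCachedTableCatalog registers the loaded table
//    under a UUID, so only the UUID has to round-trip through Spark's parser
//    and the '@' never reaches the identifier grammar.
spark.sql("CALL nessie.system.rewrite_data_files(table => 'db.`tbl@dev`')")

// b) USE REFERENCE: the cached-table catalog no longer rebuilds the Nessie
//    catalog from a stale SQLConf snapshot.
spark.sql("USE REFERENCE dev IN nessie")
spark.sql("CALL nessie.system.rewrite_data_files(table => 'db.tbl')")
```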

ajantha-bhat • Aug 04 '22

@ajantha-bhat What's the status of this issue?

snazy • Sep 06 '22

Oh. I thought the related PR was merged.

https://github.com/projectnessie/nessie/pull/4590

Let me rebase it today and you can take a look at it.

ajantha-bhat • Sep 06 '22