
Add Table properties to Qbeast

Open osopardo1 opened this issue 2 years ago • 7 comments

Description

Solves issue #42.

The problem with saveAsTable() goes beyond a simple method override: it requires a lot of rework in the design and implementation of QbeastDataSource and its related classes.

A DataSource is the main entry point for writing and reading the Qbeast format through Spark. Apache Spark has two different versions of this API:

  • DataSourceV1: the first version of the API for adapting and extending Apache Spark sources, introduced in version 1.3. Its limitations include a dependency on SparkSession, lack of support for read optimizations, and no partitioning or sorting information. V1 models the data as Relations (BaseRelation, HadoopFsRelation...) and can be extended through RelationProvider and CreatableRelationProvider.
  • DataSourceV2: introduced a couple of years ago with version 2.3. It has better Java compatibility, more flexible extension points, and no dependencies on the high-level APIs. V2 models the underlying data as a Table and can be extended through TableProvider and Table, implemented with traits like SupportsWrite, SupportsRead, SupportsOverwrite... (a minimal TableProvider sketch follows below).

This separation of APIs has paid off in terms of optimization, but it lacks consistency: some SQL statements and operations are implemented for V1 but not for V2, and vice versa.

There's a nice series of blogs around this topic that you can read to get the full picture: http://blog.madhukaraphatak.com/categories/datasource-v2-series/
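To make the V2 extension points concrete, here is a minimal, hypothetical sketch of a TableProvider (the class name ExampleTableProvider is made up and the bodies are placeholders; the real QbeastDataSource does considerably more than this):

import java.util

import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

// Hypothetical V2 provider: Spark instantiates it for the data source and asks it
// for a Table that declares its capabilities through traits like SupportsWrite.
class ExampleTableProvider extends TableProvider {

  // Infer the schema when the user does not supply one
  // (a real provider would read it from the storage location).
  override def inferSchema(options: CaseInsensitiveStringMap): StructType =
    new StructType()

  // Return the Table implementation (e.g. something like QbeastTableImpl)
  // that mixes in SupportsRead / SupportsWrite.
  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: util.Map[String, String]): Table = ???
}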

Type of change

This change consists of:

  • Refinement of QbeastTableImpl. This class extends Table and adds writing capabilities to the V2 Qbeast DataSource.
  • Add QbeastWriterBuilder. This class is in charge of building the write process. It extends V1WriteBuilder, which makes the V2-to-V1 conversion easier.
  • Add QbeastCatalog.

QbeastCatalog is an extension of CatalogExtension with SupportsNamespaces (supporting the creation, renaming and deletion of namespaces) and StagingTableCatalog. A StagingTableCatalog creates a table before committing any metadata, along with the content of CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT operations. From the Spark documentation, we can observe the following:

It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first drop the table via TableCatalog.dropTable(Identifier), then create the table via TableCatalog.createTable(Identifier, StructType, Transform[], Map), and then perform the write via SupportsWrite.newWriteBuilder(LogicalWriteInfo). However, if the write operation fails, the catalog will have already dropped the table, and the planner cannot roll back the dropping of the table.

If the catalog implements this plugin, the catalog can implement the methods to "stage" the creation and the replacement of a table. After the table's BatchWrite.commit(WriterCommitMessage[]) is called, StagedTable.commitStagedChanges() is called, at which point the staged table can complete both the data write and the metadata swap operation atomically.

The QbeastCatalog can be used as the default catalog through the spark_catalog configuration or as an external catalog:

spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
spark.sql.catalog.qbeast_catalog.warehouse=/tmp/dir
  • Add QbeastStagedTableImpl. This class contains the code to commit the staged changes atomically: it creates the underlying log and saves any data processed in the AS SELECT clause (a minimal sketch of this staged-commit flow follows this list).
  • Add SaveAsTableRule to make sure the columnsToIndex option is passed to the QbeastTableImpl.
  • Add QbeastAnalysis rules to read with V1 optimizations.
  • Add functionality to QbeastDataSource and rework some of its methods.
  • Separate tests into Integration tests and Correctness tests. The integration tests make sure the Spark statements work as expected; the correctness tests ensure that we don't lose information and that all the metadata is well described.
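As referenced in the QbeastStagedTableImpl item above, here is a minimal, hypothetical sketch of the staged-commit flow (ExampleStagedTable and the ??? bodies are placeholders, not the actual Qbeast implementation):

import java.util

import org.apache.spark.sql.connector.catalog.{Identifier, StagedTable, SupportsWrite, TableCapability}
import org.apache.spark.sql.connector.write.{LogicalWriteInfo, WriteBuilder}
import org.apache.spark.sql.types.StructType

// Hypothetical skeleton of a staged table: the planner stages the creation, collects the
// data through newWriteBuilder, and only materializes metadata when commitStagedChanges()
// is called, which is what makes CREATE/REPLACE TABLE AS SELECT atomic.
class ExampleStagedTable(ident: Identifier, tableSchema: StructType)
    extends StagedTable
    with SupportsWrite {

  override def name(): String = ident.name()

  override def schema(): StructType = tableSchema

  override def capabilities(): util.Set[TableCapability] =
    util.EnumSet.of(TableCapability.V1_BATCH_WRITE)

  // Buffer or delegate the write here; a real implementation would hand this off
  // to an inner WriteBuilder (e.g. something like QbeastWriteBuilder).
  override def newWriteBuilder(info: LogicalWriteInfo): WriteBuilder = ???

  // Called after BatchWrite.commit(): write the data and the table metadata atomically.
  override def commitStagedChanges(): Unit = ???

  // Called on failure: nothing was committed, so there is nothing to roll back.
  override def abortStagedChanges(): Unit = ()
}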

Checklist:

Here is the list of things you should do before submitting this pull request:

  • [x] New feature / bug fix has been committed following the Contribution guide.
  • [x] Add comments to the code (make it easier for the community!).
  • [ ] Change the documentation.
  • [x] Add tests.
  • [x] Your branch is updated to the main branch (dependent changes have been merged).

How Has This Been Tested? (Optional)

We added a few tests to check how the units of code behave. Under the io.qbeast.spark.internal.sources.catalog package, you will find the following tests:

  • DefaultStagedTableTest. Tests the table returned by default when Qbeast cannot operate.
  • QbeastCatalogTest. Checks all the methods called by the CatalogManager and makes sure they work as expected.
  • QbeastCatalogIntegrationTest. Checks the integration with other catalogs and the behavior of the Qbeast implementation.
  • QbeastStagedTableTest. Covers the methods of the staged table implementation.

Under io.qbeast.spark.utils, some tests have been refactored.

  • QbeastSparkCorrectnessTest. Checks if the results are correct (writing, reading, sampling...).
  • QbeastSparkIntegrationTest. Checks if the Spark DataFrame API is behaving properly.
  • QbeastSQLIntegrationTest. Checks if the SQL commands are performed correctly.

osopardo1 avatar Aug 03 '22 12:08 osopardo1

Hi @eavilaes ! I have done a potential workaround on this issue.

A lot of things still need to be reworked; it's still a WIP. But due to timing, you can already try the new saveAsTable functionality and be the QA tester hehe. The only thing you need to do is add some extra configuration to the Spark Session (like you did for Delta in #42):

spark-shell \
--conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
--conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog

Please, feel free to suggest any other changes or problems you face during the execution. Many thanks!

osopardo1 avatar Aug 04 '22 13:08 osopardo1

Just tested, working smoooooth in my case🥳 Thank you very much! 🐻

eavilaes avatar Aug 05 '22 10:08 eavilaes

Codecov Report

Merging #124 (686a74f) into main (966b007) will increase coverage by 1.05%. The diff coverage is 98.61%.

Current head 686a74f differs from pull request most recent head 2b5c00b. Consider uploading reports for the commit 2b5c00b to get more accurate results.

@@            Coverage Diff             @@
##             main     #124      +/-   ##
==========================================
+ Coverage   91.81%   92.86%   +1.05%     
==========================================
  Files          62       73      +11     
  Lines        1453     1709     +256     
  Branches      114      126      +12     
==========================================
+ Hits         1334     1587     +253     
- Misses        119      122       +3     
Impacted Files Coverage Δ
...ala/io/qbeast/spark/utils/SparkToQTypesUtils.scala 88.88% <ø> (-3.42%) :arrow_down:
...ain/scala/org/apache/spark/sql/V2AndV1Traits.scala 50.00% <50.00%> (ø)
...spark/internal/sources/v2/QbeastWriteBuilder.scala 87.50% <87.50%> (ø)
.../internal/sources/catalog/QbeastCatalogUtils.scala 97.05% <97.05%> (ø)
...e/src/main/scala/io/qbeast/core/model/CubeId.scala 96.36% <100.00%> (+0.11%) :arrow_up:
...c/main/scala/io/qbeast/core/model/QuerySpace.scala 94.73% <100.00%> (+11.40%) :arrow_up:
...qbeast/spark/delta/SparkDeltaMetadataManager.scala 88.88% <100.00%> (+1.38%) :arrow_up:
...io/qbeast/spark/index/query/QuerySpecBuilder.scala 100.00% <100.00%> (ø)
...scala/io/qbeast/spark/internal/QbeastOptions.scala 95.00% <100.00%> (+3.33%) :arrow_up:
...t/spark/internal/QbeastSparkSessionExtension.scala 100.00% <100.00%> (ø)
... and 13 more


codecov[bot] avatar Aug 05 '22 13:08 codecov[bot]

Calling INSERT OVERWRITE on a managed table (created through saveAsTable) doesn't eliminate the old data; it behaves more like an INSERT INTO. Other formats such as Delta and Parquet do eliminate the old data, so only the inserted data is left after the operation.

Calling INSERT OVERWRITE or INSERT INTO will and should change the underlying data; this is true for both a VIEW and a TABLE with Delta or Parquet.

'INSERT INTO' operates correctly with qbeast.

When calling either INSERT OVERWRITE or INSERT INTO, the OTree algorithm is invoked, as it should be.

The following works just fine on a VIEW:

import spark.implicits._
import org.apache.spark.sql.functions.col

// loadTestData and shouldBe come from the project's test utilities / ScalaTest matchers.
val targetColumns = Seq("product_id", "brand", "price", "user_id")
val initialData = loadTestData(spark).select(targetColumns.map(col): _*)
initialData.write
  .format("qbeast")
  .option("cubeSize", "5000")
  .option("columnsToIndex", "user_id,product_id")
  .save(tmpDir)

val df = spark.read.format("qbeast").load(tmpDir)
df.createOrReplaceTempView("initial")
val dataToInsert = Seq((1, "qbeast", 9.99, 1)).toDF(targetColumns: _*)
dataToInsert.createOrReplaceTempView("toInsert")

spark.sql("INSERT OVERWRITE initial TABLE toInsert")
spark.sql("SELECT * FROM initial").count() shouldBe 1

However, when a TABLE is created using saveAsTable, the behavior is unexpected because the new record is inserted without eliminating the existing data:

import spark.implicits._
import org.apache.spark.sql.functions.col

val targetColumns = Seq("product_id", "brand", "price", "user_id")

val initialData = loadTestData(spark).select(targetColumns.map(col): _*)
initialData.write
  .format("qbeast")
  .option("cubeSize", "5000")
  .option("columnsToIndex", "user_id,product_id")
  .saveAsTable("initial")

val dataToInsert = Seq((1, "qbeast", 9.99, 1)).toDF(targetColumns: _*)
dataToInsert.createOrReplaceTempView("toInsert")

spark.sql("INSERT OVERWRITE initial TABLE toInsert")

spark.sql("SELECT * FROM initial").count() shouldBe 1 + initialData.count

In the case of Delta, records are eliminated in both scenarios: a second JSON log file is written with remove entries and a single add entry.

For Qbeast, when using saveAsTable, a second JSON log file is created with a new revision, a single add entry and NO remove entries. When using createOrReplaceTempView, on the other hand, there are in fact remove entries in the second log file and the revision remains the same.
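For anyone who wants to reproduce this observation, here is a hedged way to inspect the actions written by a given commit. It assumes the table data lives at tmpDir, as in the first snippet (for the saveAsTable case the table's warehouse location would be used instead), and that the second commit corresponds to the file numbered ...0001.json:

// Each line of a Delta-style log file is one JSON action (add, remove, metaData, commitInfo, ...).
val secondCommit = spark.read.json(s"$tmpDir/_delta_log/00000000000000000001.json")
secondCommit.printSchema() // a `remove` column only appears if remove actions were written
secondCommit.show(truncate = false)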

Jiaweihu08 avatar Aug 10 '22 17:08 Jiaweihu08

UPDATES

Recently, we have been testing whether QbeastCatalog can coexist with other catalogs such as DeltaCatalog, HoodieCatalog, or Iceberg's SparkSessionCatalog.

The short answer is yes and no.

  • First hypothesis: can the user use multiple catalogs in the same Spark Session? Yes, by using the following configuration:
spark.sql.catalog.catalog_one=your.package.CatalogOne
spark.sql.catalog.catalog_one.warehouse=warehouseLocation
spark.sql.catalog.catalog_two=your.package.CatalogTwo
...
  • DeltaCatalog and HoodieCatalog extend a class called DelegatingCatalogExtension. This class is a simple implementation of CatalogExtension that allows customizing only specific methods of the catalog, delegating the rest of them to the built-in SparkCatalog.

  • Problem: the configuration needed to use such a catalog is spark.sql.catalog.spark_catalog. As far as I understand, this key can only be overridden once. So, if you intend to use two different catalogs implemented in the same way (with DelegatingCatalogExtension), you will not be able to use both simultaneously (see the illustration after this list).
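A hypothetical illustration of the collision: both catalogs compete for the same spark_catalog key, and only the last value set survives (DeltaCatalog's class name is Delta's real spark_catalog implementation; the session built here is just for demonstration):

import org.apache.spark.sql.SparkSession

// Both settings target the same key, so the second assignment silently replaces the first;
// only QbeastCatalog would end up owning spark_catalog in this session.
val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .config("spark.sql.catalog.spark_catalog", "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
  .getOrCreate()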

Possible Solutions:

  • Extend DeltaCatalog. But then we would need to do the same for each of the existing catalogs we want to mix in.

  • Extend CatalogExtension directly. This would allow interoperability with different formats/catalogs. The drawback is that we must implement (or at least delegate the logic of) all catalog operations.

If anyone has a better idea of how Catalog works in Spark or if I missed some important information, please let me know!

osopardo1 avatar Sep 09 '22 13:09 osopardo1

Since we want the catalog implementation to be independent of the underlying formats, we decided to extend CatalogExtension (option 2).

To implement all the required methods (listNamespaces(), listTables()...), we will use the delegated catalog (the one configured with the spark_catalog key) or the default Spark built-in catalog. We can access them through the CatalogManager (spark.sessionState.catalogManager) method called v2SessionCatalog:

  /**
   * If the V2_SESSION_CATALOG config is specified, we try to instantiate the user-specified v2
   * session catalog. Otherwise, return the default session catalog.
   *
   * This catalog is a v2 catalog that delegates to the v1 session catalog. it is used when the
   * session catalog is responsible for an identifier, but the source requires the v2 catalog API.
   * This happens when the source implementation extends the v2 TableProvider API and is not listed
   * in the fallback configuration, spark.sql.sources.useV1SourceList
   */
  private[sql] def v2SessionCatalog: CatalogPlugin = {
    conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { _ =>
      catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
    }.getOrElse(defaultSessionCatalog)
  }
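A minimal sketch of how the delegated catalog could be obtained (SessionCatalogAccess and delegatedCatalog are hypothetical names; since catalogManager and v2SessionCatalog are private[sql], the accessor has to live under the org.apache.spark.sql package):

package org.apache.spark.sql

import org.apache.spark.sql.connector.catalog.CatalogPlugin

object SessionCatalogAccess {

  // Returns the user-configured spark_catalog implementation if one is set,
  // otherwise Spark's default built-in session catalog.
  def delegatedCatalog(spark: SparkSession): CatalogPlugin =
    spark.sessionState.catalogManager.v2SessionCatalog
}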

This way, the user can configure the QbeastCatalog in two ways:

  1. Using the spark_catalog config (recommended if you don't intend to share the session with other catalog implementations):
spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
  2. Using multiple catalog configurations:
spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
spark.sql.catalog.qbeast_catalog.warehouse=/tmp/dir

....

// Write data with the qbeast_catalog prefix
data.write
  .format("qbeast")
  .option("columnsToIndex", "id")
  .saveAsTable("qbeast_catalog.default.qbeast_table")

osopardo1 avatar Sep 19 '22 14:09 osopardo1

Made some changes in the tests; hope the Codecov report is satisfied this time hehe.

osopardo1 avatar Oct 05 '22 14:10 osopardo1

Hello! I think this PR is ready to merge, since everyone has tried the SNAPSHOT version and no major errors were raised. Please, @Adricu8 @eavilaes @Jiaweihu08, when you have time, approve the changes or express your concerns before closing this. Thank you!

osopardo1 avatar Nov 21 '22 13:11 osopardo1