qbeast-spark
Add Table properties to Qbeast
Description
Solves issue #42.
The problem with `saveAsTable()` goes beyond a simple method override. It requires a lot of reworking in the design and implementation of `QbeastDataSource` and its related classes.
A `DataSource` is the main entry point for writing and reading with the Qbeast Format through Spark. Apache Spark has two different versions of this API:

- DataSourceV1: the first version developed to adapt and extend Apache Spark sources, introduced in version 1.3. Its limitations include dependencies on the SparkSession, lack of support for reading optimizations, and no partition or sorting information. V1 understands the data as Relations (`BaseRelation`, `HadoopFsRelation`...) and can be extended through `RelationProvider` and `CreatableRelationProvider`.
- DataSourceV2: released a couple of years ago with the 2.3 version change. It has better compatibility with Java, more flexible extension points, and no dependencies on the high-level API. V2 understands the data underneath as a Table and can be extended through `TableProvider` and `Table`, and implemented with traits like `SupportsWrite`, `SupportsRead`, `SupportsOverwrite`...

This separation of APIs has been profitable in terms of optimization, but it lacks consistency: some SQL statements and operations are implemented for V1 but not for V2, and vice versa.
There's a nice series of blog posts on this topic that you can read to get the full picture: http://blog.madhukaraphatak.com/categories/datasource-v2-series/
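To make the two sets of extension points concrete, here is a minimal, hypothetical skeleton (not the actual QbeastDataSource code; the class name and the unimplemented bodies are placeholders) showing how a single source class can expose both APIs:

```scala
// Hypothetical skeleton of a source implementing both API versions. The V1 traits
// return BaseRelations, while the V2 TableProvider exposes the data as a Table.
import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode}
import org.apache.spark.sql.connector.catalog.{Table, TableProvider}
import org.apache.spark.sql.connector.expressions.Transform
import org.apache.spark.sql.sources.{BaseRelation, CreatableRelationProvider, RelationProvider}
import org.apache.spark.sql.types.StructType
import org.apache.spark.sql.util.CaseInsensitiveStringMap

class ExampleDataSource
    extends RelationProvider          // V1 read path
    with CreatableRelationProvider    // V1 write path
    with TableProvider {              // V2 entry point

  // V1 read: build a BaseRelation from the options map.
  override def createRelation(
      sqlContext: SQLContext,
      parameters: Map[String, String]): BaseRelation = ???

  // V1 write: persist the DataFrame and return a relation over the result.
  override def createRelation(
      sqlContext: SQLContext,
      mode: SaveMode,
      parameters: Map[String, String],
      data: DataFrame): BaseRelation = ???

  // V2: infer the schema from the read/write options.
  override def inferSchema(options: CaseInsensitiveStringMap): StructType = ???

  // V2: return the Table abstraction (which can mix in SupportsRead / SupportsWrite).
  override def getTable(
      schema: StructType,
      partitioning: Array[Transform],
      properties: java.util.Map[String, String]): Table = ???
}
```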
Type of change
This change consists of:
- Refinement of `QbeastTableImpl`. This class extends `Table` and adds writing capabilities to the V2 Qbeast DataSource.
- Add `QbeastWriterBuilder`. This class is in charge of building the write process. It extends `V1WriteBuilder`, which makes the conversion easier.
- Add `QbeastCatalog`. `QbeastCatalog` is an extension of `CatalogExtension` with `SupportsNamespaces` (it supports creation, renaming, and deletion of namespaces) and `StagingTableCatalog`. A `StagingTableCatalog` is for creating a table before committing any metadata along with the content of CREATE TABLE AS SELECT or REPLACE TABLE AS SELECT operations. From the Spark documentation, we can observe the following:
> It is highly recommended to implement this trait whenever possible so that CREATE TABLE AS SELECT and REPLACE TABLE AS SELECT operations are atomic. For example, when one runs a REPLACE TABLE AS SELECT operation, if the catalog does not implement this trait, the planner will first drop the table via TableCatalog.dropTable(Identifier), then create the table via TableCatalog.createTable(Identifier, StructType, Transform[], Map), and then perform the write via SupportsWrite.newWriteBuilder(LogicalWriteInfo). However, if the write operation fails, the catalog will have already dropped the table, and the planner cannot roll back the dropping of the table.
>
> If the catalog implements this plugin, the catalog can implement the methods to "stage" the creation and the replacement of a table. After the table's BatchWrite.commit(WriterCommitMessage[]) is called, StagedTable.commitStagedChanges() is called, at which point the staged table can complete both the data write and the metadata swap operation atomically.
The `QbeastCatalog` can be used as the default catalog through the spark_catalog configuration or as an external catalog (a SQL usage sketch follows this list of changes):

```
spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
spark.sql.catalog.qbeast_catalog.warehouse=/tmp/dir
```
- Add `QbeastStagedTableImpl`. This class contains the code to commit the staged changes atomically. It creates the underlying log and saves any data that may have been processed in the SELECT AS statement.
- Add `SaveAsTableRule` to make sure the `columnsToIndex` option is passed to the `QbeastTableImpl`.
- Add `QbeastAnalysis` rules to read with V1 optimizations.
- Add functionalities to `QbeastDataSource` and rework some of its methods.
- Separate tests into Integration tests and Correctness tests. The integration tests make sure the Spark statements work as expected; the correctness tests ensure that we don't miss information and that all the metadata is well described.
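As referenced above, here is a hedged usage sketch of what the new catalog support should enable through SQL; the table name, source view, and the columnsToIndex values are placeholders:

```scala
// Hypothetical CREATE TABLE AS SELECT through the Qbeast format. With QbeastCatalog
// configured (see the configuration above), this statement should follow the
// staged-commit path described in the Spark documentation quote.
spark.sql(
  """CREATE TABLE student_copy
    |USING qbeast
    |OPTIONS ('columnsToIndex'='id,age')
    |AS SELECT * FROM students""".stripMargin)
```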
Checklist:
Here is the list of things you should do before submitting this pull request:
- [x] New feature / bug fix has been committed following the Contribution guide.
- [x] Add comments to the code (make it easier for the community!).
- [ ] Change the documentation.
- [x] Add tests.
- [x] Your branch is updated to the main branch (dependent changes have been merged).
How Has This Been Tested? (Optional)
We added a few tests to check how units of code behave.
Under the `io.qbeast.spark.internal.sources.catalog` package, you will find the following tests:
- `DefaultStagedTableTest`. Tests the table returned by default in case Qbeast cannot operate.
- `QbeastCatalogTest`. Checks all the methods called by the CatalogManager and makes sure they work as expected.
- `QbeastCatalogIntegrationTest`. Checks the integration with other catalogs and the behavior of the Qbeast implementation.
- `QbeastStagedTableTest`. Contains method testing for the implementation of a staged table.
Under `io.qbeast.spark.utils`, some tests have been refactored:

- `QbeastSparkCorrectnessTest`. Checks that the results are correct (writing, reading, sampling...).
- `QbeastSparkIntegrationTest`. Checks that the Spark DataFrame API behaves properly.
- `QbeastSQLIntegrationTest`. Checks that the SQL commands are performed correctly.
Hi @eavilaes ! I have a potential workaround for this issue.
A lot of things still need to be reworked; it's still a WIP. But due to timing, you can try the new SaveAsTable functionality and be the QA tester hehe. The only thing you must do is add some extra configuration to the Spark session (like you did for Delta on #42):

```
spark-shell \
  --conf spark.sql.extensions=io.qbeast.spark.internal.QbeastSparkSessionExtension \
  --conf spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
```
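Once the shell is up, a quick smoke test could look like the following (a hypothetical sketch; the DataFrame contents, the table name, and the indexed column are placeholders):

```scala
// Hypothetical smoke test: write a small DataFrame as a managed qbeast table and
// read it back through the catalog.
import spark.implicits._

val df = Seq((1, "a"), (2, "b"), (3, "c")).toDF("id", "name")

df.write
  .format("qbeast")
  .option("columnsToIndex", "id")
  .saveAsTable("qbeast_smoke_test")

spark.table("qbeast_smoke_test").show()
```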
Please, feel free to suggest any other changes or problems you face during the execution. Many thanks!
Just tested, working smoooooth in my case🥳 Thank you very much! 🐻
Codecov Report
Merging #124 (686a74f) into main (966b007) will increase coverage by 1.05%. The diff coverage is 98.61%.
:exclamation: Current head 686a74f differs from pull request most recent head 2b5c00b. Consider uploading reports for the commit 2b5c00b to get more accurate results.
```
@@            Coverage Diff             @@
##             main     #124      +/-   ##
==========================================
+ Coverage   91.81%   92.86%   +1.05%
==========================================
  Files          62       73      +11
  Lines        1453     1709     +256
  Branches      114      126      +12
==========================================
+ Hits         1334     1587     +253
- Misses        119      122       +3
```
| Impacted Files | Coverage Δ | |
|---|---|---|
| ...ala/io/qbeast/spark/utils/SparkToQTypesUtils.scala | 88.88% <ø> (-3.42%) | :arrow_down: |
| ...ain/scala/org/apache/spark/sql/V2AndV1Traits.scala | 50.00% <50.00%> (ø) | |
| ...spark/internal/sources/v2/QbeastWriteBuilder.scala | 87.50% <87.50%> (ø) | |
| .../internal/sources/catalog/QbeastCatalogUtils.scala | 97.05% <97.05%> (ø) | |
| ...e/src/main/scala/io/qbeast/core/model/CubeId.scala | 96.36% <100.00%> (+0.11%) | :arrow_up: |
| ...c/main/scala/io/qbeast/core/model/QuerySpace.scala | 94.73% <100.00%> (+11.40%) | :arrow_up: |
| ...qbeast/spark/delta/SparkDeltaMetadataManager.scala | 88.88% <100.00%> (+1.38%) | :arrow_up: |
| ...io/qbeast/spark/index/query/QuerySpecBuilder.scala | 100.00% <100.00%> (ø) | |
| ...scala/io/qbeast/spark/internal/QbeastOptions.scala | 95.00% <100.00%> (+3.33%) | :arrow_up: |
| ...t/spark/internal/QbeastSparkSessionExtension.scala | 100.00% <100.00%> (ø) | |
| ... and 13 more | | |
Calling `INSERT OVERWRITE` on a managed table (created through `saveAsTable`) doesn't eliminate the old data; it behaves rather like an `INSERT INTO`. Other formats such as Delta and Parquet do eliminate the old data, so only the inserted data is left after the operation.
Calling `INSERT OVERWRITE` or `INSERT INTO` will and should change the underlying data, and this is true on both VIEW and TABLE for Delta or Parquet.
`INSERT INTO` operates correctly with qbeast.
When calling either `INSERT OVERWRITE` or `INSERT INTO`, the OTree algorithm is invoked, as it should be.
The following works just fine on a VIEW:

```scala
import spark.implicits._
import org.apache.spark.sql.functions.col

// loadTestData and shouldBe come from the project's test helpers and ScalaTest.
val targetColumns = Seq("product_id", "brand", "price", "user_id")
val initialData = loadTestData(spark).select(targetColumns.map(col): _*)

initialData.write
  .format("qbeast")
  .option("cubeSize", "5000")
  .option("columnsToIndex", "user_id,product_id")
  .save(tmpDir)

val df = spark.read.format("qbeast").load(tmpDir)
df.createOrReplaceTempView("initial")

val dataToInsert = Seq((1, "qbeast", 9.99, 1)).toDF(targetColumns: _*)
dataToInsert.createOrReplaceTempView("toInsert")

spark.sql("INSERT OVERWRITE initial TABLE toInsert")
spark.sql("SELECT * FROM initial").count() shouldBe 1
```
However, when a TABLE is created using `saveAsTable`, the behavior is unexpected: the new record is inserted without eliminating the existing data:

```scala
import spark.implicits._
import org.apache.spark.sql.functions.col

val targetColumns = Seq("product_id", "brand", "price", "user_id")
val initialData = loadTestData(spark).select(targetColumns.map(col): _*)

initialData.write
  .format("qbeast")
  .option("cubeSize", "5000")
  .option("columnsToIndex", "user_id,product_id")
  .saveAsTable("initial")

val dataToInsert = Seq((1, "qbeast", 9.99, 1)).toDF(targetColumns: _*)
dataToInsert.createOrReplaceTempView("toInsert")

spark.sql("INSERT OVERWRITE initial TABLE toInsert")
spark.sql("SELECT * FROM initial").count() shouldBe 1 + initialData.count
```
In the case of Delta, records are eliminated in both scenarios: a second JSON log file is written with `remove` entries and a single `add` entry.
For Qbeast, when using `saveAsTable`, a second JSON log file is created with a new revision and a single `add` entry and NO `remove` entries. When using `createOrReplaceTempView`, on the other hand, there are in fact `remove` entries in the second log file, while the revision remains the same.
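For reference, a hedged way to check the comparison above by hand is to read the JSON commit files of the underlying log and count the `add` and `remove` actions (this is an illustration, not project code; adjust the path to the table location you want to inspect):

```scala
// Read the Delta-style JSON log described above and count add/remove actions.
// The "remove" column only exists when at least one commit contains remove entries.
val log = spark.read.json(s"$tmpDir/_delta_log/*.json")
val addCount =
  if (log.columns.contains("add")) log.where("add IS NOT NULL").count() else 0L
val removeCount =
  if (log.columns.contains("remove")) log.where("remove IS NOT NULL").count() else 0L
println(s"add entries: $addCount, remove entries: $removeCount")
```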
UPDATES
Recently, we have been testing whether `QbeastCatalog` could coexist with other catalogs such as `DeltaCatalog`, `HoodieCatalog`, or Iceberg's `SparkSessionCatalog`.
The short answer is yes and no.
- First hypothesis: the user can use multiple catalogs in the same Spark session. Yes, they can, by using the following configuration:

```
spark.sql.catalog.catalog_one=your.package.CatalogOne
spark.sql.catalog.catalog_one.warehouse=warehouseLocation
spark.sql.catalog.catalog_two=your.package.CatalogTwo
...
```
- `DeltaCatalog` and `HoodieCatalog` extend a class called `DelegatingCatalogExtension`. This class is a simple implementation of `CatalogExtension` that allows customizing only specific methods of the catalog, delegating the rest of them to the built-in Spark catalog.
- Problem: the configuration needed to use the corresponding catalog is `spark.sql.catalog.spark_catalog`. As far as I understand, this can only be overridden once. So, if you intend to use two different catalogs implemented in the same way (with `DelegatingCatalogExtension`), you will not be able to use both simultaneously, as sketched below.
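A hypothetical illustration of the clash (the Delta class name is the real `DeltaCatalog`; the rest is a sketch):

```scala
// Both catalogs want to own the spark_catalog key, so the second .config call
// simply overwrites the first: only one implementation is active in the session.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .master("local[*]")
  .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")
  .config("spark.sql.catalog.spark_catalog", "io.qbeast.spark.internal.sources.catalog.QbeastCatalog")
  .getOrCreate()
```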
Possible Solutions:

- Extend `DeltaCatalog`. But then we would need to do so for each of the existing catalogs we want to mix in.
- Extend `CatalogExtension` directly. This would allow interoperability with different formats/catalogs. The drawback is that we must implement (or at least delegate the logic for) all the catalog operations.
If anyone has a better idea of how Catalog works in Spark or if I missed some important information, please let me know!
Since we want the catalog implementation to be independent of the underlying formats, we decided to extend `CatalogExtension` (option 2).
To implement all the required methods (`listNamespaces()`, `listTables()`...), we will use the delegated catalog (the one configured with the `spark_catalog` key) or the default Spark built-in catalog.
We can access that catalog through the `CatalogManager` (`spark.sessionState.catalogManager`) method called `v2SessionCatalog`:
```scala
/**
 * If the V2_SESSION_CATALOG config is specified, we try to instantiate the user-specified v2
 * session catalog. Otherwise, return the default session catalog.
 *
 * This catalog is a v2 catalog that delegates to the v1 session catalog. it is used when the
 * session catalog is responsible for an identifier, but the source requires the v2 catalog API.
 * This happens when the source implementation extends the v2 TableProvider API and is not listed
 * in the fallback configuration, spark.sql.sources.useV1SourceList
 */
private[sql] def v2SessionCatalog: CatalogPlugin = {
  conf.getConf(SQLConf.V2_SESSION_CATALOG_IMPLEMENTATION).map { _ =>
    catalogs.getOrElseUpdate(SESSION_CATALOG_NAME, loadV2SessionCatalog())
  }.getOrElse(defaultSessionCatalog)
}
```
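As a minimal sketch of that access path (not the actual QbeastCatalog code: since both sessionState and v2SessionCatalog are private[sql], the helper is assumed to live under the org.apache.spark.sql package, and the object and method names are hypothetical):

```scala
package org.apache.spark.sql

import org.apache.spark.sql.connector.catalog.{CatalogPlugin, Identifier, TableCatalog}

// Hypothetical helper: resolve the delegated session catalog (the one registered
// under the spark_catalog key, or Spark's default session catalog) and forward a
// metadata call to it.
object SessionCatalogAccess {

  private def sessionCatalog(spark: SparkSession): CatalogPlugin =
    spark.sessionState.catalogManager.v2SessionCatalog

  // Example delegation: list the tables of a namespace through the session catalog.
  def listDelegatedTables(spark: SparkSession, namespace: Array[String]): Array[Identifier] =
    sessionCatalog(spark) match {
      case tableCatalog: TableCatalog => tableCatalog.listTables(namespace)
      case other =>
        throw new IllegalStateException(s"Catalog ${other.name()} is not a TableCatalog")
    }
}
```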
This way, the user can configure the `QbeastCatalog` in two ways:

- Using the `spark_catalog` config (recommended if you don't intend to share the session with other catalog implementations):

```
spark.sql.catalog.spark_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
```

- Using multiple catalog configurations:

```
spark.sql.catalog.qbeast_catalog=io.qbeast.spark.internal.sources.catalog.QbeastCatalog
spark.sql.catalog.qbeast_catalog.warehouse=/tmp/dir
...
```

```scala
// Write data with the qbeast_catalog prefix
data.write
  .format("qbeast")
  .option("columnsToIndex", "id")
  .saveAsTable("qbeast_catalog.default.qbeast_table")
```
Made some changes in the tests, hope the Codecov report is satisfied this time hehe.
Hello! I think this PR is ready to merge since everyone has tried the `SNAPSHOT` version and no major errors were raised. Please, @Adricu8 @eavilaes @Jiaweihu08, when you have time, approve the changes or express your concerns before closing this. Thank you!