[PROPOSAL]: Support variable schema for included columns
Problem Description
This design proposal addresses feature request #229.
Currently, Hyperspace supports creating indexes only on data with a fixed schema. This means:
- All "indexed" and "included" columns must always be present in all data records.
- When refreshing, no column can be added to or removed from the "indexed" or "included" columns.
This makes it impossible to create an index on data with an evolving schema.
The "no change" restriction on "indexed" columns is unavoidable, as index records are bucketized and sorted on them. However, "included" columns are purely payload data and do not affect the physical layout of index files, so the "no change" restriction on them can be lifted.
There are two cases to consider for schema changes:
- (During refresh) Some new data records do not contain some of the included columns.
- (During refresh) Some new data records contain new included columns.
Goal
The user should be able to easily create/refresh an index on data with an evolving schema.
General Assumptions
- Indexed columns do not change, and they are always present in all source data records during index creation and any subsequent index refresh.
- Across index versions, data records cannot add included columns that conflict, i.e. columns with the same name but different data types.
(Parquet's "mergeSchema" does not support this case either. As an example, if we have a column whose value is
Stringin a set of records andIntin other records, when trying to load all these records into a DataFrame using the "mergeSchema" option, ParquetReader fails with error:Failed to merge incompatible data types string and int.) - Included columns are nullable as they may not exist in all index records. We do not support defining a customized default value for an included column. We follow/use what Parquet does with missing columns when reading data with pre-defined schema (i.e. fills them with
null).
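To make the last assumption concrete, here is a minimal, self-contained Spark snippet (paths and values are illustrative, not part of the proposal) showing that reading Parquet files with an explicit schema fills columns missing from a file with null:

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.types._

val spark = SparkSession.builder.master("local[*]").getOrCreate()
import spark.implicits._

// Two sets of records with different schemas.
Seq((1, "a")).toDF("C0", "C1").write.parquet("/tmp/demo/part1") // has C0, C1
Seq((2, 3.0)).toDF("C0", "C2").write.parquet("/tmp/demo/part2") // has C0, C2

// Read both with a pre-defined schema; missing columns come back as null.
val schema = StructType(Seq(
  StructField("C0", IntegerType),
  StructField("C1", StringType),  // missing in part2 -> null
  StructField("C2", DoubleType))) // missing in part1 -> null

spark.read.schema(schema).parquet("/tmp/demo/part1", "/tmp/demo/part2").show()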
Solution Overview
Requirements
- The user should be able to create/refresh an index on these data formats: Parquet, JSON, and CSV.
- The createIndex and refreshIndex APIs should let the user define "included" column(s) easily, by including or excluding columns from the data schema.
- The list of included columns can be changed during index refresh. The new list of included columns is defined by merging the included columns from the latest index version with the changes made during refresh. Here are some assumptions:
  - A new included column cannot conflict with indexed or existing included columns: it cannot be among the indexed columns, and there cannot be a column with the same name but a different data type among the existing included columns.
  - During index refresh in the "incremental" mode, if a new column is added to the included columns or an existing included column is excluded from the included columns list, we only include/exclude it in the index records being created on new data files (i.e. appended files). If the user wants to add/remove the column to/from all existing index records, index refresh in the "full" mode should be used.
API changes
We make changes to the createIndex and refreshIndex APIs to let the user provide information about included columns.
Changes in IndexConfig
We modify the index config so that instead of receiving a single list of included columns, it receives an instance of IncludedColumns, which contains two lists of columns: "include" and "exclude". These list the columns to include or exclude, relative to a known reference schema, when defining the index's included columns.
case class IndexConfig(
indexName: String,
indexedColumns: Seq[String],
includedColumns: IncludedColumns)
case class IncludedColumns(
include: Seq[String] = Nil,
exclude: Seq[String] = Nil)
Changes in createIndex API
The createIndex API itself remains unchanged; however, the list of included columns is now computed according to the above change made to IndexConfig.
During index creation, we use the schema of the user's given DataFrame df as the reference schema and compute the included columns by applying the include/exclude columns from IndexConfig.includedColumns to this schema.
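As a rough illustration of this resolution, here is a minimal sketch (not Hyperspace's actual code; resolveIncludedColumns and the useInclude flag are hypothetical). A real implementation must know which of include/exclude the user specified, since an explicitly empty include list ("no included columns") differs from the default; the sketch takes that as a flag:

import org.apache.spark.sql.types.StructType

def resolveIncludedColumns(
    referenceSchema: StructType,
    indexedColumns: Seq[String],
    cols: IncludedColumns,
    useInclude: Boolean): Seq[String] = {
  // Payload candidates: every reference schema column that is not indexed.
  val candidates = referenceSchema.fieldNames.toSeq.diff(indexedColumns)
  if (useInclude) candidates.intersect(cols.include) // keep only the listed columns
  else candidates.diff(cols.exclude)                 // keep everything not excluded
}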
As an example, assume the user wants to create an index on a DataFrame df whose schema has 5 columns:
{"C0", "C1", "C2", "C3", "C4"}
and "C0" is picked as the indexed column.
Below is how IndexConfig.includedColumns can be used to define different sets of included columns:
- Pick {"C1", "C2"} as included columns:
val cols = IncludedColumns(include = Seq("C1", "C2"))
- Pick no column as included columns (i.e. the index only has the indexed column):
val cols = IncludedColumns(include = Seq())
- Exclude {"C1", "C2"} from the data schema and pick {"C3", "C4"} as included columns:
val cols = IncludedColumns(exclude = Seq("C1", "C2"))
- Exclude no column from the schema and pick all columns as included columns, so the index will have {"C1", "C2", "C3", "C4"} as its included columns:
val cols = IncludedColumns(exclude = Seq())
We use the above IncludedColumns instance (i.e. cols) for index creation as:
val ixConfig = IndexConfig(indexName = "ix", indexedColumns = Seq("C0"), includedColumns = cols)
createIndex(df, ixConfig)
Note that if lineage is enabled, we make a minor change to the way the index schema is extended:
- The lineage column is added to the index schema (no change to current behavior).
- If the source data is partitioned, we add any partitioning column which is not already part of the user-defined indexed/included columns, UNLESS the column is explicitly excluded from the index by the user.
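A minimal sketch of this schema extension (extendIndexColumns is a hypothetical helper, and the lineage column name is illustrative):

def extendIndexColumns(
    indexed: Seq[String],
    included: Seq[String],
    partitionColumns: Seq[String],
    userExcluded: Seq[String]): Seq[String] = {
  // Partitioning columns not already covered and not explicitly excluded by the user.
  val extraPartitionCols = partitionColumns.diff(indexed ++ included ++ userExcluded)
  // The lineage column is always appended when lineage is enabled.
  (indexed ++ included ++ extraPartitionCols) :+ "_data_file_id"
}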
Changes in refreshIndex API
During index refresh, Hyperspace has to create a DataFrame, df, over the source data files that need to be indexed.
We use the schema(schema: StructType) API in DataFrameReader to define the schema for df. Note that we do not need the full source data schema here, only the columns relevant to the index, i.e. the columns to be indexed or included during refresh.
During refresh, the user can use an instance of IncludedColumns (defined above) to modify the existing included columns.
If there is no change in the included columns, or if the user only removes some existing included column(s), we can simply use the index schema from the latest index version as the DataFrame schema when creating df.
If the user adds new columns to the included columns, we need to add those columns to the df schema. For Parquet and JSON this is straightforward, as they are self-describing formats. For CSV, however, the schema must be explicitly extended with the name and data type of any new included column.
To address the above, we add two optional arguments, includedColumns and schema, to the refreshIndex API:
- includedColumns is used to define changes in the included columns:
  - A column in includedColumns.include is treated as a new included column and will be added to index records created during refresh.
  - A column in includedColumns.exclude will be removed from the list of included columns, and index records created during refresh will not have it.
- schema is used when there are new columns in includedColumns.include; it defines the data type for each of those columns. We compute the schema for df by merging this schema with the latest index schema, minus the excluded columns defined in IncludedColumns.exclude (if any).
def refreshIndex(
indexName: String,
mode: String,
includedColumns: IncludedColumns, // optional, used if change needed in current included columns
schema: StructType // optional, used if includedColumns.include is non-empty
)
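For example, an incremental refresh that drops "C1" from and adds a new Int column "C3" to the included columns could look like this (the column names and types are illustrative):

import org.apache.spark.sql.types._

// Changes to included columns: add "C3", remove "C1".
val changes = IncludedColumns(include = Seq("C3"), exclude = Seq("C1"))

// Data types for the newly added included columns only.
val newColumnTypes = StructType(Seq(StructField("C3", IntegerType, nullable = true)))

refreshIndex(
  indexName = "ix",
  mode = "incremental",
  includedColumns = changes,
  schema = newColumnTypes)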
Index Metadata Changes
Index metadata is updated so that:
- CoveringIndex.Properties.schemaString: captures the "merged" schema of all valid index files.
- CoveringIndex.Properties.Columns.included: captures the "latest" set of included columns.
Note that there could be some column(s) in schemaString which are not among the included columns. This can happen if the user adds a column as an included column in some early version of the index and then drops it when refreshing the index in incremental mode.
Moreover, we no longer need to store the source data schema under source.plan.properties.relations.head.dataSchemaJson. Currently, this schema is only used to define the df schema when refreshing. With the changes to the refreshIndex API explained above, it can now be computed from the index's schemaString and included columns, plus the new schema argument added to refreshIndex.
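A minimal sketch of that computation, assuming a hypothetical helper (lineage and bucketing details omitted):

import org.apache.spark.sql.types.StructType

def refreshDataFrameSchema(
    latestIndexSchema: StructType, // parsed from CoveringIndex.Properties.schemaString
    excluded: Seq[String],         // IncludedColumns.exclude passed to refreshIndex
    newColumns: StructType         // schema argument passed to refreshIndex
  ): StructType = {
  // Drop excluded columns from the latest index schema, then append the new ones.
  val kept = latestIndexSchema.fields.filterNot(f => excluded.contains(f.name))
  StructType(kept ++ newColumns.fields)
}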
Index leverage at query optimization/execution
Given that an index schema can now change through added/removed included columns, index Parquet files are no longer guaranteed to have the same schema. One option for loading them into a DataFrame is the mergeSchema option; however, as this option is costly, we avoid it and instead load index files by providing the merged schema from the index metadata. This is done by setting schema(<merged schema from metadata>) when loading index content into a DataFrame. Existing rules also set this merged schema as the relation schema when replacing a data source with an index.
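For illustration, loading index content could look like the following sketch, where schemaString is the JSON schema stored in the index metadata and indexFilePaths are the paths of the valid index files (both names are assumptions, not actual Hyperspace identifiers):

import org.apache.spark.sql.types.{DataType, StructType}

// Parse the merged schema stored in the index metadata.
val mergedSchema = DataType.fromJson(schemaString).asInstanceOf[StructType]

// Load all index files with the merged schema instead of the costly
// mergeSchema option; columns missing in a file are filled with null.
val indexDf = spark.read.schema(mergedSchema).parquet(indexFilePaths: _*)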
Impact on Index Optimization
As the set of included columns can change when refreshing an index, index records that belong to the same bucket could have different schemas. This affects OptimizeAction, which merges separate smaller Parquet index files, whose content belongs to the same bucket, into a single large Parquet index file. However, a given Parquet file can only have a single schema. Therefore:
- Extra validations should be added to the OptimizeAction code to check for/avoid merging index files with heterogeneous schemas.
- The user should be aware of this side effect of an evolving index schema on the physical layout of index files, as new heterogeneous records are appended to the index content.
There are two approaches for fixing OptimizeAction:
- Approach 1: While merging Parquet files, merge their schemas as well. If two index files have records belonging to the same bucket but the files have different schemas, merge them into a single file with a merged schema.
  - Pros: It achieves the best reduction in the number of index files, as all index files belonging to the same bucket are merged during index optimization.
  - Cons: Depending on the number of records for each schema and how different the schemas are, we may get many null values in records for missing columns.
- Approach 2: Two index files which belong to the same bucket are merged only if they have the same schema.
  - Pros: No extra null values are generated by merging heterogeneous schemas.
  - Cons: Depending on the number of records for each schema and how different the schemas are, index optimization may not be effective in reducing the number of index files.
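To illustrate Approach 2, here is a sketch of the grouping step (IndexFileInfo and mergeGroups are hypothetical; real OptimizeAction logic would also consider file sizes and thresholds):

import org.apache.spark.sql.types.StructType

// Hypothetical descriptor for an index Parquet file.
case class IndexFileInfo(path: String, bucket: Int, schema: StructType)

// Under Approach 2, a file is only merged with bucket-mates that share
// the exact same schema; groups of size 1 have nothing to merge.
def mergeGroups(files: Seq[IndexFileInfo]): Seq[Seq[IndexFileInfo]] =
  files.groupBy(f => (f.bucket, f.schema)).values.filter(_.size > 1).toSeq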
Example Scenario
Here is an example scenario showing how index metadata and index files change as an index goes through refreshes and optimization while both the source data and the included columns change.
V0: // after create
Included Columns: {C1, C2}
Schema: {C0, C1, C2}
IX files:
I00 (schema = {C0, C1, C2})
I01 (schema = {C0, C1, C2})
V1: // refresh incremental mode with some additional/deleted files
// Change included columns: Drop "C1" and Add "C3"
Included Columns: {C2, C3}
Schema: {C0, C1, C2, C3}
IX files:
I00' (schema = {C0, C1, C2})
I01' (schema = {C0, C1, C2})
I10 (schema = {C0, C2, C3})
V2: // refresh full mode with some additional/deleted files
// Change included columns: Drop "C3" and Add "C4"
Included Columns: {C2, C4}
Schema: {C0, C2, C4}
IX files:
I20 (schema = {C0, C2, C4})
I21 (schema = {C0, C2, C4})
V3: // refresh incremental mode with some additional/deleted files
// Change included columns: Drop "C4" and Add "C5"
Included Columns: {C2, C5}
Schema: {C0, C2, C4, C5}
IX files:
I20' (schema = {C0, C2, C4})
I21' (schema = {C0, C2, C4})
I31 (schema = {C0, C2, C5})
I32 (schema = {C0, C2, C5})
V4: // refresh incremental mode with some additional/deleted files
// No Change in included columns
Included Columns: {C2, C5}
Schema: {C0, C2, C4, C5}
IX files:
I20'' (schema = {C0, C2, C4})
I21'' (schema = {C0, C2, C4})
I31' (schema = {C0, C2, C5})
I32' (schema = {C0, C2, C5})
I41 (schema = {C0, C2, C5})
V5: // optimize in full mode
Included Columns: {C2, C5}
Schema: {C0, C2, C4, C5}
Approach-1: // Merge schemas when merging files
IX files:
I50 (schema = {C0, C2, C4, C5})
I51 (schema = {C0, C2, C4, C5})
I52 (schema = {C0, C2, C4, C5})
Approach-2: // Merge files with compatible schema
IX files:
I20'' (schema = {C0, C2, C4})
I21'' (schema = {C0, C2, C4})
I31' (schema = {C0, C2, C5})
I51 (schema = {C0, C2, C5}) // merge of I32' and I41
Implementation
- [ ] PR1: #309 Add include/exclude columns support to IndexConfig and modify createIndex
- [ ] PR2: #328 Add support for index schema change during refresh
@imback82 @apoorvedave1 @sezruby @rapoth Can you take a look at this proposal and leave your comments (if any)? Thnx
> createIndex(df, mode = "include", columns = Seq("C1", "C2")): Pick {"C1", "C2"} as the included columns.

The current createIndex API takes in IndexConfig. Can you update the examples with indexed columns as well?
For source data files, we need to use Parquet's mergeSchema, as the data could have columns not listed as indexed or included columns.
What if the source data files are non-Parquet?
> The current createIndex API takes in IndexConfig. Can you update the examples with indexed columns as well?

The examples are updated by adding IndexConfig.
@imback82 I updated the proposal to handle schema in source data files. Specifically I added a new section: "Index Creation and Refresh changes" and modified "Index Metadata Changes". Plz take a look and let me know about your comments. Thnx!
Generally LGTM! How about includeColumns/excludeColumns rather than mode + columns?
I wonder if incremental refresh will work well with the previous data. Could you create a prototype PR?
> def refreshIndex(
>   indexName: String,
>   mode: String,
>   includedColumns: IncludedColumns, // optional, used if change needed in current included columns
>   schema: StructType // optional, used if includedColumns.include is non-empty
> )
Could includedColumns + schema be a single includedColumnSchema (or just includedColumns), since a schema also includes the column names?
And I think it's okay not to support optimizeIndex for changed schema at first, to avoid complexity; we could do it later if it's needed.