[PROPOSAL]: Support variable schema for included columns

Open pirz opened this issue 5 years ago • 7 comments

Problem Description

This design proposal addresses feature request #229.

Currently, Hyperspace supports creating indexes only on data with fixed schema. This means:

  • All "indexed" and "included" columns must be present in every data record.
  • When refreshing, no column can be added to or removed from the "indexed" or "included" columns.

This makes it impossible to create an index on data with an evolving schema.

The "no change" restriction on "indexed" columns is unavoidable, as index records are bucketized and sorted on them. However, "included" columns are payload data and do not affect the physical layout of index files, so the "no change" restriction on them can be lifted.

There are two cases to consider for schema changes:

  • (During refresh) (some) new data records do not contain some included columns.
  • (During refresh) (some) new data records contain some new included columns.

Goal

The user should be able to create/refresh an index on data with evolving schema with ease.

General Assumptions

  1. Indexed columns do not change and they are always present in all source data records during index creation and any subsequent index refresh.
  2. Data records can not have conflicting columns with the same name but different data types added as included columns, across index versions. (Parquet's "mergeSchema" does not support this case either. As an example, if we have a column whose value is String in a set of records and Int in other records, when trying to load all these records into a DataFrame using the "mergeSchema" option, ParquetReader fails with error: Failed to merge incompatible data types string and int.)
  3. Included columns are nullable as they may not exist in all index records. We do not support defining a customized default value for an included column. We follow/use what Parquet does with missing columns when reading data with pre-defined schema (i.e. fills them with null).

Solution Overview

Requirements

  1. The user should be able to create/refresh an index on the following data formats: Parquet, JSON, CSV.
  2. createIndex and refreshIndex APIs should let user define "included" column(s) easily by including or excluding columns from the data schema.
  3. The list of included columns can be changed during index refresh. The new list of included columns is defined by merging the included columns from the latest index version with the changes made during refresh. Here are some assumptions:
    • A new included column cannot conflict with indexed or existing included columns: it cannot be one of the indexed columns, and no existing included column may have the same name but a different data type.
    • During index refresh in the "incremental" mode, if a new column is added to the included columns or an existing included column is excluded from included columns list, we only include/exclude it in the index records being created on new data files (i.e. appended files). If the user wants to add/remove the column to/from all existing index records, index refresh in the "full" mode should be used.

API changes

We make changes to the createIndex and refreshIndex APIs to let the user provide information about included columns.

Changes in IndexConfig

We modify index config so that instead of receiving a single list of included columns, it receives an instance of IncludedColumns which contains two lists of columns: "include" and "exclude". They show the columns to include/exclude, according to a known reference schema, to define index's included columns.

case class IndexConfig(
    indexName: String,
    indexedColumns: Seq[String],
    includedColumns: IncludedColumns)

case class IncludedColumns(
   include: Seq[String] = Nil,
   exclude: Seq[String] = Nil)

Changes in createIndex API

The createIndex API remains unchanged; however, the list of included columns is now computed according to the above change made to IndexConfig.

During index creation, we use the schema from user's given DataFrame df as the reference schema and compute included columns using this schema with include/exclude columns from IndexConfig.includedColumns.

As an example, assume user wants to create an index on DataFrame df whose schema has 5 columns:

{"C0", "C1", "C2", "C3", "C4"}

and "C0" is picked as the indexed column.

Below is how IndexConfig.includedColumns can be used to define different sets of included columns:

  • Pick {"C1", "C2"} as included columns:
val cols = IncludedColumns(include = Seq("C1", "C2"))
  • Pick no column as included columns (i.e. Index only has indexed column):
val cols = IncludedColumns(include = Seq())
  • Exclude {"C1", "C2"} from data schema and pick {"C3", "C4"} as included columns:
val cols = IncludedColumns(exclude = Seq("C1", "C2"))
  • Exclude no column from schema and pick all columns as included columns. So index will have {"C1", "C2", "C3", "C4"} as its included columns:
val cols = IncludedColumns(exclude = Seq())

We use above IncludedColumns instance (i.e. cols) for index creation as:

val ixConfig = IndexConfig(indexName = "ix", indexedColumns = Seq("C0"), includedColumns = cols)
createIndex(df, ixConfig)
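
The include/exclude resolution described above can be sketched in plain Scala. This is only an illustration: resolve is a hypothetical helper, not a Hyperspace API, and the reference schema is reduced to a list of column names. Note that with the case class exactly as declared, IncludedColumns(include = Seq()) and IncludedColumns(exclude = Seq()) build the same value, so the sketch has to pick one meaning for the all-empty case. It assumes "every non-indexed column", which is an assumption of this sketch and not something the proposal settles.

```scala
object IncludedColumnsSketch {
  // Mirrors the case class from the proposal.
  case class IncludedColumns(include: Seq[String] = Nil, exclude: Seq[String] = Nil)

  // Hypothetical helper (not a Hyperspace API): resolves the final list of
  // included columns against a reference schema given as column names.
  def resolve(
      referenceSchema: Seq[String],
      indexedColumns: Seq[String],
      cols: IncludedColumns): Seq[String] = {
    // Every named column must exist in the reference schema.
    val unknown = (cols.include ++ cols.exclude).filterNot(c => referenceSchema.contains(c))
    require(unknown.isEmpty, s"Not in reference schema: ${unknown.mkString(", ")}")

    // An included column cannot also be an indexed column.
    val clash = cols.include.filter(c => indexedColumns.contains(c))
    require(clash.isEmpty, s"Already indexed: ${clash.mkString(", ")}")

    // Assumption: a non-empty include list wins; otherwise start from every
    // non-indexed column of the reference schema.
    val base =
      if (cols.include.nonEmpty) cols.include
      else referenceSchema.filterNot(c => indexedColumns.contains(c))

    base.filterNot(c => cols.exclude.contains(c))
  }
}
```

Under these assumptions, calling resolve with the five-column schema above, "C0" as the indexed column and exclude = Seq("C1", "C2") yields the remaining columns "C3" and "C4".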

Note that if lineage is enabled, we make a minor change to the way index schema is extended:

  1. Lineage column is added to index schema (no change to current behavior).
  2. If source data is partitioned, we add any partitioning column which is not already part of user defined indexed/included columns, UNLESS the column is explicitly excluded from the index by user.

Changes in refreshIndex API

During index refresh, Hyperspace has to create a DataFrame, df, on the source data files that need to be indexed. We use the schema(schema: StructType) API in DataFrameReader to define the schema for df. Note that we do not need the full source data schema here, only the columns relevant to the index, i.e. the columns to be indexed or included during refresh.

During refresh, user can use an instance of IncludedColumns (defined above) to modify existing included columns.

If there is no change in included columns or if user only removes some existing included column(s), we can simply use index schema from the latest index version as DataFrame schema when creating df.

If the user adds new columns to the included columns, we need to add those columns to the df schema. For Parquet and JSON, this is straightforward as they are self-describing formats. However, for CSV, the schema must be correctly extended with the name and data type of each new included column.

We add two "optional" arguments, includedColumns and schema, to the refreshIndex API to address the above:

  1. includedColumns is used to define changes in included columns:
    • A column in includedColumns.include is treated as a new included column and will be added to index records created during refresh.
    • A column in includedColumns.exclude will be removed from the list of included columns and index records created during refresh will not have it.
  2. schema is used when there are some new columns in includedColumns.include and it defines the data type for each of those columns. We compute the schema for df by merging this schema with latest index schema minus excluded columns defined in IncludedColumns.exclude (if any).
def refreshIndex(
   indexName: String,
   mode: String,
   includedColumns: IncludedColumns, // optional, used if change needed in current included columns
   schema: StructType // optional, used if includedColumns.include is non-empty
   )
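
As a rough illustration of how the df schema could be derived during refresh, here is a minimal sketch in plain Scala. Field is a stand-in for Spark's StructField with the data type reduced to a string, and refreshSchema is a hypothetical helper, not part of Hyperspace. It assumes latestIndexSchema holds the indexed columns plus the currently included columns, and it enforces the two conflict rules listed under Requirements.

```scala
object RefreshSchemaSketch {
  // Stand-in for Spark's StructField; the data type is a plain string here.
  case class Field(name: String, dataType: String)

  // Hypothetical helper: schema of the DataFrame built during refresh, i.e.
  // the latest index schema minus excluded columns, merged with the schema
  // of the newly included columns.
  def refreshSchema(
      latestIndexSchema: Seq[Field],
      indexedColumns: Seq[String],
      exclude: Seq[String],
      newColumns: Seq[Field]): Seq[Field] = {
    require(
      !exclude.exists(c => indexedColumns.contains(c)),
      "Indexed columns cannot be excluded")

    newColumns.foreach { n =>
      // A new included column cannot be one of the indexed columns.
      require(!indexedColumns.contains(n.name), s"${n.name} is an indexed column")
      // Same name with a different data type is rejected.
      latestIndexSchema.find(_.name == n.name).foreach { old =>
        require(
          old.dataType == n.dataType,
          s"Conflicting types for ${n.name}: ${old.dataType} vs ${n.dataType}")
      }
    }

    val kept = latestIndexSchema.filterNot(f => exclude.contains(f.name))
    val keptNames = kept.map(_.name).toSet
    // Append only the columns that are not already part of the kept schema.
    kept ++ newColumns.filterNot(n => keptNames.contains(n.name))
  }
}
```

In a real implementation the result would be converted to a StructType and passed to DataFrameReader.schema; that step is left out so the sketch stays free of Spark dependencies.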

Index Metadata Changes

Index metadata is updated so that:

  • CoveringIndex.Properties.schemaString: Captures "merged" schema of all valid index files.
  • CoveringIndex.Properties.Columns.included: Captures "latest" set of included columns.

Note that there could be some column(s) in schemaString which are not among included columns. This can happen if user adds a column as included column in some early version of index and then drops it when refreshing index in incremental mode.

Moreover, we do not need to store source data schema under source.plan.properties.relations.head.dataSchemaJson. Currently, this schema is only used to define df schema when refreshing. However, with the changes explained above to refreshIndex API, this schema can now be computed from index's schemaString and included columns plus the new schema argument added to refreshIndex.

Index leverage at query optimization/execution

Given that an index schema can now change as included columns are added or removed, index Parquet files are no longer guaranteed to share the same schema. One option for loading them into a DataFrame is the mergeSchema option. However, as this option is costly, we avoid it and instead load index files with the merged schema from index metadata, by setting schema(<merged schema from metadata>) when loading index content into a DataFrame. Existing rules also set this merged schema as the relation schema when replacing a data source with an index.

Impact on Index Optimization

As the set of included columns can change when refreshing an index, index records that belong to the same bucket could have different schemas. This affects OptimizeAction, which merges separate smaller Parquet index files whose content belongs to the same bucket into a single large Parquet index file. However, a given Parquet file can only have a single schema. Therefore:

  • Extra validations should be added to the OptimizeAction code to check/avoid merging index files with heterogeneous schema.
  • User should be aware of this side-effect of evolving index schema on the physical layout of index files, as new heterogeneous records are appended to the index content.

There are two approaches for fixing OptimizeAction:

  • Approach 1: While merging Parquet files, merge their schemas as well. If two index files have records belonging to the same bucket but the files have different schemas, merge the files into a single file with a merged schema.
    • Pros: It achieves the best reduction in the number of index files, as all index files which belong to the same bucket are merged during index optimization.
    • Cons: Depending on the number of records for each schema and how different the schemas are, we may get many null values in records for missing columns.
  • Approach 2: Two index files which belong to the same bucket can only be merged if they have the same schema.
    • Pros: No extra null value is generated due to merging heterogeneous schemas.
    • Cons: Depending on the number of records for each schema and how different the schemas are, index optimization may not be effective in reducing the number of index files.
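
The difference between the two approaches comes down to how index files are grouped before merging. The sketch below models this in plain Scala; IndexFile, MergeGroup and the three functions are hypothetical names for illustration and do not reflect the actual OptimizeAction code. Schemas are reduced to sets of column names, and None stands in for the null a merged file would store for a missing column.

```scala
object OptimizeSketch {
  // Hypothetical description of an index file: its bucket and its columns.
  case class IndexFile(name: String, bucket: Int, schema: Set[String])
  // One merge task: the input files and the schema of the output file.
  case class MergeGroup(bucket: Int, schema: Set[String], inputs: Seq[String])

  // Approach 1: one group per bucket; the output schema is the union of the
  // input schemas.
  def approach1(files: Seq[IndexFile]): Seq[MergeGroup] =
    files.groupBy(_.bucket).toSeq.map { case (bucket, group) =>
      MergeGroup(bucket, group.flatMap(_.schema).toSet, group.map(_.name))
    }

  // Approach 2: files are merged only if both bucket and schema match.
  def approach2(files: Seq[IndexFile]): Seq[MergeGroup] =
    files.groupBy(f => (f.bucket, f.schema)).toSeq.map { case ((bucket, schema), group) =>
      MergeGroup(bucket, schema, group.map(_.name))
    }

  // Rewrites one record under a merged schema (used by Approach 1); columns
  // the record does not have become None.
  def widen(record: Map[String, Any], mergedSchema: Seq[String]): Seq[Option[Any]] =
    mergedSchema.map(c => record.get(c))
}
```

With three files in one bucket, two of which share a schema, approach1 produces a single group with the union schema, while approach2 produces two groups and leaves the odd file on its own.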

Example Scenario

Here is an example scenario showing how index metadata and index files would be changed as an index goes through refresh and optimization while both source data and included columns are changed.

V0: // after create
	Included Columns:	{C1, C2}
	Schema:			{C0, C1, C2}
	
	IX files: 
		I00 (schema = {C0, C1, C2})
		I01 (schema = {C0, C1, C2})
	
V1: // refresh incremental mode with some additional/deleted files
	// Change included columns: Drop "C1" and Add "C3"
	
	Included Columns:	{C2, C3}
	Schema:			{C0, C1, C2, C3}
	
	IX files:
		I00' (schema = {C0, C1, C2})
		I01' (schema = {C0, C1, C2})
		I10  (schema = {C0, C2, C3})
	
V2: // refresh full mode with some additional/deleted files
	// Change included columns: Drop "C3" and Add "C4"
	
	Included Columns:	{C2, C4}
	Schema:			{C0, C2, C4}
	
	IX files:
		I20  (schema = {C0, C2, C4})
		I21  (schema = {C0, C2, C4})
		
V3: // refresh incremental mode with some additional/deleted files
	// Change included columns: Drop "C4" and Add "C5"
	
	Included Columns:	{C2, C5}
	Schema:			{C0, C2, C4, C5}
	
	IX files:
		I20' (schema = {C0, C2, C4})
		I21' (schema = {C0, C2, C4})
		I31  (schema = {C0, C2, C5})
		I32  (schema = {C0, C2, C5})

V4: // refresh incremental mode with some additional/deleted files
	// No Change in included columns
	
	Included Columns:	{C2, C5}
	Schema:			{C0, C2, C4, C5}
	
	IX files:
		I20'' (schema = {C0, C2, C4})
		I21'' (schema = {C0, C2, C4})
		I31'  (schema = {C0, C2, C5})
		I32'  (schema = {C0, C2, C5})
		I41   (schema = {C0, C2, C5})


V5: // optimize in full mode

	Included Columns:	{C2, C5}
	Schema:			{C0, C2, C4, C5}

	Approach-1: // Merge schemas when merging files
	IX files:
		I50  (schema = {C0, C2, C4, C5})
		I51  (schema = {C0, C2, C4, C5})
		I52  (schema = {C0, C2, C4, C5})
		
	Approach-2: // Merge files with compatible schema
	IX files:
		I20'' (schema = {C0, C2, C4})
		I21'' (schema = {C0, C2, C4})
		I31'  (schema = {C0, C2, C5})
		I51   (schema = {C0, C2, C5}) // merge of I32' and I41
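
The metadata side of this scenario (the "Included Columns" and "Schema" lines for V0 through V4) can be replayed with a small model. This is a simplified sketch under stated assumptions: IndexState and refresh are hypothetical names, each refresh writes a single new index file, and deleted source files are ignored. A "full" refresh rewrites every index file with the new column set, while an "incremental" refresh only appends files, so dropped columns stay in the merged schema until the next full refresh.

```scala
object ScenarioSketch {
  // Hypothetical model of index state: the latest included columns plus the
  // schema of every valid index file.
  case class IndexState(included: Seq[String], fileSchemas: Seq[Seq[String]]) {
    // Merged schema of all valid index files (what schemaString captures).
    def mergedSchema: Seq[String] = fileSchemas.flatten.distinct.sorted
  }

  val indexed: Seq[String] = Seq("C0")

  // Applies one refresh; add/drop are the changes to the included columns.
  def refresh(
      state: IndexState,
      mode: String,
      add: Seq[String],
      drop: Seq[String]): IndexState = {
    val included = state.included.filterNot(c => drop.contains(c)) ++ add
    val newFile = indexed ++ included
    mode match {
      // Full mode: all index records are rebuilt with the new column set.
      case "full" => IndexState(included, Seq(newFile))
      // Incremental mode: existing files keep their schema, new files are appended.
      case "incremental" => IndexState(included, state.fileSchemas :+ newFile)
      case other => throw new IllegalArgumentException(s"Unknown mode: $other")
    }
  }
}
```

Starting from V0 with included columns {C1, C2} and applying the four refreshes in order reproduces the included columns and merged schema listed for V1 through V4.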

Implementation

  • [ ] PR1: #309 Add include/exclude columns support to IndexConfig and modify createIndex
  • [ ] PR2: #328 Add support for index schema change during refresh

pirz avatar Nov 20 '20 02:11 pirz

@imback82 @apoorvedave1 @sezruby @rapoth Can you take a look at this proposal and leave your comments (if any)? Thnx

pirz avatar Nov 23 '20 23:11 pirz

  • createIndex(df, mode = "include", columns = Seq("C1", "C2")): Pick {"C1", "C2"} as the included columns.

The current createIndex API takes in IndexConfig. Can you update the examples with indexed columns as well?

imback82 avatar Nov 24 '20 00:11 imback82

For source data files, we need to use Parquet's mergeSchema as the data could have columns not listed as indexed or included columns.

What if source data files are non-Parquet?

imback82 avatar Nov 24 '20 00:11 imback82

The current createIndex API takes in IndexConfig. Can you update the examples with indexed columns as well?

The examples are updated by adding IndexConfig.

pirz avatar Nov 24 '20 01:11 pirz

@imback82 I updated the proposal to handle schema in source data files. Specifically I added a new section: "Index Creation and Refresh changes" and modified "Index Metadata Changes". Plz take a look and let me know about your comments. Thnx!

pirz avatar Nov 25 '20 02:11 pirz

Generally LGTM! How about includeColumns and excludeColumns rather than mode + columns? I wonder whether incremental refresh will work well with the previous data. Could you create a prototype PR?

sezruby avatar Nov 25 '20 03:11 sezruby

def refreshIndex(
   indexName: String,
   mode: String,
   includedColumns: IncludedColumns, // optional, used if change needed in current included columns
   schema: StructType // optional, used if includedColumns.include is non-empty
   )

Can includedColumns + schema be combined into a single includedColumnSchema (or includedColumns) argument, since the schema also includes the column names?

And I think it's okay not to support optimizeIndex for changed schema at first, to avoid complexity; we could do it later if it's needed.

sezruby avatar Dec 04 '20 09:12 sezruby