Mapping a subset of a delta table's columns as an external Hive table
I've tried using the DeltaStorageHandler to define an external table in my Hive metastore over a Delta table stored in Azure Data Lake Storage Gen2. From a Hive shell:
CREATE EXTERNAL TABLE Raw.MyDeltaTable (MyColumn string)
STORED BY 'io.delta.hive.DeltaStorageHandler'
location 'abfss://[email protected]/myDeltaTable'
TBLPROPERTIES('DO_NOT_UPDATE_STATS'='true');
I am using the Maven artifact delta-hive_2.12, version 0.3.0.
The Delta table actually has many other fields, but I only want to expose this one field through the metastore. For Parquet this works as expected, but for Delta I am getting the following error:
FAILED: Execution Error, return code 1 from org.apache.hadoop.hive.ql.exec.DDLTask. MetaException(message:The Delta table schema is not the same as the Hive schema:Specified schema is missing field(s): mySecondColumn, myThirdColumn
This seems like it is purposely not supported, which is odd to me because I thought Delta Lake was all about schema evolution. But it seems that I can't add new columns to my Delta table without breaking my external table in the metastore.
Am I doing something wrong or is this really not supported?
Delta Lake is designed to keep the schema information along with the data in the file system, so the source of truth for the table schema is the transaction log at tableDir/_delta_log/ in the file system, not the Hive metastore service. However, Hive only uses the schema in the metastore to plan queries, and if the schema in the metastore does not match the schema in the file system, it's going to cause problems. Furthermore, the Hive metastore does not provide robust APIs for the Delta Hive connector (that is, delta-hive_2.12) to keep the metastore and the file-system information in sync. This is hard to deal with, and it leads to schema-inconsistency issues like this one when you try to set the Hive metastore schema to something different from the actual on-file table schema. @zsxwing will have a better idea of whether this can be fixed in this specific case, where you are trying to expose a subset of columns.
This works with Parquet because Parquet gives you no way to validate whether the schema in the metastore is actually the schema in the data files. So while that allows users to do arbitrary things like exposing a subset of columns, it just as easily lets data be corrupted when the schema is misconfigured and the wrong schema is used. This is what Delta is trying to block with schema validation, and Delta also defines the correct protocol to evolve that schema (by transactionally updating the metadata in the delta log). This all works much more nicely with Delta + Spark processing + Hive metastore; Hive SQL does not provide the necessary configurability for this.
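For reference, here is a minimal sketch of the Hive DDL the connector should accept in this situation: the column list has to cover the full Delta schema. The types of mySecondColumn and myThirdColumn are assumptions (string is used here) and must match whatever is in _delta_log, and the location placeholder stands in for the abfss path above.

CREATE EXTERNAL TABLE Raw.MyDeltaTable (
  MyColumn string,
  mySecondColumn string,  -- assumed type; must match the Delta log schema
  myThirdColumn string    -- assumed type; must match the Delta log schema
)
STORED BY 'io.delta.hive.DeltaStorageHandler'
LOCATION 'abfss://<container>@<account>.dfs.core.windows.net/myDeltaTable'
TBLPROPERTIES('DO_NOT_UPDATE_STATS'='true');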
@tdas Does this not mean that a Hive metastore pretty much can't work with schema evolution? How would the flow work?
Let's say a producer X writes to a table Y with 10 columns, and that table is being read by a consumer Z through a Spark SQL query over a Hive metastore table (e.g. SELECT Col1, Col2, ..., Col10 FROM Y).
If X now writes a new column (Col11), then the underlying file schema would change and Z would immediately break, right?
Could you clarify what you mean about the Delta + Spark processing + Hive metastore integration working more nicely?
I would not say it is impossible to make the Hive metastore work with schema evolution, but it is indeed hard. There are fundamentally two approaches to evolving the schema while working with a metastore.
- Do not keep the schema in the metastore at all - Spark uses this approach. Delta implements a Spark "catalog" that lets Delta customize the interaction between Spark and the Hive metastore in the table-resolution phase. Any Delta table created via Spark will not write any schema into the Hive metastore. When querying that table in Spark, using the catalog, the table schema is dynamically loaded from the log. So there is only one source of truth, the delta log: any schema changes are committed to the log, and SQL queries in Spark always use the latest schema in the log, completely ignoring any information in the metastore. (See the Spark SQL sketch after this list.)
- Keep the schema in the metastore - This is absolutely needed for Hive SQL processing because the schema in the metastore is used by the query planner. Unlike Spark's catalog, this is hard to customize. It creates a split-source-of-truth situation: for the table, the log is the source of truth, but for a SQL query in Hive, the metastore is. After any schema change is committed to the log, the two will differ. Delta's Hive connector could in theory update the Hive metastore whenever there is a mismatch; in practice, however, syncing to the metastore is tricky to do robustly. Our current connector does not support it, but with the community's help we could try to build that support.
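A minimal Spark SQL sketch of approach (1), assuming a Spark session configured with Delta's documented settings (spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension and spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog); the table name and location are placeholders:

-- Register the existing Delta path without spelling out any columns;
-- the schema is resolved from tableDir/_delta_log/ at query time.
CREATE TABLE IF NOT EXISTS Raw.MyDeltaTable
USING DELTA
LOCATION 'abfss://<container>@<account>.dfs.core.windows.net/myDeltaTable';

-- The columns reported here come from the delta log, not from the metastore entry.
DESCRIBE TABLE Raw.MyDeltaTable;

-- Schema evolution is a transactional metadata change committed to the log;
-- subsequent Spark SQL queries pick up the new column automatically.
ALTER TABLE Raw.MyDeltaTable ADD COLUMNS (Col11 STRING);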
@remoba Hopefully this clarifies why Delta + Spark running with the Hive metastore is fundamentally superior to Hive SQL + Hive metastore.
@zsxwing Please correct me if my understanding of the Hive connector situation is wrong.
@tdas thanks for the explanation.
I am currently using Spark to produce / consume the table, but I am provisioning the metastore schema through a hive shell. Is there an equivalent Hive command that I can run that would create the table in the same way Spark would create the table through the Delta catalog interaction?
I think it's reasonable to create an external table in the metastore with no explicit column definitions (didn't realize this was a possibility). Sounds like I can still get what I want if I additionally create a metastore view to expose the relevant columns.
Edit: Thinking on this again, will this work? If I create a metastore view that will explicitly select named columns from an external table created through delta, could I then query that view using Spark SQL even though the external table would not have any schema info attached?
If I create a metastore view that will explicitly select named columns from an external table created through delta, could I then query that view using Spark SQL even though the external table would not have any schema info attached?
If you are using Spark SQL, it should work. Delta implements the Spark SQL APIs to return the table schema from the transaction log; it doesn't read the table schema from the metastore.
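A hedged sketch of that flow in Spark SQL, assuming the Delta table was registered without an explicit column list as in the earlier sketch (all names are placeholders):

-- Expose only the wanted column through a metastore view.
CREATE VIEW IF NOT EXISTS Raw.MyDeltaView AS
SELECT MyColumn FROM Raw.MyDeltaTable;

-- Spark SQL resolves the underlying table's schema from the transaction log,
-- so the view keeps working after new columns are added to the Delta table.
SELECT MyColumn FROM Raw.MyDeltaView;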
This repo has been deprecated and the code has been moved under the connectors module in the https://github.com/delta-io/delta repository. Please create the issue in the https://github.com/delta-io/delta repository. See delta-io/connectors#556 for details.