
[SUPPORT] Hudi Spark datasource error after migrating from 0.8 to 0.11

Open kk17 opened this issue 2 years ago • 17 comments

Describe the problem you faced

After upgrading Hudi from 0.8 to 0.11, using spark.table(fullTableName) to read a Hudi table no longer works. The table has been synced to the Hive metastore and Spark is connected to the metastore. The error is:

org.sparkproject.guava.util.concurrent.UncheckedExecutionException: org.apache.hudi.exception.HoodieException: 'path' or 'Key: 'hoodie.datasource.read.paths' , default: null description: Comma separated list of file paths to read within a Hudi table. since version: version is not defined deprecated after: version is not defined)' or both must be specified.
at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2263)
at org.sparkproject.guava.cache.LocalCache.get(LocalCache.java:4000)
at org.sparkproject.guava.cache.LocalCache$LocalManualCache.get(LocalCache.java:4789)
at org.apache.spark.sql.catalyst.catalog.SessionCatalog.

...

Caused by: org.apache.hudi.exception.HoodieException: 'path' or 'Key: 'hoodie.datasource.read.paths' , default: null description: Comma separated list of file paths to read within a Hudi table. since version: version is not defined deprecated after: version is not defined)' or both must be specified.
	at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:78)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:353)
	at org.apache.spark.sql.execution.datasources.FindDataSourceTable.$anonfun$readDataSourceTable$1(DataSourceStrategy.scala:261)
	at org.sparkproject.guava.cache.LocalCache$LocalManualCache$1.load(LocalCache.java:4792)
	at org.sparkproject.guava.cache.LocalCache$LoadingValueReference.loadFuture(LocalCache.java:3599)
	at org.sparkproject.guava.cache.LocalCache$Segment.loadSync(LocalCache.java:2379)
	at org.sparkproject.guava.cache.LocalCache$Segment.lockedGetOrLoad(LocalCache.java:2342)
	at org.sparkproject.guava.cache.LocalCache$Segment.get(LocalCache.java:2257)
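
For context, a minimal sketch of the two read paths involved (not from the original report; the table name and base path are placeholders matching this issue). Reading through the metastore entry is what throws the exception above; reading directly by base path supplies 'path' explicitly, so it should not hit this check and is the usual workaround.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-read-repro")
  .enableHiveSupport()
  .getOrCreate()

// Reading through the Hive metastore entry -- this is the call that now throws the
// HoodieException above, because no 'path' reaches DefaultSource.createRelation.
val viaCatalog = spark.table("ods.track_signup")

// Reading directly by the table base path passes the path explicitly.
val viaPath = spark.read.format("hudi").load("s3://xxxx/track_signup")
viaPath.show(10, truncate = false)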

To Reproduce

Steps to reproduce the behavior:

  1. Using Hudi 0.8, create a Hudi table and sync it to the Hive metastore using hive jdbc sync mode (see the write sketch below these steps).
  2. Upgrade Hudi to 0.11.
  3. Add a new column to the table and sync it to the Hive metastore using hive jdbc sync mode.
  4. Read the table using spark.table.
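
For step 1, the write looks roughly like the following hedged sketch. The record key, precombine and partition fields are assumptions for illustration, and the JDBC URL and S3 path are placeholders; the real pipeline supplies its own DataFrame.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().enableHiveSupport().getOrCreate()
import spark.implicits._

// Dummy row only to keep the sketch self-contained.
val df = Seq((1, "u-001", 1655107146L, "2022-06-13")).toDF("id", "u", "time", "dt")

df.write.format("hudi")
  .option("hoodie.table.name", "track_signup")
  .option("hoodie.datasource.write.recordkey.field", "id")      // assumed key field
  .option("hoodie.datasource.write.precombine.field", "time")   // assumed precombine field
  .option("hoodie.datasource.write.partitionpath.field", "dt")
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.hive_sync.mode", "jdbc")           // 0.9+ key; 0.8 used hoodie.datasource.hive_sync.use_jdbc=true
  .option("hoodie.datasource.hive_sync.jdbc_url", "jdbc:hive2://hiveserver:10000")
  .option("hoodie.datasource.hive_sync.database", "ods")
  .option("hoodie.datasource.hive_sync.table", "track_signup")
  .option("hoodie.datasource.hive_sync.partition_fields", "dt")
  .mode("append")
  .save("s3://xxxx/track_signup")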

Expected behavior

Reading the table should work.

Environment Description

  • Hudi version : 0.11

  • Spark version : 3.1.2

  • Hive version : 3.1.2

  • Hadoop version : 3.1.2

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : no

Additional context

We are using hive jdbc sync mode to sync the Hudi table to the Hive metastore. Even before we upgraded Hudi to 0.11, we would get an error from the show create table command. After upgrading Hudi to 0.11, we added one new column to the table, and the read error started happening after we added that column. When I ran show create table in spark-sql after the error, the command succeeded, but the returned create table statement has no LOCATION. I also ran the same queries through Hive SQL; there both show create table and a select statement work fine.

After I dropped the Hive table and reran the Hive sync, it works fine.

Before rerunning hive sync:

spark-sql> show create table ods.track_signup;
CREATE TABLE `ods`.`track_signup` (
  `_hoodie_commit_time` STRING,
  `_hoodie_commit_seqno` STRING,
  `_hoodie_record_key` STRING,
  `_hoodie_partition_path` STRING,
  `_hoodie_file_name` STRING,
  `act` STRING,
  `time` BIGINT,
  `env` STRING,
  `id` STRING,
  `seer_time` STRING,
  `hh` STRING,
  `app_id` INT,
  `ip` STRING,
  `g` STRING,
  `u` STRING,
  `ga_id` STRING,
  `app_version` STRING,
  `platform` STRING,
  `url` STRING,
  `referer` STRING,
  `medium` STRING,
  `source` STRING,
  `campaign` STRING,
  `stage` STRING,
  `content` STRING,
  `term` STRING,
  `lang` STRING,
  `su` STRING,
  `campaign_track_id` STRING,
  `last_component_id` STRING,
  `regSourceId` STRING,
  `dt` STRING)
USING hudi
PARTITIONED BY (dt)
TBLPROPERTIES (
  'bucketing_version' = '2',
  'last_modified_time' = '1655107146',
  'last_modified_by' = 'hive',
  'last_commit_time_sync' = '20220613152622014')

After rerunning hive sync:

spark-sql> show create table ods.track_signup;
CREATE TABLE `ods`.`track_signup` (
  `_hoodie_commit_time` STRING,
  `_hoodie_commit_seqno` STRING,
  `_hoodie_record_key` STRING,
  `_hoodie_partition_path` STRING,
  `_hoodie_file_name` STRING,
  `act` STRING COMMENT 'xxx',
  `time` BIGINT COMMENT 'xxx',
  `env` STRING COMMENT 'xxx',
  `id` STRING COMMENT 'xxx',
  `seer_time` STRING COMMENT 'xxx',
  `hh` STRING,
  `app_id` INT COMMENT 'xxx',
  `ip` STRING COMMENT 'xxx',
  `g` STRING COMMENT 'xxx',
  `u` STRING COMMENT 'xxx',
  `ga_id` STRING COMMENT 'xxx',
  `app_version` STRING COMMENT 'xxx',
  `platform` STRING COMMENT 'xxx',
  `url` STRING COMMENT 'xxx',
  `referer` STRING COMMENT 'xxx',
  `medium` STRING COMMENT 'xxx',
  `source` STRING COMMENT 'xxx',
  `campaign` STRING COMMENT 'xxx',
  `stage` STRING COMMENT 'xxx',
  `content` STRING COMMENT 'xxx',
  `term` STRING COMMENT 'xxx',
  `lang` STRING COMMENT 'xxx',
  `su` STRING COMMENT 'xxx',
  `campaign_track_id` STRING COMMENT 'xxx',
  `last_component_id` STRING COMMENT 'xxx',
  `regSourceId` STRING,
  `dt` STRING)
USING hudi
OPTIONS (
  `hoodie.query.as.ro.table` 'false')
PARTITIONED BY (dt)
LOCATION 's3://xxxx/track_signup'
TBLPROPERTIES (
  'bucketing_version' = '2',
  'last_modified_time' = '1655134599',
  'last_modified_by' = 'hive',
  'last_commit_time_sync' = '20220613153932664')

kk17 avatar Jun 14 '22 06:06 kk17

Steps to reproduce the behavior:

1. using hudi 0.8 to create a hudi table and sync to hive metastore using hive jdbc sync mode
2. update hudi to 0.11
3. add a new column to the table and sync to hive metastore using hive jdbc sync mode
4. read the table using spark.table

@kk17 After step 2, when you run show create table, does it show the LOCATION? Also, could you try HMS sync mode? https://hudi.apache.org/docs/syncing_metastore#sync-modes
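
If it helps, switching to HMS mode on the write path is roughly the following sketch (not verified against your setup; the metastore URI is a placeholder, and df, database/table and base path are the placeholders from the issue above).

// Same write as before, but syncing through the metastore thrift endpoint instead of HiveServer2 JDBC.
df.write.format("hudi")
  .option("hoodie.table.name", "track_signup")
  .option("hoodie.datasource.hive_sync.enable", "true")
  .option("hoodie.datasource.hive_sync.mode", "hms")
  .option("hoodie.datasource.hive_sync.metastore.uris", "thrift://metastore-host:9083")  // placeholder URI
  .option("hoodie.datasource.hive_sync.database", "ods")
  .option("hoodie.datasource.hive_sync.table", "track_signup")
  .option("hoodie.datasource.hive_sync.partition_fields", "dt")
  .mode("append")
  .save("s3://xxxx/track_signup")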

codope avatar Jun 14 '22 14:06 codope

This is the spark-sql output in step 2:

spark-sql>  show create table ods.track_signup;Error in query: Failed to execute SHOW CREATE TABLE against table `ods`.`track_signup `, which is created by Hive and uses the following unsupported serde configuration
 SERDE: org.apache.hadoop.hive.ql.io.parquet.serde.ParquetHiveSerDe INPUTFORMAT: org.apache.hudi.hadoop.HoodieParquetInputFormat OUTPUTFORMAT: org.apache.hadoop.hive.ql.io.parquet.MapredParquetOutputFormat

Hive SQL is ok.

kk17 avatar Jun 15 '22 04:06 kk17

Another problem is that in Spark SQL the table is a system table, but in Hive it is an external table.

kk17 avatar Jun 15 '22 04:06 kk17

@leesf @XuQianJin-Stars Can you please look into this issue? The gist is that spark-sql does not recognise the table as an external table and hence the path is not passed to DefaultSource.
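
A quick way to confirm that (hypothetical diagnostic, run from a Spark shell against the table in this report) is to compare what Spark's catalog reports for the table's provider, type and location with what Hive shows.

// Inspect the catalog entry Spark sees for the affected table.
spark.sql("DESCRIBE TABLE EXTENDED ods.track_signup").show(200, truncate = false)
spark.sql("SHOW TBLPROPERTIES ods.track_signup").show(50, truncate = false)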

codope avatar Jun 15 '22 10:06 codope

I also get the same error when using Spark SQL to create a Hudi table.

create table if not exists hudi_table5(
  id int, 
  name string, 
  price double
) using hudi
options (
  type = 'cow'
);

spark SQL error:

22/06/20 10:08:06 ERROR SparkSQLDriver: Failed in [create table if not exists hudi_table5(
  id int, 
  name string, 
  price double
) using hudi
options (
  type = 'cow'
)]
org.apache.hudi.exception.HoodieException: 'path' or 'Key: 'hoodie.datasource.read.paths' , default: null description: Comma separated list of file paths to read within a Hudi table. since version: version is not defined deprecated after: version is not defined)' or both must be specified.
        at org.apache.hudi.DefaultSource.createRelation(DefaultSource.scala:78)
        at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:353)
        at org.apache.spark.sql.execution.command.CreateDataSourceTableCommand.run(createDataSourceTables.scala:78)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
        at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
        at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
        at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
        at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
        at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
        at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
        at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
        at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
        at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
        at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
        at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
        at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:381)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:500)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:494)
        at scala.collection.Iterator.foreach(Iterator.scala:941)
        at scala.collection.Iterator.foreach$(Iterator.scala:941)
        at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
        at scala.collection.IterableLike.foreach(IterableLike.scala:74)
        at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
        at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:494)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:284)
        at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
        at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
        at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
        at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
        at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
        at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
        at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
        at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

kk17 avatar Jun 20 '22 10:06 kk17

I found out the issue is caused by the table's SerDeInfo being missing after the table is changed to a Spark data source table. I have created a pull request.

kk17 avatar Jun 21 '22 07:06 kk17

oh nice. I think I am also affected.

WangCHX avatar Jun 22 '22 00:06 WangCHX

@WangCHX @kk17 Have you added

#spark2.4/spark3.1
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension'

#spark3.2
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog'

when creating Hudi tables using Spark SQL?
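
For a programmatic SparkSession (e.g. spark-shell or a job), the equivalent setup would roughly be the sketch below; the Kryo serializer line is the usual Hudi recommendation rather than something from this thread.

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder()
  .appName("hudi-sql")
  .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
  .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
  // On Spark 3.2 also add:
  // .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.hudi.catalog.HoodieCatalog")
  .enableHiveSupport()
  .getOrCreate()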

boneanxs avatar Jun 22 '22 02:06 boneanxs

@boneanxs No. After I added it, I got another error:

22/06/22 03:02:41 ERROR SparkSQLDriver: Failed in [create table if not exists hudi_table2(
  id int,
  name string,
  price double
) using hudi
options (
  type = 'cow'
)]
java.lang.IllegalArgumentException: Can't find primaryKey `uuid` in root
 |-- _hoodie_commit_time: string (nullable = true)
 |-- _hoodie_commit_seqno: string (nullable = true)
 |-- _hoodie_record_key: string (nullable = true)
 |-- _hoodie_partition_path: string (nullable = true)
 |-- _hoodie_file_name: string (nullable = true)
 |-- id: integer (nullable = true)
 |-- name: string (nullable = true)
 |-- price: double (nullable = true)
.
	at org.apache.hudi.common.util.ValidationUtils.checkArgument(ValidationUtils.java:40)
	at org.apache.spark.sql.hudi.HoodieOptionConfig$.$anonfun$validateTable$3(HoodieOptionConfig.scala:202)
	at org.apache.spark.sql.hudi.HoodieOptionConfig$.$anonfun$validateTable$3$adapted(HoodieOptionConfig.scala:200)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
	at org.apache.spark.sql.hudi.HoodieOptionConfig$.validateTable(HoodieOptionConfig.scala:200)
	at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.parseSchemaAndConfigs(HoodieCatalogTable.scala:244)
	at org.apache.spark.sql.catalyst.catalog.HoodieCatalogTable.initHoodieTable(HoodieCatalogTable.scala:174)
	at org.apache.spark.sql.hudi.command.CreateHoodieTableCommand.run(CreateHoodieTableCommand.scala:65)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult$lzycompute(commands.scala:70)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.sideEffectResult(commands.scala:68)
	at org.apache.spark.sql.execution.command.ExecutedCommandExec.executeCollect(commands.scala:79)
	at org.apache.spark.sql.Dataset.$anonfun$logicalPlan$1(Dataset.scala:228)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:3687)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$5(SQLExecution.scala:103)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:163)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:90)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:3685)
	at org.apache.spark.sql.Dataset.<init>(Dataset.scala:228)
	at org.apache.spark.sql.Dataset$.$anonfun$ofRows$2(Dataset.scala:99)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:96)
	at org.apache.spark.sql.SparkSession.$anonfun$sql$1(SparkSession.scala:618)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:775)
	at org.apache.spark.sql.SparkSession.sql(SparkSession.scala:613)
	at org.apache.spark.sql.SQLContext.sql(SQLContext.scala:650)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:67)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:381)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:500)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:494)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:494)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:284)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:951)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:180)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:203)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1039)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1048)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)

kk17 avatar Jun 22 '22 03:06 kk17

I did try adding that, but I get the same issue.

WangCHX avatar Jun 22 '22 03:06 WangCHX

A primaryKey must be specified when creating a Hudi table; by default it is uuid. This error means the table has no column named uuid. You can change the SQL to:

create table if not exists hudi_table2(
  id int, 
  name string, 
  price double
) using hudi
options (
  primaryKey = 'id',
  type = 'cow'
);

boneanxs avatar Jun 22 '22 03:06 boneanxs

OK, it works now. This document https://hudi.apache.org/docs/table_management/ needs to be updated.

kk17 avatar Jun 22 '22 04:06 kk17

@boneanxs Will --conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' fix the issue when inserting into an existing table? Creating a new table may be hard for me.

WangCHX avatar Jun 22 '22 04:06 WangCHX

We are using DeltaStreamer to create the table; do we also need to add that configuration?

WangCHX avatar Jun 22 '22 06:06 WangCHX

@WangCHX If you use DeltaStreamer to ingest data into Hudi with hive sync enabled, you don't need to set this configuration. Can you file a new issue with details on how to reproduce it so we can track it?

boneanxs avatar Jun 22 '22 09:06 boneanxs

Is there any follow-up here, or is anything waiting on assistance?

nsivabalan avatar Aug 10 '22 22:08 nsivabalan

The pull request is waiting to be merged. I need some time and guidance to write a proper unit test.

kk17 avatar Aug 11 '22 01:08 kk17

@XuQianJin-Stars @minihippo : Can either of you folks assist @kk17, please?

nsivabalan avatar Aug 12 '22 02:08 nsivabalan

@kk17 Can you provide the PR? We can help you accelerate getting it landed.

minihippo avatar Aug 18 '22 16:08 minihippo

@kk17 Can you provide the PR? We can help you accelerate getting it landed.

here is the pr: https://github.com/apache/hudi/pull/5920

kk17 avatar Aug 19 '22 01:08 kk17

Closing the GitHub issue as we have a fix. Thanks, folks!

nsivabalan avatar Aug 28 '22 02:08 nsivabalan