
[SUPPORT] Can't redefine array

Open · Ytimetravel opened this issue on Aug 21, 2024 · 4 comments

Dear community, I found an error when using Hudi. If a table uses the array<struct<...>> type, then every time I add columns and write to the table afterwards, it throws an exception. The error message is in the stack trace below. Has anyone encountered this issue? How should it be resolved?

Steps to reproduce the behavior:

1. Create the table:

        CREATE TABLE IF NOT EXISTS db.table (
          p_id bigint,
          record_items array<struct<name:string,value:int>>,
          edit_items array<struct<name:string,value:int>>,
          texts array<struct<text_content:string,text_style:string>>,
          p_date string
        ) USING hudi
        PARTITIONED BY (p_date)
        OPTIONS (
          type = 'mor',
          primaryKey = 'p_id',
          payloadClass = 'org.apache.hudi.common.model.PartialUpdateAvroPayload',
          'hoodie.bucket.index.num.buckets' = '4'
        );

2. Insert a row:

        INSERT INTO TABLE db.table
        SELECT
          130928195078 AS p_id,
          array(named_struct("name", "John", "value", 10),
                named_struct("name", "wangwu", "value", 5),
                named_struct("name", "Bob", "value", 7)) AS record_items,
          array(named_struct("name", "zhangsan", "value", 10),
                named_struct("name", "Jane", "value", 5),
                named_struct("name", "lisi", "value", 7)) AS edit_items,
          null AS texts,
          '20240810' AS p_date;

3. Add a column:

        ALTER TABLE db.table ADD COLUMNS (is_new_col string);

4. Insert again, now including the new column:

        INSERT INTO TABLE db.table
        SELECT
          130928195078 AS p_id,
          array(named_struct("name", "John", "value", 10),
                named_struct("name", "wangwu", "value", 5),
                named_struct("name", "Bob", "value", 7)) AS record_items,
          array(named_struct("name", "zhangsan", "value", 10),
                named_struct("name", "Jane", "value", 5),
                named_struct("name", "lisi", "value", 7)) AS edit_items,
          null AS texts,
          'newcol' AS is_new_col,
          '20240810' AS p_date;

Environment Description

  • Hudi version: 0.14.0

  • Spark version: 2.4

  • Hadoop version: 2.6

  • Storage (HDFS/S3/GCS..): HDFS

Stack trace:

    org.apache.spark.SparkException: org.apache.hudi.exception.HoodieUpsertException: Error upserting bucketType UPDATE for partition :0
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:394)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:394)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleInsertPartition(BaseSparkCommitActionExecutor.java:400)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.lambda$mapPartitionsAsRDD$a3ab3c4$2(BaseSparkCommitActionExecutor.java:310)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at org.apache.spark.api.java.JavaRDDLike$$anonfun$mapPartitionsWithIndex$1.apply(JavaRDDLike.scala:102)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:873)
        at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsWithIndex$1$$anonfun$apply$25.apply(RDD.scala:873)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:344)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:344)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:357)
        at org.apache.spark.rdd.RDD$$anonfun$7.apply(RDD.scala:355)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1184)
        at org.apache.spark.storage.BlockManager$$anonfun$doPutIterator$1.apply(BlockManager.scala:1158)
        at org.apache.spark.storage.BlockManager.doPut(BlockManager.scala:1093)
        at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1158)
        at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:884)
        at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:355)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:306)
        at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
        at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:344)
        at org.apache.spark.rdd.RDD.iterator(RDD.scala:308)
        at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:95)
        at org.apache.spark.scheduler.Task.run(Task.scala:124)
        at org.apache.spark.executor.Executor$TaskRunner$$anonfun$11.apply(Executor.scala:495)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1388)
        at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:501)
        at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
        at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
        at java.base/java.lang.Thread.run(Thread.java:959)
    Caused by: org.apache.hudi.exception.HoodieException: org.apache.avro.SchemaParseException: Can't redefine: array
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:149)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:439)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:421)
        at org.apache.hudi.table.action.deltacommit.BaseSparkDeltaCommitActionExecutor.handleUpdate(BaseSparkDeltaCommitActionExecutor.java:80)
        at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:387)
        ... 31 more
    Caused by: org.apache.avro.SchemaParseException: Can't redefine: array
        at org.apache.avro.Schema$Names.put(Schema.java:1128)
        at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:562)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:690)
        at org.apache.avro.Schema$ArraySchema.toJson(Schema.java:805)
        at org.apache.avro.Schema$UnionSchema.toJson(Schema.java:882)
        at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:716)
        at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:701)
        at org.apache.avro.Schema.toString(Schema.java:324)
        at org.apache.avro.Schema.toString(Schema.java:314)
        at org.apache.parquet.avro.AvroReadSupport.setAvroReadSchema(AvroReadSupport.java:69)
        at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIteratorInternal(HoodieAvroParquetReader.java:162)
        at org.apache.hudi.io.storage.HoodieAvroParquetReader.getIndexedRecordIterator(HoodieAvroParquetReader.java:94)
        at org.apache.hudi.io.storage.HoodieAvroParquetReader.getRecordIterator(HoodieAvroParquetReader.java:73)
        at org.apache.hudi.table.action.commit.HoodieMergeHelper.runMerge(HoodieMergeHelper.java:126)
        ... 35 more

Ytimetravel avatar Aug 21 '24 07:08 Ytimetravel

From the source code it looks like there are multiple fields named "array" in the schema. Can you share with us the table schema after the alter table operation? Here is the logic I found in Avro's Schema class:

    // From org.apache.avro.Schema.Names, the registry of named types seen so far:
    public Schema put(Name name, Schema schema) {
      if (containsKey(name))
        throw new SchemaParseException("Can't redefine: " + name);
      return super.put(name, schema);
    }
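For illustration, here is a minimal standalone sketch (the class name is made up; it assumes only Avro on the classpath) of how that exception can arise: two array columns whose element records carry the same record name "array" but different fields, which is what older parquet-avro converters generate for list element types:

    import org.apache.avro.Schema;
    import org.apache.avro.SchemaBuilder;

    public class RedefineArrayDemo {
      public static void main(String[] args) {
        // Two element records that share the name "array" but have different
        // fields, mimicking what older parquet-avro converters produce for
        // every array<struct<...>> column's element type.
        Schema items = SchemaBuilder.record("array").fields()
            .requiredString("name").requiredInt("value").endRecord();
        Schema texts = SchemaBuilder.record("array").fields()
            .requiredString("text_content").requiredString("text_style").endRecord();

        Schema table = SchemaBuilder.record("table").fields()
            .name("record_items").type().array().items(items).noDefault()
            .name("texts").type().array().items(texts).noDefault()
            .endRecord();

        // Serializing the schema tries to register the name "array" twice and
        // throws: org.apache.avro.SchemaParseException: Can't redefine: array
        System.out.println(table.toString());
      }
    }

Building the schema succeeds; the failure happens when the schema is serialized back to JSON, which is exactly the Schema.toString() call in your stack trace.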

danny0405 avatar Aug 21 '24 09:08 danny0405

@Ytimetravel Is it possible to upgrade your Spark version to 3.2? This issue may be related to https://issues.apache.org/jira/browse/PARQUET-1441
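If it helps to confirm the diagnosis, here is a hypothetical diagnostic sketch (the class name is made up; it assumes hadoop-client, parquet-hadoop, parquet-avro, and avro on the classpath) that converts the footer schema of one of the table's base Parquet files to Avro the same way the reader does. On parquet-avro versions affected by PARQUET-1441, printing the converted schema fails with the same "Can't redefine: array" message:

    import org.apache.avro.Schema;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.parquet.avro.AvroSchemaConverter;
    import org.apache.parquet.hadoop.ParquetFileReader;
    import org.apache.parquet.hadoop.util.HadoopInputFile;
    import org.apache.parquet.schema.MessageType;

    public class InspectParquetAvroSchema {
      public static void main(String[] args) throws Exception {
        Configuration conf = new Configuration();
        // args[0]: path to one of the Hudi table's base .parquet files,
        // e.g. a file under the '20240810' partition.
        Path file = new Path(args[0]);

        try (ParquetFileReader reader =
                 ParquetFileReader.open(HadoopInputFile.fromPath(file, conf))) {
          MessageType parquetSchema =
              reader.getFooter().getFileMetaData().getSchema();
          Schema avroSchema = new AvroSchemaConverter(conf).convert(parquetSchema);
          // On affected parquet-avro versions, every list element record is
          // named "array", so serializing a schema with two or more
          // array<struct<...>> columns throws SchemaParseException here.
          System.out.println(avroSchema.toString(true));
        }
      }
    }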

ad1happy2go avatar Aug 21 '24 10:08 ad1happy2go

@danny0405 Sorry, I'm not sure if I understand correctly. The attached image shows the schema metadata after the alter column operation. [image: table schema metadata after ALTER TABLE]

Ytimetravel avatar Aug 21 '24 11:08 Ytimetravel

@ad1happy2go We're currently still using Spark 2 internally. Thank you very much for the link; I'll check whether it's relevant.

Ytimetravel avatar Aug 21 '24 11:08 Ytimetravel

@Ytimetravel Any update on this? Were you able to get past the issue?

ad1happy2go avatar Oct 23 '24 08:10 ad1happy2go

Hi @Ytimetravel

Any update on this issue?

rangareddy avatar Feb 12 '25 06:02 rangareddy