
Schema Evolution Not Working For Hudi 0.12.3

Amar1404 opened this issue on Dec 12 '23 · 13 comments


Describe the problem you faced

I have a column that was previously a LONG and was changed to a DOUBLE. Schema evolution is not working. The data is written, but while reading the table the old parquet data file cannot be read, throwing an error that the parquet file cannot be read because the column expects DOUBLE but the file contains INT64.

To Reproduce

Steps to reproduce the behavior:

  1. Write data to a Hudi table with a column of type LONG.

  2. Change the column's type to DOUBLE and write new data.

  3. Read the table; reading the older parquet file fails because the reader expects DOUBLE but the file contains INT64.


Environment Description

  • Hudi version : 0.12.3

  • Spark version : 3.3

  • Hive version : 3

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : EMR


Amar1404 commented on Dec 12 '23

@Amar1404 Can you give more details, like the table/writer configurations you are using? I tried a simple scenario and schema evolution from long to double works fine.

# Assumes an existing `spark` session (e.g. a pyspark shell).
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType

TABLE_NAME = "schema_evolution_test"  # placeholder table name
PATH = "/tmp/schema_evolution_test"   # placeholder base path

# First write uses a LONG value column, second write evolves it to DOUBLE.
schema1 = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("value", LongType(), True)
    ]
)

schema2 = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("value", DoubleType(), True)
    ]
)

data1 = [
    Row(1, 10000000000),
    Row(2, 10000000000),
    Row(3, 10000000000),
]

data2 = [
    Row(1, 100.1),
    Row(2, 200.2),
    Row(3, 10000000000.0),
]

hudi_configs = {
    "hoodie.table.name": TABLE_NAME,
    "hoodie.datasource.write.precombine.field": "value",
    "hoodie.datasource.write.recordkey.field": "id"
}

# Write the LONG data, then append the DOUBLE data and read the table back after each write.
df = spark.createDataFrame(spark.sparkContext.parallelize(data1), schema1)
df.write.format("org.apache.hudi").options(**hudi_configs).mode("append").save(PATH)
spark.read.format("org.apache.hudi").load(PATH).printSchema()
spark.read.format("org.apache.hudi").load(PATH).show()
df = spark.createDataFrame(spark.sparkContext.parallelize(data2), schema2)
df.write.format("org.apache.hudi").options(**hudi_configs).mode("append").save(PATH)
spark.read.format("org.apache.hudi").load(PATH).printSchema()
spark.read.format("org.apache.hudi").load(PATH).show()
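If evolution works as intended, the second read should report the evolved column as double. A small sanity check that could be appended to the snippet above (just a sketch, reusing the same spark session and PATH):

# Sketch: verify the table schema now reports the evolved DOUBLE type.
from pyspark.sql.types import DoubleType

evolved = spark.read.format("org.apache.hudi").load(PATH)
assert isinstance(evolved.schema["value"].dataType, DoubleType)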

ad1happy2go avatar Dec 12 '23 06:12 ad1happy2go

Hi @ad1happy2go - Please find the configurations below:

  "hoodie.schema.on.read.enable": "true"
  "hoodie.cleaner.commits.retained": "3",
  "hoodie.datasource.write.reconcile.schema": "true",
  "hoodie.parquet.compression.codec": "zstd",
  "hoodie.delete.shuffle.parallelism": "200",
  "hoodie.parquet.max.file.size": "268435456",
  "hoodie.upsert.shuffle.parallelism": "200",
  "hoodie.datasource.hive_sync.support_timestamp": "true",
  "hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.CustomKeyGenerator",
  "hoodie.datasource.write.hive_style_partitioning": "true",
  "hoodie.insert.shuffle.parallelism": "200",
  "hoodie.parquet.small.file.limit": "134217728",
  "hoodie.bootstrap.parallelism": "200",
  "hoodie.embed.timeline.server": "true",
  "hoodie.bulkinsert.shuffle.parallelism": "200",
  "hoodie.datasource.hive_sync.enable": "true",
  "hoodie.filesystem.view.type": "EMBEDDED_KV_STORE",
  "hoodie.clean.max.commits": "4"
  hoodie.metadata.enable: true
  spark.hadoop.fs.s3.canned.acl: BucketOwnerFullControl
  hoodie.datasource.hive_sync.support_timestamp=true
  
  
  
This happens when the old file is in a different partition or in a different parquet file; in your case, since the data is small, everything ends up in a single parquet file. Also, I am using Hudi DeltaStreamer for ingesting the data. A sketch of that multi-partition case follows below.
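A minimal sketch of that multi-partition scenario, extending the earlier datasource repro with a hypothetical partition column "part" (the table name and path are placeholders, and this goes through the Spark datasource rather than DeltaStreamer):

from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType, StringType

schema_long = StructType([
    StructField("id", IntegerType(), True),
    StructField("part", StringType(), True),
    StructField("value", LongType(), True),
])
schema_double = StructType([
    StructField("id", IntegerType(), True),
    StructField("part", StringType(), True),
    StructField("value", DoubleType(), True),
])

configs = {
    "hoodie.table.name": "schema_evolution_partitioned",  # placeholder name
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.precombine.field": "value",
    "hoodie.datasource.write.partitionpath.field": "part",
    "hoodie.datasource.write.hive_style_partitioning": "true",
    "hoodie.schema.on.read.enable": "true",
}
path = "/tmp/schema_evolution_partitioned"  # placeholder path

# Old LONG data lands in partition part=a and is never rewritten afterwards.
spark.createDataFrame([Row(1, "a", 10000000000)], schema_long) \
    .write.format("org.apache.hudi").options(**configs).mode("append").save(path)

# Evolved DOUBLE data only touches partition part=b, so the old parquet
# file under part=a still stores the column as INT64.
spark.createDataFrame([Row(2, "b", 100.5)], schema_double) \
    .write.format("org.apache.hudi").options(**configs).mode("append").save(path)

# Reading across both partitions is where 0.12.3 reports the DOUBLE-vs-INT64 error.
spark.read.format("org.apache.hudi").load(path).printSchema()
spark.read.format("org.apache.hudi").load(path).show()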

Amar1404 commented on Dec 18 '23

Thanks for the details. I will check and try to triage it.

ad1happy2go commented on Dec 18 '23

@ad1happy2go - Were you able to recreate the issue? If you want, we can have a meeting to recreate it together.

Amar1404 commented on Dec 26 '23

@Amar1404 Sorry for the delay here, I was OOO. Can you ping me on Slack so we can work on this together?

ad1happy2go commented on Dec 29 '23

I had a similar question: when the table schema is double and the incoming data schema is long, why can the data not be upserted into the table? Per the type-promotion table in the Hudi docs (image), I think double can handle long.

Below is my code (Hudi 0.14.0, Spark 3.4.1):

from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType, DecimalType, LongType, BooleanType, DoubleType, IntegerType
from pyspark.sql import Row
schema1 = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("value", DoubleType(), True)
    ]
)

schema2 = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("value", LongType(), True)
    ]
)

data1 = [
    Row(1, 10000000000.0),
    Row(2, 10000000000.0),
    Row(3, 10000000000.0),
]

data2 = [
    Row(1, 100),
    Row(2, 200),
    Row(3, 10000000000),
]


hudi_configs = {
    "hoodie.table.name": 'table',
    "hoodie.datasource.write.precombine.field":"value",
    "hoodie.datasource.write.recordkey.field":"id",
    'hoodie.datasource.write.reconcile.schema': 'true',
    'hoodie.schema.on.read.enable': 'true',
}

PATH = 'some/path'

df = spark.createDataFrame(spark.sparkContext.parallelize(data1), schema1)
df.write.format("org.apache.hudi").options(**hudi_configs).mode("append").save(PATH)
spark.read.format("org.apache.hudi").load(PATH).printSchema()
spark.read.format("org.apache.hudi").load(PATH).show()
df = spark.createDataFrame(spark.sparkContext.parallelize(data2), schema2)
df.write.format("org.apache.hudi").options(**hudi_configs).mode("append").save(PATH)
spark.read.format("org.apache.hudi").load(PATH).printSchema()
spark.read.format("org.apache.hudi").load(PATH).show()

The stack trace is: IllegalArgumentException: cannot update origin type: double to a incompatibility type: long

@ad1happy2go could you take a look at this, thanks!

lei-su-awx commented on Jan 31 '24

@lei-su-awx I tried this code with 0.14.1 and it worked fine. With 0.14.0 I can see the error.

@lei-su-awx @Amar1404 Can you both try with 0.14.1 and let me know if this issue persists?
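For reference, a minimal way to pick up the 0.14.1 bundle in a PySpark session (a sketch assuming Spark 3.4 and Scala 2.12; use the hudi-spark3.3 bundle for Spark 3.3):

from pyspark.sql import SparkSession

# Sketch: start a session with the Hudi 0.14.1 Spark bundle and the usual Hudi settings.
spark = (
    SparkSession.builder
    .config("spark.jars.packages", "org.apache.hudi:hudi-spark3.4-bundle_2.12:0.14.1")
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .config("spark.sql.extensions", "org.apache.spark.sql.hudi.HoodieSparkSessionExtension")
    .getOrCreate()
)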

ad1happy2go commented on Jan 31 '24

Hi @ad1happy2go, 0.14.1 worked fine. Thanks!

lei-su-awx commented on Feb 02 '24

Hi @ad1happy2go - In my case the table column was LONG and was changed to DOUBLE.

Amar1404 commented on Feb 05 '24