Schema Evolution Not Working for Hudi 0.12.3
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
I have a column that was earlier a LONG and has been changed to DOUBLE. Schema evolution is not working: the data is written, but while reading the table the old parquet data file cannot be read, and an error is thrown saying the column expects double but got INT64.
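For context, a LONG column is stored as INT64 in the underlying parquet files, so once the table schema says DOUBLE the reader has to resolve INT64-backed files against a double type. A minimal sketch of that mismatch outside Hudi (the path and column name below are placeholders, not from this report) can surface a similar parquet-level conversion error:

from pyspark.sql.types import StructType, StructField, DoubleType

# Hypothetical: read an INT64-backed parquet file with a schema that declares the
# column as double; Spark's parquet reader may reject this conversion outright.
double_schema = StructType([StructField("value", DoubleType(), True)])
spark.read.schema(double_schema).parquet("s3://my-bucket/old-partition/").show()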
To Reproduce
Steps to reproduce the behavior:
- I have a column
Expected behavior
A clear and concise description of what you expected to happen.
Environment Description
- Hudi version : 0.12.3
- Spark version : 3.3
- Hive version : 3
- Hadoop version :
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : EMR
Additional context
Add any other context about the problem here.
Stacktrace
Add the stacktrace of the error.
@Amar1404 Can you give more details, like the table/writer configurations you are using? I tried a simple scenario and schema evolution from long to double works fine.
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, IntegerType, LongType, DoubleType

schema1 = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("value", LongType(), True)
    ]
)
schema2 = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("value", DoubleType(), True)
    ]
)
data1 = [
    Row(1, 10000000000),
    Row(2, 10000000000),
    Row(3, 10000000000),
]
data2 = [
    Row(1, 100.1),
    Row(2, 200.2),
    Row(3, 10000000000.0),
]
hudi_configs = {
    "hoodie.table.name": TABLE_NAME,
    "hoodie.datasource.write.precombine.field": "value",
    "hoodie.datasource.write.recordkey.field": "id"
}
df = spark.createDataFrame(spark.sparkContext.parallelize(data1), schema1)
df.write.format("org.apache.hudi").options(**hudi_configs).mode("append").save(PATH)
spark.read.format("org.apache.hudi").load(PATH).printSchema()
spark.read.format("org.apache.hudi").load(PATH).show()
df = spark.createDataFrame(spark.sparkContext.parallelize(data2), schema2)
df.write.format("org.apache.hudi").options(**hudi_configs).mode("append").save(PATH)
spark.read.format("org.apache.hudi").load(PATH).printSchema()
spark.read.format("org.apache.hudi").load(PATH).show()
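For what it's worth, a small check on top of the snippet above (this assertion is an addition, not part of the original repro) confirms that the evolved column type is what the reader reports after the second write:

from pyspark.sql.types import DoubleType

# After appending the double-typed batch, the table schema should expose "value" as double.
value_field = spark.read.format("org.apache.hudi").load(PATH).schema["value"]
assert isinstance(value_field.dataType, DoubleType), value_field.dataType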
Hi @ad1happy2go - Please find below the configurations
"hoodie.schema.on.read.enable": "true"
"hoodie.cleaner.commits.retained": "3",
"hoodie.datasource.write.reconcile.schema": "true",
"hoodie.parquet.compression.codec": "zstd",
"hoodie.delete.shuffle.parallelism": "200",
"hoodie.parquet.max.file.size": "268435456",
"hoodie.upsert.shuffle.parallelism": "200",
"hoodie.datasource.hive_sync.support_timestamp": "true",
"hoodie.datasource.write.keygenerator.class": "org.apache.hudi.keygen.CustomKeyGenerator",
"hoodie.datasource.write.hive_style_partitioning": "true",
"hoodie.insert.shuffle.parallelism": "200",
"hoodie.parquet.small.file.limit": "134217728",
"hoodie.bootstrap.parallelism": "200",
"hoodie.embed.timeline.server": "true",
"hoodie.bulkinsert.shuffle.parallelism": "200",
"hoodie.datasource.hive_sync.enable": "true",
"hoodie.filesystem.view.type": "EMBEDDED_KV_STORE",
"hoodie.clean.max.commits": "4"
hoodie.metadata.enable: true
spark.hadoop.fs.s3.canned.acl: BucketOwnerFullControl
hoodie.datasource.hive_sync.support_timestamp=true
This happens when the old data is in another partition, or more generally in a different parquet file; in your case, since the data is small, there is only one parquet file. Also, I am using Hudi DeltaStreamer for ingesting the data.
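To illustrate that multi-file case, here is a sketch adapting the repro above (the dt partition column and its values are assumptions added for illustration) so the long-typed and double-typed batches land in separate parquet files:

from pyspark.sql.functions import lit

# Route each batch to its own partition so the old INT64-backed parquet file
# stays around next to the new double-typed file.
partitioned_configs = {**hudi_configs, "hoodie.datasource.write.partitionpath.field": "dt"}

df1 = spark.createDataFrame(spark.sparkContext.parallelize(data1), schema1).withColumn("dt", lit("2023-01-01"))
df1.write.format("org.apache.hudi").options(**partitioned_configs).mode("append").save(PATH)

df2 = spark.createDataFrame(spark.sparkContext.parallelize(data2), schema2).withColumn("dt", lit("2023-01-02"))
df2.write.format("org.apache.hudi").options(**partitioned_configs).mode("append").save(PATH)

# Reading across both partitions now resolves the INT64-backed file against the
# evolved double schema, which is the scenario where the reported error is said to occur.
spark.read.format("org.apache.hudi").load(PATH).show()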
Thanks for the details. I will check and try to triage it.
@ad1happy2go - Were you able to recreate the issue? If you want, we can have a meeting to recreate the issue.
@Amar1404 Sorry for the delay here, I was OOO. Can you ping me on Slack so we can work on this together?
I had a similar question: when the table schema is double and the incoming data schema is long, why can the data not be upserted into the table? From the Hudi docs, I understand that double can handle long.
Below is my code (Hudi 0.14.0, Spark 3.4.1):
from pyspark.sql.types import StructType, StructField, StringType, ArrayType, MapType, DecimalType, LongType, BooleanType, DoubleType, IntegerType
from pyspark.sql import Row
schema1 = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("value", DoubleType(), True)
    ]
)
schema2 = StructType(
    [
        StructField("id", IntegerType(), True),
        StructField("value", LongType(), True)
    ]
)
data1 = [
    Row(1, 10000000000.0),
    Row(2, 10000000000.0),
    Row(3, 10000000000.0),
]
data2 = [
    Row(1, 100),
    Row(2, 200),
    Row(3, 10000000000),
]
hudi_configs = {
    "hoodie.table.name": "table",
    "hoodie.datasource.write.precombine.field": "value",
    "hoodie.datasource.write.recordkey.field": "id",
    "hoodie.datasource.write.reconcile.schema": "true",
    "hoodie.schema.on.read.enable": "true",
}
PATH = 'some/path'
df = spark.createDataFrame(spark.sparkContext.parallelize(data1), schema1)
df.write.format("org.apache.hudi").options(**hudi_configs).mode("append").save(PATH)
spark.read.format("org.apache.hudi").load(PATH).printSchema()
spark.read.format("org.apache.hudi").load(PATH).show()
df = spark.createDataFrame(spark.sparkContext.parallelize(data2), schema2)
df.write.format("org.apache.hudi").options(**hudi_configs).mode("append").save(PATH)
spark.read.format("org.apache.hudi").load(PATH).printSchema()
spark.read.format("org.apache.hudi").load(PATH).show()
The stack trace is:
IllegalArgumentException: cannot update origin type: double to a incompatibility type: long
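For reference, long to double is a widening promotion while double to long is a narrowing one, which is why this direction is rejected. One possible workaround (a sketch, assuming the incoming long-typed frame is the df built from data2 above) is to cast the column to the table's double type before writing:

from pyspark.sql.functions import col

# Cast the incoming long column to double so it matches the existing table schema.
df_cast = df.withColumn("value", col("value").cast("double"))
df_cast.write.format("org.apache.hudi").options(**hudi_configs).mode("append").save(PATH)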
@ad1happy2go could you take a look at this, thanks!
@lei-su-awx I tried this code with 0.14.1 and it worked fine. With 0.14.0 I can see the error.
@lei-su-awx @Amar1404 Can you guys try with 0.14.1 and let me know in case this issue persists.
Hi @ad1happy2go 0.14.1 worked fine. Thanks
Hi @ad1happy2go - In my case the table was LONG and was changed to DOUBLE.