[SUPPORT] Deltastreamer updates not supporting the addition of new columns
Describe the problem you faced
Currently using the Deltastreamer to ingest from one S3 bucket to another. In Hudi 0.10 I would use the upsert operation in the Deltastreamer, and when a new column was added to the schema the target table would reflect that.
However, in Hudi 0.11.1 using the insert operation, schema changes are not reflected in the target table - specifically the addition of nullable columns. Another important note: I have enabled the metadata table and the column stats index.
To Reproduce
Steps to reproduce the behavior:
- Start the deltastreamer using the script below
- Add a new nullable column to the source data (see the sketch after the script below)
- Query from the target table for the new column
spark-submit \
--jars /opt/spark/jars/hudi-utilities-bundle.jar,/opt/spark/jars/hadoop-aws.jar,/opt/spark/jars/aws-java-sdk.jar \
--master spark://spark-master:7077 \
--total-executor-cores 20 \
--executor-memory 4g \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/spark/jars/hudi-utilities-bundle.jar \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-table per_tick_stats \
--table-type COPY_ON_WRITE \
--min-sync-interval-seconds 30 \
--source-limit 250000000 \
--continuous \
--source-ordering-field $3 \
--target-base-path $2 \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=$1 \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.datasource.write.recordkey.field=$4 \
--hoodie-conf hoodie.datasource.write.precombine.field=$3 \
--hoodie-conf hoodie.clustering.plan.strategy.sort.columns=$5 \
--hoodie-conf hoodie.datasource.write.partitionpath.field=$6 \
--hoodie-conf hoodie.clustering.inline=true \
--hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=100000000 \
--hoodie-conf hoodie.clustering.inline.max.commits=4 \
--hoodie-conf hoodie.metadata.enable=true \
--hoodie-conf hoodie.metadata.index.column.stats.enable=true \
--op INSERT
./deltastreamer.sh s3a://simian-example-prod-output/stats/ingesting s3a://simian-example-prod-output/stats/querying STATOVYGIYLUMVSF6YLU STATONUW25LMMF2GS33OL5ZHK3S7NFSA____,STATONUW2X3UNFWWK___ STATONUW25LMMF2GS33OL5ZHK3S7NFSA____,STATMJQXIY3IL5ZHK3S7NFSA____
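To make the "add a new nullable column" step concrete, here is a minimal, hypothetical sketch of landing a Parquet batch with an extra nullable column in the source bucket. The column names (record_id, event_ts, partition_col, new_nullable_col) are stand-ins for the obfuscated positional arguments, not the real schema, and a live SparkSession named spark is assumed.

# hypothetical columns standing in for the record key ($4), ordering field ($3),
# partition path ($6), plus the newly added nullable column
from pyspark.sql.types import StructType, StructField, StringType, LongType, DoubleType

schema = StructType([
    StructField("record_id", StringType(), False),
    StructField("event_ts", LongType(), False),
    StructField("partition_col", StringType(), False),
    StructField("new_nullable_col", DoubleType(), True),  # new nullable column
])

rows = [("id-1", 1660021200000, "2022-08-09", None)]
spark.createDataFrame(rows, schema) \
    .write.mode("append") \
    .parquet("s3a://simian-example-prod-output/stats/ingesting")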
Expected behavior
The new nullable column should be present in the target table.
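A hedged way to verify that expectation, assuming the new column is named new_nullable_col (hypothetical) and a SparkSession named spark:

# snapshot-read the target table and check the new column is visible
target = spark.read.format("hudi").load("s3a://simian-example-prod-output/stats/querying")
assert "new_nullable_col" in target.columns  # hypothetical column name
target.select("new_nullable_col").show(5)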
Environment Description
- Hudi version : 0.11.1
- Spark version : 3.1.2
- Hive version : 3.2.0
- Storage (HDFS/S3/GCS..) : S3
- Running on Docker? (yes/no) : yes
Additional context
Initially used upsert but was unable to continue using it because of this issue: https://github.com/apache/hudi/issues/6015
Stacktrace
Add the stacktrace of the error.
Unable to reproduce on a brand new table. Same script, same environment, but a new table causes no issues.
Not sure if the problem stems from upgrading a previously existing Hudi table.
Also wanted to note that in the logs I am seeing
22/08/09 05:13:37 INFO DeltaSync: Seeing new schema. Source :{
This has the correct schema for both the source and target. However, the output data isn't matching the schema.
stat_data_frame = (session.read.format("hudi").option("hoodie.datasource.write.reconcile.schema", "true").load("s3a://example-prod-output/stats/querying/0e6a3669-1f94-4ec4-93e8-6b5b25053b7e-0_0-70-1046_20220809052311671.parquet"))
len(stat_data_frame.columns) # returns 616
however
stat_data_frame = (session.read.format("hudi").option("hoodie.datasource.write.reconcile.schema", "true").load("s3a://example-prod-output/stats/querying/*"))
len(stat_data_frame.columns) # returns 551
This issue was resolved by instead removing the asterisk:
stat_data_frame = (session.read.format("hudi").option("hoodie.datasource.write.reconcile.schema", "true").load("s3a://example-prod-output/stats/querying"))
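For anyone debugging a similar mismatch, a small sketch (assuming the same session and paths as above) that diffs the two reads to see exactly which columns the glob-pattern load drops:

# compare the glob-pattern read against the base-path read
glob_df = session.read.format("hudi").load("s3a://example-prod-output/stats/querying/*")
base_df = session.read.format("hudi").load("s3a://example-prod-output/stats/querying")

missing = set(base_df.columns) - set(glob_df.columns)
print(len(base_df.columns), len(glob_df.columns))  # 616 vs 551 in this report
print(sorted(missing))  # the columns the glob read silently drops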
After discussion with @rohit-m-99, the problem is not on the write side, but rather in how the Hudi table is read. The glob pattern, i.e., spark.read.format("hudi").load("<base_path>/*"), is used to read the table, which causes inconsistent results. Using spark.read.format("hudi").load("<base_path>") solves the problem and returns the correct data.
@rohit-m-99 feel free to close the issue if you are all good.
Running into an issue now with .option("hoodie.metadata.enable", "true"). When doing so I receive the following error alongside a FileNotFoundException:
It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.
When creating the dataframe I see the potentially related warning:
Metadata record for . encountered some files to be deleted which was not added before. Ignoring the spurious deletes as the _hoodie.metadata.ignore.spurious.deletes config is set to true
Querying without the metadata table enabled causes no issues, but enabling it seems to cause this error. Note this happens only when not using the glob pattern discussed above.
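For reference, a hedged sketch of the cache invalidation the error message suggests, adapted to a path-based read (no registered table name, so refreshByPath instead of REFRESH TABLE); the base path is taken from the reads above:

# invalidate any cached file listings for the base path, then re-read with the
# metadata table enabled on the read side
base_path = "s3a://example-prod-output/stats/querying"
spark.catalog.refreshByPath(base_path)

stat_data_frame = (spark.read.format("hudi")
    .option("hoodie.metadata.enable", "true")
    .load(base_path))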
@rohit-m-99 Is this from Spark SQL? Do you see any exceptions or stacktrace? A few things to try out: (1) restart spark-shell or spark-sql to see if this goes away; (2) set hoodie.metadata.enable to false in Deltastreamer and wait for a few commits so that metadata table is deleted completely (no .hoodie/metadata folder), and then re-enable the metadata table.
Option 2 worked for me! Set hoodie.metadata.enable to false in the Deltastreamer and wait for a few commits so that the metadata table is deleted completely (no .hoodie/metadata folder), and then re-enable the metadata table.
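A hedged way to confirm the metadata table is completely gone before re-enabling it; this goes through Spark's underlying Hadoop FileSystem handle (internal spark._jvm / spark._jsc accessors), and the base path is the one from this thread:

# check whether .hoodie/metadata still exists under the table base path
metadata_path = spark._jvm.org.apache.hadoop.fs.Path(
    "s3a://example-prod-output/stats/querying/.hoodie/metadata")
fs = metadata_path.getFileSystem(spark._jsc.hadoopConfiguration())
print("metadata table still present:", fs.exists(metadata_path))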