
[SUPPORT] Deltastreamer updates not supporting the addition of new columns

Open rohitmittapalli opened this issue 3 years ago • 8 comments

Describe the problem you faced

Currently using the Deltastreamer to ingest from one S3 bucket to another. In Hudi 0.10 I would use the upsert operation in the Deltastreamer. When a new column was added to the schema, the target table would reflect that.

However, in Hudi 0.11.1 using the insert operation, schema changes are not reflected in the target table, specifically the addition of nullable columns. Another important note: I enabled the metadata table and the column stats index.

To Reproduce

Steps to reproduce the behavior:

  1. Start the deltastreamer using the script below
  2. Add a new nullable column to the source data (see the sketch after the script and example invocation below)
  3. Query from the target table for the new column
spark-submit \
--jars /opt/spark/jars/hudi-utilities-bundle.jar,/opt/spark/jars/hadoop-aws.jar,/opt/spark/jars/aws-java-sdk.jar \
--master spark://spark-master:7077 \
--total-executor-cores 20 \
--executor-memory 4g \
--conf spark.hadoop.fs.s3a.impl=org.apache.hadoop.fs.s3a.S3AFileSystem \
--class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer /opt/spark/jars/hudi-utilities-bundle.jar \
--source-class org.apache.hudi.utilities.sources.ParquetDFSSource \
--target-table per_tick_stats \
--table-type COPY_ON_WRITE \
--min-sync-interval-seconds 30 \
--source-limit 250000000 \
--continuous \
--source-ordering-field $3 \
--target-base-path $2 \
--hoodie-conf hoodie.deltastreamer.source.dfs.root=$1 \
--hoodie-conf hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.ComplexKeyGenerator \
--hoodie-conf hoodie.datasource.write.recordkey.field=$4 \
--hoodie-conf hoodie.datasource.write.precombine.field=$3 \
--hoodie-conf hoodie.clustering.plan.strategy.sort.columns=$5 \
--hoodie-conf hoodie.datasource.write.partitionpath.field=$6 \
--hoodie-conf hoodie.clustering.inline=true \
--hoodie-conf hoodie.clustering.plan.strategy.small.file.limit=100000000 \
--hoodie-conf hoodie.clustering.inline.max.commits=4 \
--hoodie-conf hoodie.metadata.enable=true \
--hoodie-conf hoodie.metadata.index.column.stats.enable=true \
--op INSERT

# Example invocation of the script above:
./deltastreamer.sh s3a://simian-example-prod-output/stats/ingesting s3a://simian-example-prod-output/stats/querying STATOVYGIYLUMVSF6YLU STATONUW25LMMF2GS33OL5ZHK3S7NFSA____,STATONUW2X3UNFWWK___ STATONUW25LMMF2GS33OL5ZHK3S7NFSA____,STATMJQXIY3IL5ZHK3S7NFSA____
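For step 2, a minimal PySpark sketch of what "add a new nullable column" looks like on the source side, assuming the source is plain parquet under the dfs root ($1); the field names and the new column name below are illustrative, not taken from the actual table:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# Build a small batch that carries one extra nullable column on top of the
# existing fields. Field names here are placeholders for the real record key
# and ordering fields passed as $4 and $3 above.
batch = spark.createDataFrame(
    [("key-1", "2022-08-09T05:00:00Z", None)],
    "record_key string, event_time string, new_nullable_col string",
)

# Append the widened batch to the source prefix that ParquetDFSSource polls;
# the next Deltastreamer sync should then see the new schema.
batch.write.mode("append").parquet("s3a://simian-example-prod-output/stats/ingesting")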

Expected behavior

New nullable column should be present in the target table

Environment Description

  • Hudi version : 0.11.1

  • Spark version : 3.1.2

  • Hive version : 3.2.0

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : yes

Additional context

Initially used upsert but was unable to continue using it because of this issue: https://github.com/apache/hudi/issues/6015


rohitmittapalli avatar Aug 08 '22 21:08 rohitmittapalli

Unable to reproduce on a brand-new table. Same script, same environment, but a new table causes no issues.

Not sure if the problem stems from upgrading a previously existing Hudi table.

rohitmittapalli avatar Aug 09 '22 01:08 rohitmittapalli

Also wanted to note that in the logs I am seeing:

22/08/09 05:13:37 INFO DeltaSync: Seeing new schema. Source :{

The logged schema is correct for both the source and target. However, the output data doesn't match the schema.

rohitmittapalli avatar Aug 09 '22 05:08 rohitmittapalli

stat_data_frame = (
    session.read.format("hudi")
    .option("hoodie.datasource.write.reconcile.schema", "true")
    .load("s3a://example-prod-output/stats/querying/0e6a3669-1f94-4ec4-93e8-6b5b25053b7e-0_0-70-1046_20220809052311671.parquet")
)
len(stat_data_frame.columns)  # returns 616

however

stat_data_frame = (
    session.read.format("hudi")
    .option("hoodie.datasource.write.reconcile.schema", "true")
    .load("s3a://example-prod-output/stats/querying/*")
)
len(stat_data_frame.columns)  # returns 551

rohitmittapalli avatar Aug 09 '22 05:08 rohitmittapalli

This issue was resolved by removing the trailing asterisk (glob pattern) instead:

stat_data_frame = (
    session.read.format("hudi")
    .option("hoodie.datasource.write.reconcile.schema", "true")
    .load("s3a://example-prod-output/stats/querying")
)

rohitmittapalli avatar Aug 09 '22 05:08 rohitmittapalli

After discussion with @rohit-m-99, the problem is not on the write side but rather in how the Hudi table is read. The glob pattern, i.e., spark.read.format("hudi").load("<base_path>/*"), was used to read the table, which causes inconsistent results. Using spark.read.format("hudi").load("<base_path>") solves the problem and returns the correct data.
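To confirm the behavior described above, a small PySpark check (reusing the placeholder base path from the earlier snippets) that contrasts the two read patterns:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
base_path = "s3a://example-prod-output/stats/querying"

# Glob-pattern read: in this report it returned only the pre-evolution columns (551).
glob_df = spark.read.format("hudi").load(base_path + "/*")

# Base-path read: Hudi resolves the table from its timeline and returns the
# latest schema, including the newly added nullable columns.
table_df = spark.read.format("hudi").load(base_path)

print(len(glob_df.columns), len(table_df.columns))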

yihua avatar Aug 09 '22 05:08 yihua

@rohit-m-99 feel free to close the issue if you are all good.

yihua avatar Aug 09 '22 05:08 yihua

Running into an issue now with .option("hoodie.metadata.enable", "true"). When doing so, I receive the following error alongside a FileNotFoundException:

It is possible the underlying files have been updated. You can explicitly invalidate the cache in Spark by running 'REFRESH TABLE tableName' command in SQL or by recreating the Dataset/DataFrame involved.

When creating the dataframe I see the potentially related warning:

Metadata record for . encountered some files to be deleted which was not added before. Ignoring the spurious deletes as the _hoodie.metadata.ignore.spurious.deletes config is set to true

Querying without the metadata table enabled causes no issues, but enabling it seems to cause this error. Note this happens only when not using the glob pattern discussed above.

rohitmittapalli avatar Aug 10 '22 00:08 rohitmittapalli

@rohit-m-99 Is this from Spark SQL? Do you see any exceptions or stacktrace? A few things to try out: (1) restart spark-shell or spark-sql to see if this goes away; (2) set hoodie.metadata.enable to false in Deltastreamer and wait for a few commits so that the metadata table is deleted completely (no .hoodie/metadata folder), and then re-enable the metadata table.
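For reference, a read-side sketch of option (1), assuming the table is loaded by path as in the earlier snippets (the base path is the same placeholder); option (2) is the writer-side change, i.e. flipping the --hoodie-conf hoodie.metadata.enable flag in the spark-submit script above and letting a few commits run before re-enabling it:

from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()
base_path = "s3a://example-prod-output/stats/querying"

# Option (1): drop Spark's cached file listings for the table path (the
# path-based equivalent of the 'REFRESH TABLE' hint in the error message),
# then retry the read with the metadata table enabled.
spark.catalog.refreshByPath(base_path)

stat_data_frame = (
    spark.read.format("hudi")
    .option("hoodie.metadata.enable", "true")
    .load(base_path)
)
stat_data_frame.count()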

yihua avatar Aug 11 '22 05:08 yihua

Option 2 worked for me! Set hoodie.metadata.enable to false in Deltastreamer and wait for a few commits so that the metadata table is deleted completely (no .hoodie/metadata folder), then re-enable the metadata table.

rohitmittapalli avatar Aug 18 '22 01:08 rohitmittapalli