hudi icon indicating copy to clipboard operation
hudi copied to clipboard

SchemaEvolution : Default value not getting fetched properly for not null columns from confluent kafka schema registry

Open Shubham21k opened this issue 1 year ago • 6 comments

Describe the problem you faced

We have an ingestion pipeline which ingests data from postgres to hive glue table. Currently, debezium is sending messages to confluent kafka by capturing postgres changelog events. We are reading these kafka messages with hudi deltastreamer job for creating glue table.

On adding a new not null column with a certain default value, This default value is coming properly in the confluent schema registry. but hudi is not able to retrieve this from the schema registry (refer to stacktrace).

Is there any way to support these kind of schema changes using some configuration?

To Reproduce

Steps to reproduce the behavior:

  1. create a postgres table and setup debezium connector, kafka topic and confluent schema registry for the same.
  2. add not null column to this table:
ALTER TABLE test ADD COLUMN ten boolean DEFAULT true NOT NULL

table schema : Screenshot 2022-09-06 at 4 38 22 PM 5. insert few records into the table 6. Run deltastreamer job with table type as COW (copy on write):

spark-submit --master yarn --jars /usr/lib/spark/external/lib/spark-avro.jar,s3://test/jars/hudi-utilities-bundle_2.12-0.10.0.jar --class org.apache.hudi.utilities.deltastreamer.HoodieDeltaStreamer --conf spark.executor.cores=2 --conf spark.driver.memory=4g --conf spark.driver.memoryOverhead=800m --conf spark.executor.memoryOverhead=1800m --conf spark.executor.memory=8g --conf spark.dynamicAllocation.enabled=true --conf spark.dynamicAllocation.initialExecutors=1 --conf spark.dynamicAllocation.minExecutors=1 --conf spark.dynamicAllocation.maxExecutors=16 --conf spark.scheduler.mode=FAIR --conf spark.task.maxFailures=5 --conf spark.rdd.compress=true --conf spark.serializer=org.apache.spark.serializer.KryoSerializer --conf spark.shuffle.service.enabled=true --conf spark.sql.hive.convertMetastoreParquet=false --conf spark.yarn.max.executor.failures=5 --conf spark.sql.catalogImplementation=hive --conf spark.driver.userClassPathFirst=true --conf spark.executor.userClassPathFirst=true --deploy-mode client --enable-sync --hoodie-conf hoodie.deltastreamer.source.kafka.auto.reset.offsets=earliest --hoodie-conf hoodie.parquet.compression.codec=snappy --hoodie-conf partition.assignment.strategy=org.apache.kafka.clients.consumer.RangeAssignor --hoodie-conf hive.metastore.disallow.incompatible.col.type.changes=false --hoodie-conf hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable=false --hoodie-conf auto.offset.reset=earliest --table-type COPY_ON_WRITE --source-class com.test.sources.ConfluentAvroKafkaSource --schemaprovider-class org.apache.hudi.utilities.schema.NullTargetSchemaRegistryProvider --props s3://test/config/kafka-source.properties --source-limit 1000000 --hoodie-conf hoodie.deltastreamer.schemaprovider.registry.url=https://test-schema-registry.np.tech.in/subjects/test_service.public.test-value/versions/latest --hoodie-conf hoodie.datasource.hive_sync.database=test --hoodie-conf hoodie.datasource.hive_sync.table=test  --hoodie-conf hoodie.datasource.write.recordkey.field=id --hoodie-conf hoodie.datasource.write.precombine.field=__lsn --hoodie-conf hoodie.deltastreamer.source.kafka.topic=testdb.test --hoodie-conf group.id=delta-streamer-test_service-test --source-ordering-field __lsn --target-base-path s3://test/raw-data/test_service/test/ --target-table test  --hoodie-conf hoodie.bloom.index.update.partition.path=false --hoodie-conf hoodie.metrics.on=false --hoodie-conf hoodie.metrics.reporter.type=PROMETHEUS_PUSHGATEWAY hoodie.datasource.write.keygenerator.class=org.apache.hudi.keygen.NonpartitionedKeyGenerator --hoodie-conf hoodie.datasource.hive_sync.partition_extractor_class=org.apache.hudi.hive.NonPartitionedExtractor

Expected behavior

Default should be retrieved properly by hudi.

Environment Description

  • Confluent Kafka version : 6.2.1

  • Hudi version : 2.12-0.10.0

  • Spark version : 3.0.1

  • Hive version : 3.1.2

  • Hadoop version : Amazon 3.2.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : N

Expected behavior

default value should be properly retrieved by hudi from schema registry.

Environment Description

  • Debezium version : 1.7.0-Final

  • Confluent Kafka version : 6.2.1

  • Hudi version : 2.12-0.10.0

  • Spark version : 3.0.1

  • Hive version : 3.1.2

  • Hadoop version : Amazon 3.2.1

  • Storage (HDFS/S3/GCS..) : S3

  • Running on Docker? (yes/no) : No

Additional context

Please note that for newly added not null column (name : ten) default value is present in schema registry. Compare this with schema present in hudi job stacktrace attached below. Confluent schema registry schema after not null column is added to postgres table :

{\"type\":\"record\",\"name\":\"Value\",\"namespace\":\"platform_dev_db_service.public.test\",\"fields\":[{\"name\":\"id\",\"type\":{\"type\":\"int\",\"connect.default\":0},\"default\":0},{\"name\":\"descr\",\"type\":{\"type\":\"string\",\"connect.default\":\"NA\"},\"default\":\"NA\"},{\"name\":\"array_col\",\"type\":[\"null\",{\"type\":\"string\",\"connect.version\":1,\"connect.name\":\"io.debezium.data.Json\"}],\"default\":null},{\"name\":\"imp_column\",\"type\":{\"type\":\"string\",\"connect.default\":\"foo\"},\"default\":\"foo\"},{\"name\":\"new_col\",\"type\":[{\"type\":\"string\",\"connect.default\":\"new_col default value\"},\"null\"],\"default\":\"new_col default value\"},{\"name\":\"one\",\"type\":{\"type\":\"boolean\",\"connect.default\":false},\"default\":false},{\"name\":\"six\",\"type\":[{\"type\":\"string\",\"connect.default\":\"sfa\"},\"null\"],\"default\":\"sfa\"},{\"name\":\"eight\",\"type\":[{\"type\":\"string\",\"connect.default\":\"eight default\"},\"null\"],\"default\":\"eight default\"},{\"name\":\"nine\",\"type\":{\"type\":\"boolean\",\"connect.default\":true},\"default\":true},{\"name\":\"ten\",\"type\":{\"type\":\"string\",\"connect.default\":\"ten_10\"},\"default\":\"ten_10\"},{\"name\":\"eleven\",\"type\":{\"type\":\"boolean\",\"connect.default\":true},\"default\":true},{\"name\":\"__lsn\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"__op\",\"type\":[\"null\",\"string\"],\"default\":null},{\"name\":\"__source_ts_ms\",\"type\":[\"null\",\"long\"],\"default\":null},{\"name\":\"__deleted\",\"type\":[\"null\",\"string\"],\"default\":null}],\"connect.name\":\"platform_dev_db_service.public.test.Value\"}

Stacktrace

  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [ {
    "name" : "_hoodie_commit_time",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_commit_seqno",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_record_key",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_partition_path",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_file_name",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "id",
    "type" : "int"
  }, {
    "name" : "descr",
    "type" : "string"
  }, {
    "name" : "array_col",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "imp_column",
    "type" : "string"
  }, {
    "name" : "new_col",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "one",
    "type" : "boolean"
  }, {
    "name" : "six",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "eight",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "nine",
    "type" : "boolean"
  }, {
    "name" : "ten",
    "type" : "string"
  }, {
    "name" : "__lsn",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "__deleted",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "_hoodie_is_deleted",
    "type" : "boolean"
  } ]
}
	at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:102)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdateInternal(BaseSparkCommitActionExecutor.java:349)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpdate(BaseSparkCommitActionExecutor.java:340)
	at org.apache.hudi.table.action.commit.BaseSparkCommitActionExecutor.handleUpsertPartition(BaseSparkCommitActionExecutor.java:313)
	... 28 more
Caused by: org.apache.hudi.exception.HoodieException: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema {
  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [ {
    "name" : "_hoodie_commit_time",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_commit_seqno",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_record_key",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_partition_path",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_file_name",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "id",
    "type" : "int"
  }, {
    "name" : "descr",
    "type" : "string"
  }, {
    "name" : "array_col",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "imp_column",
    "type" : "string"
  }, {
    "name" : "new_col",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "one",
    "type" : "boolean"
  }, {
    "name" : "six",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "eight",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "nine",
    "type" : "boolean"
  }, {
    "name" : "ten",
    "type" : "string"
  }, {
    "name" : "__lsn",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "__deleted",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "_hoodie_is_deleted",
    "type" : "boolean"
  } ]
}
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:147)
	at org.apache.hudi.table.action.commit.SparkMergeHelper.runMerge(SparkMergeHelper.java:100)
	... 31 more
Caused by: java.util.concurrent.ExecutionException: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema {
  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [ {
    "name" : "_hoodie_commit_time",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_commit_seqno",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_record_key",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_partition_path",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_file_name",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "id",
    "type" : "int"
  }, {
    "name" : "descr",
    "type" : "string"
  }, {
    "name" : "array_col",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "imp_column",
    "type" : "string"
  }, {
    "name" : "new_col",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "one",
    "type" : "boolean"
  }, {
    "name" : "six",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "eight",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "nine",
    "type" : "boolean"
  }, {
    "name" : "ten",
    "type" : "string"
  }, {
    "name" : "__lsn",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "__deleted",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "_hoodie_is_deleted",
    "type" : "boolean"
  } ]
}
	at java.util.concurrent.FutureTask.report(FutureTask.java:122)
	at java.util.concurrent.FutureTask.get(FutureTask.java:192)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.execute(BoundedInMemoryExecutor.java:141)
	... 32 more
Caused by: org.apache.hudi.exception.HoodieUpsertException: Failed to merge old record into new file for key 109 from old file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-30-2008_20220902102739507.parquet to new file s3://test-platform/raw-data/platform_dev_db_service/test/c6ba8786-dca0-46f7-8bba-b7204231e48a-0_0-31-2013_20220902104057412.parquet with writerSchema {
  "type" : "record",
  "name" : "hoodie_source",
  "namespace" : "hoodie.source",
  "fields" : [ {
    "name" : "_hoodie_commit_time",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_commit_seqno",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_record_key",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_partition_path",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "_hoodie_file_name",
    "type" : [ "null", "string" ],
    "doc" : "",
    "default" : null
  }, {
    "name" : "id",
    "type" : "int"
  }, {
    "name" : "descr",
    "type" : "string"
  }, {
    "name" : "array_col",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "imp_column",
    "type" : "string"
  }, {
    "name" : "new_col",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "one",
    "type" : "boolean"
  }, {
    "name" : "six",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "eight",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "nine",
    "type" : "boolean"
  }, {
    "name" : "ten",
    "type" : "string"
  }, {
    "name" : "__lsn",
    "type" : [ "null", "long" ],
    "default" : null
  }, {
    "name" : "__deleted",
    "type" : [ "null", "string" ],
    "default" : null
  }, {
    "name" : "_hoodie_is_deleted",
    "type" : "boolean"
  } ]
}
	at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:356)
	at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:122)
	at org.apache.hudi.table.action.commit.AbstractMergeHelper$UpdateHandler.consumeOneRecord(AbstractMergeHelper.java:112)
	at org.apache.hudi.common.util.queue.BoundedInMemoryQueueConsumer.consume(BoundedInMemoryQueueConsumer.java:37)
	at org.apache.hudi.common.util.queue.BoundedInMemoryExecutor.lambda$null$2(BoundedInMemoryExecutor.java:121)
	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
	... 3 more
Caused by: java.lang.RuntimeException: Null-value for required field: ten
	at org.apache.parquet.avro.AvroWriteSupport.writeRecordFields(AvroWriteSupport.java:194)
	at org.apache.parquet.avro.AvroWriteSupport.write(AvroWriteSupport.java:165)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:128)
	at org.apache.parquet.hadoop.ParquetWriter.write(ParquetWriter.java:299)
	at org.apache.hudi.io.storage.HoodieParquetWriter.writeAvro(HoodieParquetWriter.java:95)
	at org.apache.hudi.io.HoodieMergeHandle.write(HoodieMergeHandle.java:351)
	... 8 more```

Shubham21k avatar Sep 06 '22 11:09 Shubham21k

I am yet to try this out locally and see how this pans out.bcoz, you have custom default value for strings. usually null defaults are taken into consideration. but non null defaults, I have not seen much.

but for now, can you try disabling this config and see what happens.

hoodie.deltastreamer.schemaprovider.spark_avro_post_processor.enable

nsivabalan avatar Sep 16 '22 03:09 nsivabalan

@Shubham21k noticed that you're on Spark version : 3.0.1. Recommend that you upgrade to more stable spark 3.1 or 3.2 and hudi 0.12.0. see this note here.

xushiyan avatar Sep 18 '22 04:09 xushiyan

@Shubham21k you're adding column ten of type boolean and yet the default value (ten_10) is string. Is this expected?

codope avatar Sep 20 '22 12:09 codope

Also, compared to similiar column six, looks like the schema registry does not add null default for ten. For six, this is what schema registry reports:

{\"name\":\"six\",\"type\":[{\"type\":\"string\",\"connect.default\":\"sfa\"},\"null\"],\"default\":\"sfa\"}

For ten, it looks like:

{\"name\":\"ten\",\"type\":{\"type\":\"string\",\"connect.default\":\"ten_10\"},\"default\":\"ten_10\"}

Note the difference in type union. In case of six, it allows null.

codope avatar Sep 20 '22 13:09 codope

Is it possible that the default values are being lost since this is a RowSource and the RowSource code sets a RowBasedSchemaProvider for the InputBatch?

https://github.com/apache/hudi/blob/master/hudi-utilities/src/main/java/org/apache/hudi/utilities/sources/RowSource.java#L45

the-other-tim-brown avatar Sep 20 '22 18:09 the-other-tim-brown

@the-other-tim-brown Thanks for digging into the code. I think you may have found the root cause. Did you try to reproduce this? Would be great if you can create a JIRA to track if its reproducible.

codope avatar Sep 21 '22 09:09 codope

Created HUDI-4942 to track this issue. @Shubham21k Can you try the patch https://github.com/apache/hudi/pull/6817

codope avatar Sep 28 '22 15:09 codope

Looks like the user-specified schema provider is already being honored. I also added a unit test with evolving schema in #6817. In this case, schema registry is the schema provider and we see that the type of column ten is not a union. Ideally, it should have been

{\"name\":\"ten\",\"type\":[{\"type\":\"string\",\"connect.default\":\"ten_10\"},\"null\"],\"default\":\"ten_10\"}

But it is

{\"name\":\"ten\",\"type\":{\"type\":\"string\",\"connect.default\":\"ten_10\"},\"default\":\"ten_10\"}

codope avatar Sep 30 '22 09:09 codope

@codope as you can see from the below image : ten is not nullable column thats why ten is not coming as an union. but six is nullable thats why it is coming as union in schema registry schema. image

Shubham21k avatar Oct 06 '22 07:10 Shubham21k

We encountered this issue as well, we think this is due to the SparkAvroPostProcessor ignoring the default values from the passed target schema here - when converting from struct type to avro type for primitive types this function sets no default (ignored from the source) here. And here this function does not set the defaults by type, rather gets the default for the field that was assumed to be set upstream.

We tried to ignore the schema post processor and we encountered more issues.

Also, In our case this only happened during updating an existing record with older schema version with new data with new schema version.

gudladona avatar Oct 13 '22 12:10 gudladona

@codope I tried the patch attached by you. but faced the same issue.

Shubham21k avatar Oct 19 '22 09:10 Shubham21k