
[SUPPORT] Exception when write null value to table with timestamp partitioning

Open stream2000 opened this issue 1 year ago • 3 comments

Tips before filing an issue

  • Have you gone through our FAQs?

  • Join the mailing list to engage in conversations and get faster support at [email protected].

  • If you have triaged this as a bug, then file an issue directly.

Describe the problem you faced

Exception when writing a null value to a table with timestamp partitioning.

To Reproduce

  test("Test various data types as partition fields") {
    withRecordType()(withTempDir { tmp =>
      val tableName = generateTableName
      spark.sql(
        s"""
           |CREATE TABLE $tableName (
           |  id INT,
           |  boolean_field BOOLEAN,
           |  float_field FLOAT,
           |  byte_field BYTE,
           |  short_field SHORT,
           |  decimal_field DECIMAL(10, 5),
           |  date_field DATE,
           |  string_field STRING,
           |  timestamp_field TIMESTAMP
           |) USING hudi
           | TBLPROPERTIES (primaryKey = 'id')
           | PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field)
           |LOCATION '${tmp.getCanonicalPath}'
     """.stripMargin)

      // Insert data into partitioned table
      spark.sql(
        s"""
           |INSERT INTO $tableName VALUES
           |(1, TRUE, CAST(1.0 as FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1', null),
           |(2, FALSE,CAST(2.0 as FLOAT), 2, 2, 6789.12345, DATE '2021-01-06', 'partition2', TIMESTAMP '2021-01-06 11:00:00')
     """.stripMargin)

      checkAnswer(s"SELECT id, boolean_field FROM $tableName ORDER BY id")(
        Seq(1, true),
        Seq(2, false)
      )
    })
  }

Expected behavior

No exception encountered.

Environment Description

  • Hudi version :

0.15.x

  • Spark version :

3.5

  • Hive version :

  • Hadoop version :

  • Storage (HDFS/S3/GCS..) :

OSS

  • Running on Docker? (yes/no) :

No

Additional context

Add any other context about the problem here.

Stacktrace

Add the stacktrace of the error.

Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 8) (30.221.116.216 executor driver): java.lang.NumberFormatException: For input string: "__HIVE_DEFAULT_PARTITION__"
  at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.lang.Long.parseLong(Long.java:589)
  at java.lang.Long.parseLong(Long.java:631)
  at scala.collection.immutable.StringLike.toLong(StringLike.scala:315)
  at scala.collection.immutable.StringLike.toLong$(StringLike.scala:315)
  at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.$anonfun$convertPartitionPathToSqlType$1(SqlKeyGenerator.scala:170)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.convertPartitionPathToSqlType(SqlKeyGenerator.scala:156)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getPartitionPath(SqlKeyGenerator.scala:106)
  at org.apache.hudi.HoodieCreateRecordUtils$.getHoodieKeyAndMaybeLocationFromAvroRecord(HoodieCreateRecordUtils.scala:212)
  at org.apache.hudi.HoodieCreateRecordUtils$.$anonfun$createHoodieRecordRdd$5(HoodieCreateRecordUtils.scala:133)
  at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
  at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
  at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
  at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
  at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
  at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
  at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
  at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
  at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
  at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
  at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
  at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
  at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
  at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
  at org.apache.spark.scheduler.Task.run(Task.scala:141)
  at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
  at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
  at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
  at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
  at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
  at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
  at java.lang.Thread.run(Thread.java:750)

stream2000 commented on Sep 06 '24 09:09

Hi @stream2000

I was able to reproduce this issue by running the following Spark code. The Hudi test case above still needs to be verified.

spark-shell \
	--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
	--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
	--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
	--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
	--conf "spark.sql.hive.convertMetastoreParquet=false"
spark.sql(
        s"""
           |CREATE TABLE hudi_test_null_partition (
           |  id INT,
           |  boolean_field BOOLEAN,
           |  float_field FLOAT,
           |  byte_field BYTE,
           |  short_field SHORT,
           |  decimal_field DECIMAL(10, 5),
           |  date_field DATE,
           |  string_field STRING,
           |  timestamp_field TIMESTAMP
           |) USING hudi
           | TBLPROPERTIES (primaryKey = 'id')
           | PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field)
     """.stripMargin)

spark.sql(
s"""
   |INSERT INTO hudi_test_null_partition VALUES
   |(1, TRUE, CAST(1.0 as FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1', null),
   |(2, FALSE,CAST(2.0 as FLOAT), 2, 2, 6789.12345, DATE '2021-01-06', 'partition2', TIMESTAMP '2021-01-06 11:00:00')
""".stripMargin)

ERROR:

Caused by: java.lang.NumberFormatException: For input string: "__HIVE_DEFAULT_PARTITION__"
  at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
  at java.base/java.lang.Long.parseLong(Long.java:692)
  at java.base/java.lang.Long.parseLong(Long.java:817)
  at scala.collection.immutable.StringLike.toLong(StringLike.scala:315)
  at scala.collection.immutable.StringLike.toLong$(StringLike.scala:315)
  at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.$anonfun$convertPartitionPathToSqlType$1(SqlKeyGenerator.scala:170)
  at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
  at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
  at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
  at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
  at scala.collection.TraversableLike.map(TraversableLike.scala:286)
  at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
  at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.convertPartitionPathToSqlType(SqlKeyGenerator.scala:156)
  at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getPartitionPath(SqlKeyGenerator.scala:106)
  at org.apache.hudi.HoodieCreateRecordUtils$.getHoodieKeyAndMaybeLocationFromAvroRecord(HoodieCreateRecordUtils.scala:209)
  at org.apache.hudi.HoodieCreateRecordUtils$.$anonfun$createHoodieRecordRdd$5(HoodieCreateRecordUtils.scala:130)

rangareddy commented on Oct 07 '24 15:10

Spark with Hive

For comparison, the same schema and insert against a plain (non-Hudi) Spark table succeed; the null timestamp partition is written as __HIVE_DEFAULT_PARTITION__:

spark.sql(
        s"""
           |CREATE TABLE hive_test_null_partition (
           |  id INT,
           |  boolean_field BOOLEAN,
           |  float_field FLOAT,
           |  byte_field BYTE,
           |  short_field SHORT,
           |  decimal_field DECIMAL(10, 5),
           |  date_field DATE,
           |  string_field STRING,
           |  timestamp_field TIMESTAMP
           |) 
           | PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field)
     """.stripMargin)

spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

spark.sql(
s"""
   |INSERT INTO hive_test_null_partition VALUES
   |(1, TRUE, CAST(1.0 as FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1', null),
   |(2, FALSE,CAST(2.0 as FLOAT), 2, 2, 6789.12345, DATE '2021-01-06', 'partition2', TIMESTAMP '2021-01-06 11:00:00')
""".stripMargin)
# tree hive_test_null_partition
hive_test_null_partition
├── boolean_field=false
│   └── float_field=2.0
│       └── byte_field=2
│           └── short_field=2
│               └── decimal_field=6789.12345
│                   └── date_field=2021-01-06
│                       └── string_field=partition2
│                           └── timestamp_field=2021-01-06 11%3A00%3A00
│                               └── part-00001-106ff079-b48f-4428-9bc6-c55ea4b8caec.c000
└── boolean_field=true
    └── float_field=1.0
        └── byte_field=1
            └── short_field=1
                └── decimal_field=1234.56789
                    └── date_field=2021-01-05
                        └── string_field=partition1
                            └── timestamp_field=__HIVE_DEFAULT_PARTITION__
                                └── part-00000-106ff079-b48f-4428-9bc6-c55ea4b8caec.c000

rangareddy commented on Oct 07 '24 17:10

Created an upstream Jira for this issue: https://issues.apache.org/jira/browse/HUDI-8315

rangareddy commented on Oct 07 '24 17:10

@rangareddy I would like to contribute to this issue.

I did some initial analysis and found that the issue is in SqlKeyGenerator#convertPartitionPathToSqlType, where the default __HIVE_DEFAULT_PARTITION__ partition value is being converted into a Long (milliseconds).
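For illustration, the failure reduces to Scala's String.toLong being applied to that sentinel value, which is not numeric:

scala> "__HIVE_DEFAULT_PARTITION__".toLong
java.lang.NumberFormatException: For input string: "__HIVE_DEFAULT_PARTITION__"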

I added the following logic to fix this issue temporarily,

(screenshot of the proposed code change)
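Since the screenshot is not reproduced here, below is only a rough, hypothetical sketch of the kind of guard described above, not the actual change: the object name, method name, and the epoch-millisecond interpretation are assumptions based on the analysis and stack trace in this thread.

// Hypothetical sketch: skip the Long conversion when the partition segment is
// Hive's default-partition sentinel, so null timestamp partitions pass through.
object TimestampPartitionSegmentSketch {
  private val HiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"

  // `segment` is one partition-path component, e.g. "1609930800000",
  // or "__HIVE_DEFAULT_PARTITION__" when the timestamp field was null.
  def convertTimestampSegment(segment: String): String = {
    if (segment == HiveDefaultPartition) {
      segment // keep the sentinel unchanged instead of parsing it
    } else {
      // existing behavior per the stack trace: parse the segment as a Long (epoch millis)
      new java.sql.Timestamp(segment.toLong).toString
    }
  }

  def main(args: Array[String]): Unit = {
    println(convertTimestampSegment("1609930800000"))            // prints a timestamp (exact value depends on the JVM time zone)
    println(convertTimestampSegment("__HIVE_DEFAULT_PARTITION__")) // prints the sentinel unchanged
  }
}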

but I couldn't run the test cases to validate my changes.

Command to run the test case: mvn test -Dtest=TestSparkSqlWithTimestampKeyGenerator -Drat.skip=true

Can you provide suggestions on running the test cases locally?

karthick-de-25 commented on Jan 12 '25 13:01

Fixed via https://github.com/apache/hudi/pull/12621

ad1happy2go commented on Jan 28 '25 10:01