[SUPPORT] Exception when write null value to table with timestamp partitioning
Tips before filing an issue
- Have you gone through our FAQs?
- Join the mailing list to engage in conversations and get faster support at [email protected].
- If you have triaged this as a bug, then file an issue directly.
Describe the problem you faced
An exception is thrown when writing a null value to a table with timestamp partitioning.
To Reproduce
test("Test various data types as partition fields") {
withRecordType()(withTempDir { tmp =>
val tableName = generateTableName
spark.sql(
s"""
|CREATE TABLE $tableName (
| id INT,
| boolean_field BOOLEAN,
| float_field FLOAT,
| byte_field BYTE,
| short_field SHORT,
| decimal_field DECIMAL(10, 5),
| date_field DATE,
| string_field STRING,
| timestamp_field TIMESTAMP
|) USING hudi
| TBLPROPERTIES (primaryKey = 'id')
| PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field)
|LOCATION '${tmp.getCanonicalPath}'
""".stripMargin)
// Insert data into partitioned table
spark.sql(
s"""
|INSERT INTO $tableName VALUES
|(1, TRUE, CAST(1.0 as FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1', null),
|(2, FALSE,CAST(2.0 as FLOAT), 2, 2, 6789.12345, DATE '2021-01-06', 'partition2', TIMESTAMP '2021-01-06 11:00:00')
""".stripMargin)
checkAnswer(s"SELECT id, boolean_field FROM $tableName ORDER BY id")(
Seq(1, true),
Seq(2, false)
)
})
}
Expected behavior
No exception is encountered.
Environment Description
- Hudi version : 0.15.x
- Spark version : 3.5
- Hive version :
- Hadoop version :
- Storage (HDFS/S3/GCS..) : OSS
- Running on Docker? (yes/no) : No
Stacktrace
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 9.0 failed 1 times, most recent failure: Lost task 0.0 in stage 9.0 (TID 8) (30.221.116.216 executor driver): java.lang.NumberFormatException: For input string: "__HIVE_DEFAULT_PARTITION__"
    at java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
    at java.lang.Long.parseLong(Long.java:589)
    at java.lang.Long.parseLong(Long.java:631)
    at scala.collection.immutable.StringLike.toLong(StringLike.scala:315)
    at scala.collection.immutable.StringLike.toLong$(StringLike.scala:315)
    at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
    at org.apache.spark.sql.hudi.command.SqlKeyGenerator.$anonfun$convertPartitionPathToSqlType$1(SqlKeyGenerator.scala:170)
    at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
    at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
    at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
    at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
    at scala.collection.TraversableLike.map(TraversableLike.scala:286)
    at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
    at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
    at org.apache.spark.sql.hudi.command.SqlKeyGenerator.convertPartitionPathToSqlType(SqlKeyGenerator.scala:156)
    at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getPartitionPath(SqlKeyGenerator.scala:106)
    at org.apache.hudi.HoodieCreateRecordUtils$.getHoodieKeyAndMaybeLocationFromAvroRecord(HoodieCreateRecordUtils.scala:212)
    at org.apache.hudi.HoodieCreateRecordUtils$.$anonfun$createHoodieRecordRdd$5(HoodieCreateRecordUtils.scala:133)
    at scala.collection.Iterator$$anon$10.next(Iterator.scala:461)
    at org.apache.spark.storage.memory.MemoryStore.putIterator(MemoryStore.scala:224)
    at org.apache.spark.storage.memory.MemoryStore.putIteratorAsBytes(MemoryStore.scala:352)
    at org.apache.spark.storage.BlockManager.$anonfun$doPutIterator$1(BlockManager.scala:1614)
    at org.apache.spark.storage.BlockManager.org$apache$spark$storage$BlockManager$$doPut(BlockManager.scala:1524)
    at org.apache.spark.storage.BlockManager.doPutIterator(BlockManager.scala:1588)
    at org.apache.spark.storage.BlockManager.getOrElseUpdate(BlockManager.scala:1389)
    at org.apache.spark.storage.BlockManager.getOrElseUpdateRDDBlock(BlockManager.scala:1343)
    at org.apache.spark.rdd.RDD.getOrCompute(RDD.scala:379)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:367)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:331)
    at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:104)
    at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:54)
    at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:166)
    at org.apache.spark.scheduler.Task.run(Task.scala:141)
    at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
    at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:750)
Hi @stream2000,
I am able to reproduce this issue by running the following Spark code. It still needs to be verified against the Hudi test case.
spark-shell \
--conf 'spark.serializer=org.apache.spark.serializer.KryoSerializer' \
--conf 'spark.sql.catalog.spark_catalog=org.apache.spark.sql.hudi.catalog.HoodieCatalog' \
--conf 'spark.sql.extensions=org.apache.spark.sql.hudi.HoodieSparkSessionExtension' \
--conf 'spark.kryo.registrator=org.apache.spark.HoodieSparkKryoRegistrar' \
--conf "spark.sql.hive.convertMetastoreParquet=false"
spark.sql(
  s"""
     |CREATE TABLE hudi_test_null_partition (
     |  id INT,
     |  boolean_field BOOLEAN,
     |  float_field FLOAT,
     |  byte_field BYTE,
     |  short_field SHORT,
     |  decimal_field DECIMAL(10, 5),
     |  date_field DATE,
     |  string_field STRING,
     |  timestamp_field TIMESTAMP
     |) USING hudi
     |TBLPROPERTIES (primaryKey = 'id')
     |PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field)
     """.stripMargin)

spark.sql(
  s"""
     |INSERT INTO hudi_test_null_partition VALUES
     |(1, TRUE, CAST(1.0 AS FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1', null),
     |(2, FALSE, CAST(2.0 AS FLOAT), 2, 2, 6789.12345, DATE '2021-01-06', 'partition2', TIMESTAMP '2021-01-06 11:00:00')
     """.stripMargin)
ERROR:
Caused by: java.lang.NumberFormatException: For input string: "__HIVE_DEFAULT_PARTITION__"
at java.base/java.lang.NumberFormatException.forInputString(NumberFormatException.java:65)
at java.base/java.lang.Long.parseLong(Long.java:692)
at java.base/java.lang.Long.parseLong(Long.java:817)
at scala.collection.immutable.StringLike.toLong(StringLike.scala:315)
at scala.collection.immutable.StringLike.toLong$(StringLike.scala:315)
at scala.collection.immutable.StringOps.toLong(StringOps.scala:33)
at org.apache.spark.sql.hudi.command.SqlKeyGenerator.$anonfun$convertPartitionPathToSqlType$1(SqlKeyGenerator.scala:170)
at scala.collection.TraversableLike.$anonfun$map$1(TraversableLike.scala:286)
at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
at scala.collection.mutable.ArrayOps$ofRef.foreach(ArrayOps.scala:198)
at scala.collection.TraversableLike.map(TraversableLike.scala:286)
at scala.collection.TraversableLike.map$(TraversableLike.scala:279)
at scala.collection.mutable.ArrayOps$ofRef.map(ArrayOps.scala:198)
at org.apache.spark.sql.hudi.command.SqlKeyGenerator.convertPartitionPathToSqlType(SqlKeyGenerator.scala:156)
at org.apache.spark.sql.hudi.command.SqlKeyGenerator.getPartitionPath(SqlKeyGenerator.scala:106)
at org.apache.hudi.HoodieCreateRecordUtils$.getHoodieKeyAndMaybeLocationFromAvroRecord(HoodieCreateRecordUtils.scala:209)
at org.apache.hudi.HoodieCreateRecordUtils$.$anonfun$createHoodieRecordRdd$5(HoodieCreateRecordUtils.scala:130)
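The top frames show the root cause: SqlKeyGenerator#convertPartitionPathToSqlType hands the timestamp partition segment to Scala's toLong, and the marker written for a null partition value is not a number. In isolation (illustrative snippet, not Hudi code):

val segment = "__HIVE_DEFAULT_PARTITION__" // what Spark/Hive write in place of a null partition value
val millis = segment.toLong                // throws java.lang.NumberFormatException: For input string: "__HIVE_DEFAULT_PARTITION__"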
Spark with Hive
spark.sql(
  s"""
     |CREATE TABLE hive_test_null_partition (
     |  id INT,
     |  boolean_field BOOLEAN,
     |  float_field FLOAT,
     |  byte_field BYTE,
     |  short_field SHORT,
     |  decimal_field DECIMAL(10, 5),
     |  date_field DATE,
     |  string_field STRING,
     |  timestamp_field TIMESTAMP
     |)
     |PARTITIONED BY (boolean_field, float_field, byte_field, short_field, decimal_field, date_field, string_field, timestamp_field)
     """.stripMargin)

spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")

spark.sql(
  s"""
     |INSERT INTO hive_test_null_partition VALUES
     |(1, TRUE, CAST(1.0 AS FLOAT), 1, 1, 1234.56789, DATE '2021-01-05', 'partition1', null),
     |(2, FALSE, CAST(2.0 AS FLOAT), 2, 2, 6789.12345, DATE '2021-01-06', 'partition2', TIMESTAMP '2021-01-06 11:00:00')
     """.stripMargin)
# tree hive_test_null_partition
hive_test_null_partition
├── boolean_field=false
│ └── float_field=2.0
│ └── byte_field=2
│ └── short_field=2
│ └── decimal_field=6789.12345
│ └── date_field=2021-01-06
│ └── string_field=partition2
│ └── timestamp_field=2021-01-06 11%3A00%3A00
│ └── part-00001-106ff079-b48f-4428-9bc6-c55ea4b8caec.c000
└── boolean_field=true
└── float_field=1.0
└── byte_field=1
└── short_field=1
└── decimal_field=1234.56789
└── date_field=2021-01-05
└── string_field=partition1
└── timestamp_field=__HIVE_DEFAULT_PARTITION__
└── part-00000-106ff079-b48f-4428-9bc6-c55ea4b8caec.c000
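Reading the Hive table back returns NULL for the partition column of rows stored under __HIVE_DEFAULT_PARTITION__, so the null timestamp round-trips cleanly there. A quick sanity check (assuming the hive_test_null_partition session above):

// Row 1 (the null timestamp_field) should come back from the default partition.
spark.sql("SELECT id, timestamp_field FROM hive_test_null_partition WHERE timestamp_field IS NULL").show()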
Created an upstream Jira for this issue: https://issues.apache.org/jira/browse/HUDI-8315
@rangareddy I would like to contribute to this issue.
I did some initial analysis and found that the issue is in SqlKeyGenerator#convertPartitionPathToSqlType,
where the default __HIVE_DEFAULT_PARTITION__ value is converted to a Long (epoch milliseconds).
I added logic along the following lines to fix this issue temporarily.
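A minimal sketch of the guard (not the exact patch; the helper and constant names are illustrative):

// Skip the epoch-millis parsing in convertPartitionPathToSqlType when the
// segment is the marker written for null partition values.
val HiveDefaultPartition = "__HIVE_DEFAULT_PARTITION__"

def convertTimestampSegment(segment: String): String = {
  if (segment == HiveDefaultPartition) {
    segment // keep the null-partition marker as-is instead of parsing it as a Long
  } else {
    // existing behavior: interpret the segment as epoch milliseconds
    java.sql.Timestamp.from(java.time.Instant.ofEpochMilli(segment.toLong)).toString
  }
}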
However, I couldn't run the test cases to validate my changes.
Command to run the test case: mvn test -Dtest=TestSparkSqlWithTimestampKeyGenerator -Drat.skip=true
Can you suggest how to run the test case locally?
Fixed via https://github.com/apache/hudi/pull/12621