paimon icon indicating copy to clipboard operation
paimon copied to clipboard

[Bug] paimon 0.8 spark read tag error

Open eric666666 opened this issue 1 year ago • 3 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Paimon version

paimon 0.8 snapshot

Compute Engine

spark 3.3

Minimal reproduce step

select * from paimon.paimon.`ods_trade_orders_wm1$tags`
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2668)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2604)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2603)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2603)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1178)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1178)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2856)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2798)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2787)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:952)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2238)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2259)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2278)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2303)
	at org.apache.spark.rdd.RDD.$anonfun$collect$1(RDD.scala:1021)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:406)
	at org.apache.spark.rdd.RDD.collect(RDD.scala:1020)
	at org.apache.spark.sql.execution.SparkPlan.executeCollect(SparkPlan.scala:424)
	at org.apache.spark.sql.execution.SparkPlan.executeCollectPublic(SparkPlan.scala:451)
	at org.apache.spark.sql.execution.HiveResult$.hiveResultString(HiveResult.scala:76)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.$anonfun$run$2(SparkSQLDriver.scala:69)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:109)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:169)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:95)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:779)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:64)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLDriver.run(SparkSQLDriver.scala:69)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processCmd(SparkSQLCLIDriver.scala:384)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1(SparkSQLCLIDriver.scala:504)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.$anonfun$processLine$1$adapted(SparkSQLCLIDriver.scala:498)
	at scala.collection.Iterator.foreach(Iterator.scala:941)
	at scala.collection.Iterator.foreach$(Iterator.scala:941)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1429)
	at scala.collection.IterableLike.foreach(IterableLike.scala:74)
	at scala.collection.IterableLike.foreach$(IterableLike.scala:73)
	at scala.collection.AbstractIterable.foreach(Iterable.scala:56)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.processLine(SparkSQLCLIDriver.scala:498)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver$.main(SparkSQLCLIDriver.scala:286)
	at org.apache.spark.sql.hive.thriftserver.SparkSQLCLIDriver.main(SparkSQLCLIDriver.scala)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.apache.spark.deploy.JavaMainApplication.start(SparkApplication.scala:52)
	at org.apache.spark.deploy.SparkSubmit.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:984)
	at org.apache.spark.deploy.SparkSubmit.doRunMain$1(SparkSubmit.scala:191)
	at org.apache.spark.deploy.SparkSubmit.submit(SparkSubmit.scala:214)
	at org.apache.spark.deploy.SparkSubmit.doSubmit(SparkSubmit.scala:90)
	at org.apache.spark.deploy.SparkSubmit$$anon$2.doSubmit(SparkSubmit.scala:1072)
	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:1081)
	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
Caused by: java.lang.ArithmeticException: long overflow
	at java.lang.Math.multiplyExact(Math.java:892)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:240)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:190)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(DateTimeUtils.scala)
	at org.apache.paimon.spark.SparkInternalRow.fromPaimon(SparkInternalRow.java:282)
	at org.apache.paimon.spark.SparkInternalRow.getTimestampMicros(SparkInternalRow.java:133)
	at org.apache.paimon.spark.SparkInternalRow.getLong(SparkInternalRow.java:128)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:364)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:365)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:329)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:136)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:548)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1504)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:551)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

It only fails on 0.8 snapshot. But 0.7-inclubating run success. image

What doesn't meet your expectations?

I have debug it. image It seems create_time value get wrong. please fix it.

Anything else?

No response

Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

eric666666 avatar Apr 28 '24 09:04 eric666666

I haven't reproduced it. Can you provide more detailed steps to reproduce it, is the tag written in spark?

Zouxxyy avatar Apr 29 '24 14:04 Zouxxyy

I haven't reproduced it. Can you provide more detailed steps to reproduce it, is the tag written in spark?

I find the reson. The tag is created by flink at paimon 0.8 snapshot version which compliled early than 2024-04-15, But after 2024-04-15 Tag class add two new class field. My tag file content:

{
  "version" : 3,
  "id" : 1,
  "schemaId" : 0,
  "baseManifestList" : "manifest-list-0dfb8ec3-0445-45f9-9c59-4d08e6443c43-0",
  "deltaManifestList" : "manifest-list-0dfb8ec3-0445-45f9-9c59-4d08e6443c43-1",
  "changelogManifestList" : "manifest-list-0dfb8ec3-0445-45f9-9c59-4d08e6443c43-2",
  "commitUser" : "59c08f33-b5de-4478-823c-5e681ee6e0b3",
  "commitIdentifier" : 1,
  "commitKind" : "APPEND",
  "timeMillis" : 1714379998639,
  "logOffsets" : { },
  "totalRecordCount" : 2531,
  "deltaRecordCount" : 2531,
  "changelogRecordCount" : 8227,
  "watermark" : 1714407456182
}

However new commit add new class fields: image Spark read tag system table‘s jar is compiled after 2024-04-15. So new version code read tag's tagCreateTime using LocalDateTime.MIN. image This directly made next exception

Caused by: java.lang.ArithmeticException: long overflow
	at java.lang.Math.multiplyExact(Math.java:892)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.millisToMicros(DateTimeUtils.scala:240)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils$.fromJavaTimestamp(DateTimeUtils.scala:190)
	at org.apache.spark.sql.catalyst.util.DateTimeUtils.fromJavaTimestamp(DateTimeUtils.scala)

Because LocalDateTime.MIN timestamp millis convert to micros is bigger than Long.MAX_VALUE. image I think there could use a big Date to instead LocalDateTime.MIN which is just a symbol to express old Tag file do not having tagCreateTime field. Such as we could use Epoch time. So i will submit a PR to fix it.

eric666666 avatar Apr 30 '24 06:04 eric666666

Thanks @eric666666 for reporting and investigation. We should return null for these two fields when they are nulls.

JingsongLi avatar Apr 30 '24 09:04 JingsongLi