Parquet column with integer logical type cannot be read as Spark date column
Describe the bug
The Date type is not decoded as expected.
For a table that has a DATE column (EventDate), e.g.
create external table hits (
EventDate DATE NOT NULL, ...
the EventDate column isn't decoded as a date:
>>> df = spark.sql("SELECT EventDate FROM hits limit 5")
>>> df
DataFrame[EventDate: date]
>>> df.show()
+---------+
|EventDate|
+---------+
| 15901|
| 15901|
| 15901|
| 15901|
| 15901|
+---------+
# integer value, not date!
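For reference, 15901 is the raw storage value of a DATE: the number of days since the Unix epoch. A quick sanity check with plain java.time (nothing Spark- or Comet-specific here):

import java.time.LocalDate

// DATE values are stored as days since 1970-01-01;
// 15901 days after the epoch is 2013-07-15.
LocalDate.ofEpochDay(15901)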
MIN/MAX aggregation on the Date column also fails; as the error below shows, the aggregator receives Int32 values where Date32 is expected:
>>> spark.sql("SELECT MIN(EventDate) FROM hits;").show()
Driver stacktrace:
at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
at scala.Option.foreach(Option.scala:407)
at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.comet.CometRuntimeException: Internal error: MIN/MAX is not expected to receive scalars of incompatible types (Date32("NULL"), Int32(15901)).
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
at org.apache.comet.Native.executePlan(Native Method)
at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:77)
at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:116)
at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:166)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
at org.apache.spark.scheduler.Task.run(Task.scala:139)
at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
at java.base/java.lang.Thread.run(Thread.java:833)
Steps to reproduce
- Download the hits.parquet data set linked from https://github.com/ClickHouse/ClickBench?tab=readme-ov-file#data-loading
- Create the table with the DDL below.
- Run the queries above.
create external table hits
(
WatchID BIGINT NOT NULL,
JavaEnable SMALLINT NOT NULL,
Title STRING NOT NULL,
GoodEvent SMALLINT NOT NULL,
EventTime BIGINT NOT NULL,
EventDate DATE NOT NULL,
CounterID INT NOT NULL,
ClientIP INT NOT NULL,
RegionID INT NOT NULL,
UserID BIGINT NOT NULL,
CounterClass SMALLINT NOT NULL,
OS SMALLINT NOT NULL,
UserAgent SMALLINT NOT NULL,
URL STRING NOT NULL,
Referer STRING NOT NULL,
IsRefresh SMALLINT NOT NULL,
RefererCategoryID SMALLINT NOT NULL,
RefererRegionID INT NOT NULL,
URLCategoryID SMALLINT NOT NULL,
URLRegionID INT NOT NULL,
ResolutionWidth SMALLINT NOT NULL,
ResolutionHeight SMALLINT NOT NULL,
ResolutionDepth SMALLINT NOT NULL,
FlashMajor SMALLINT NOT NULL,
FlashMinor SMALLINT NOT NULL,
FlashMinor2 STRING NOT NULL,
NetMajor SMALLINT NOT NULL,
NetMinor SMALLINT NOT NULL,
UserAgentMajor SMALLINT NOT NULL,
UserAgentMinor STRING NOT NULL,
CookieEnable SMALLINT NOT NULL,
JavascriptEnable SMALLINT NOT NULL,
IsMobile SMALLINT NOT NULL,
MobilePhone SMALLINT NOT NULL,
MobilePhoneModel STRING NOT NULL,
Params STRING NOT NULL,
IPNetworkID INT NOT NULL,
TraficSourceID SMALLINT NOT NULL,
SearchEngineID SMALLINT NOT NULL,
SearchPhrase STRING NOT NULL,
AdvEngineID SMALLINT NOT NULL,
IsArtifical SMALLINT NOT NULL,
WindowClientWidth SMALLINT NOT NULL,
WindowClientHeight SMALLINT NOT NULL,
ClientTimeZone SMALLINT NOT NULL,
ClientEventTime BIGINT NOT NULL,
SilverlightVersion1 SMALLINT NOT NULL,
SilverlightVersion2 SMALLINT NOT NULL,
SilverlightVersion3 INT NOT NULL,
SilverlightVersion4 SMALLINT NOT NULL,
PageCharset STRING NOT NULL,
CodeVersion INT NOT NULL,
IsLink SMALLINT NOT NULL,
IsDownload SMALLINT NOT NULL,
IsNotBounce SMALLINT NOT NULL,
FUniqID BIGINT NOT NULL,
OriginalURL STRING NOT NULL,
HID INT NOT NULL,
IsOldCounter SMALLINT NOT NULL,
IsEvent SMALLINT NOT NULL,
IsParameter SMALLINT NOT NULL,
DontCountHits SMALLINT NOT NULL,
WithHash SMALLINT NOT NULL,
HitColor STRING NOT NULL,
LocalEventTime BIGINT NOT NULL,
Age SMALLINT NOT NULL,
Sex SMALLINT NOT NULL,
Income SMALLINT NOT NULL,
Interests SMALLINT NOT NULL,
Robotness SMALLINT NOT NULL,
RemoteIP INT NOT NULL,
WindowName INT NOT NULL,
OpenerName INT NOT NULL,
HistoryLength SMALLINT NOT NULL,
BrowserLanguage STRING NOT NULL,
BrowserCountry STRING NOT NULL,
SocialNetwork STRING NOT NULL,
SocialAction STRING NOT NULL,
HTTPError SMALLINT NOT NULL,
SendTiming INT NOT NULL,
DNSTiming INT NOT NULL,
ConnectTiming INT NOT NULL,
ResponseStartTiming INT NOT NULL,
ResponseEndTiming INT NOT NULL,
FetchTiming INT NOT NULL,
SocialSourceNetworkID SMALLINT NOT NULL,
SocialSourcePage STRING NOT NULL,
ParamPrice BIGINT NOT NULL,
ParamOrderID STRING NOT NULL,
ParamCurrency STRING NOT NULL,
ParamCurrencyID SMALLINT NOT NULL,
OpenstatServiceName STRING NOT NULL,
OpenstatCampaignID STRING NOT NULL,
OpenstatAdID STRING NOT NULL,
OpenstatSourceID STRING NOT NULL,
UTMSource STRING NOT NULL,
UTMMedium STRING NOT NULL,
UTMCampaign STRING NOT NULL,
UTMContent STRING NOT NULL,
UTMTerm STRING NOT NULL,
FromTag STRING NOT NULL,
HasGCLID SMALLINT NOT NULL,
RefererHash BIGINT NOT NULL,
URLHash BIGINT NOT NULL,
CLID INT NOT NULL
)
using parquet
location './hits/';
Expected behavior
If the DDL specifies the DATE type, the column should be decoded as a date.
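For example, the SELECT EventDate query above should print date strings rather than raw day counts (15901 corresponds to 2013-07-15), along the lines of:

+----------+
| EventDate|
+----------+
|2013-07-15|
|2013-07-15|
...
+----------+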
Additional context
$ duckdb -c "from parquet_schema('hits.parquet')"
┌──────────────┬───────────────────────┬────────────┬─────────────┬─────────────────┬──────────────┬────────────────┬───────┬───────────┬──────────┬────────────────────────────────┐
│ file_name │ name │ type │ type_length │ repetition_type │ num_children │ converted_type │ scale │ precision │ field_id │ logical_type │
│ varchar │ varchar │ varchar │ varchar │ varchar │ int64 │ varchar │ int64 │ int64 │ int64 │ varchar │
├──────────────┼───────────────────────┼────────────┼─────────────┼─────────────────┼──────────────┼────────────────┼───────┼───────────┼──────────┼────────────────────────────────┤
...
│ hits.parquet │ EventDate │ INT32 │ │ REQUIRED │ │ UINT_16 │ │ │ │ IntType(bitWidth=16, isSigned=0) │
...
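The same annotation can also be inspected from spark-shell using the parquet-mr classes bundled with Spark. A minimal sketch, assuming hits.parquet is in the current directory:

import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.Path
import org.apache.parquet.hadoop.ParquetFileReader
import org.apache.parquet.hadoop.util.HadoopInputFile

// Open the Parquet footer and print the file schema; EventDate appears
// as a required INT32 with an unsigned 16-bit integer annotation, not DATE.
val input = HadoopInputFile.fromPath(new Path("hits.parquet"), new Configuration())
val reader = ParquetFileReader.open(input)
println(reader.getFooter.getFileMetaData.getSchema)
reader.close()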
Thanks @okue for reporting this
Thanks. I was able to reproduce this. Will take a look.
CollectLimit 5
+- *(1) ColumnarToRow
+- CometScan parquet spark_catalog.default.hits[EventDate#5] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<EventDate:date>
The query plan is shown above. The read schema is correct, so something seems wrong in the scan operator.
Oh, I know the root cause now.
If you collect the data and dump it out, the result is correct. But Dataset.show() prints an incorrect result.
What Dataset.show() actually does is project each column to string type, collect the result, and then format it for printing.
So the incorrect result is caused by cast(EventDate#5 as string).
I checked the input array to the Cast expression in Comet, and it is an Int32 array, so Cast performs an Int32-to-String conversion. That's why we see integers there instead of date strings.
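In other words, show() behaves roughly like this sketch (a simplification of the projection step, with df being the DataFrame from the first example, not Spark's actual implementation):

import org.apache.spark.sql.functions.col

// Cast every column to string, collect the rows, then format them.
val asStrings = df.select(df.columns.map(c => col(c).cast("string")): _*)
asStrings.collect().foreach(println)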
I looked into the EventDate column. Its logical type in Parquet is actually Integer { bit_width: 16, is_signed: false }, not Date. I think in Spark, when a read schema is specified (from the table definition), it can override the logical type of the Parquet column. I don't see Comet doing this.
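Until the scan honors the read schema, a hypothetical user-side workaround is to read the column as the integer it physically is and convert the day count explicitly (date_add treats its second argument as a number of days past the start date):

import org.apache.spark.sql.functions.{col, date_add, lit}

// Convert the raw day count into a date by adding it to the Unix epoch.
// Path and column names follow the examples above.
val fixed = spark.read.parquet("./hits/")
  .withColumn("EventDate", date_add(lit("1970-01-01").cast("date"), col("EventDate")))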
Easy test to reproduce
Seq(15901).toDF("dt").write.parquet("/tmp/dt")
spark.read.schema("dt date").parquet("/tmp/dt").createOrReplaceTempView("v")
spark.sql("select min(dt) from v").show(false)
This issue appears to be resolved, so I will close it. Thanks @okue for reporting it.
scala> Seq(15901).toDF("dt").write.parquet("/tmp/dt")
24/09/19 10:45:43 INFO core/src/lib.rs: Comet native library version 0.3.0 initialized
24/09/19 10:45:43 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
Execute InsertIntoHadoopFsRelationCommand [COMET: Execute InsertIntoHadoopFsRelationCommand is not supported]
+- WriteFiles [COMET: WriteFiles is not supported]
+- LocalTableScan [COMET: LocalTableScan is not supported]
scala> spark.read.schema("dt date").parquet("/tmp/dt").createOrReplaceTempView("v")
24/09/19 10:45:50 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
Execute CreateViewCommand [COMET: Execute CreateViewCommand is not supported]
+- CreateViewCommand
+- LogicalRelation
scala> val df = spark.sql("select min(dt) from v")
df: org.apache.spark.sql.DataFrame = [min(dt): date]
scala> df.show(false)
+----------+
|min(dt) |
+----------+
|2013-07-15|
+----------+
scala> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- !CometHashAggregate [min#26], Final, [min(dt#7)]
+- CometExchange SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=176]
+- !CometHashAggregate [dt#7], Partial, [partial_min(dt#7)]
+- CometScan parquet [dt#7] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/dt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<dt:date>