
Parquet column with integer logical type cannot be read as a Spark date column

Open · okue opened this issue · 5 comments

Describe the bug

Date values are not decoded as expected

For a table that has a DATE column (EventDate),

create external table hits (
    EventDate DATE NOT NULL, ...

the EventDate column isn't decoded as a date:

>>> df = spark.sql("SELECT EventDate FROM hits limit 5")
>>> df
DataFrame[EventDate: date]
>>> df.show()
+---------+
|EventDate|
+---------+
|    15901|
|    15901|
|    15901|
|    15901|
|    15901|
+---------+
# integer value, not date!
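
For reference, a Spark DATE is stored as days since the Unix epoch, so 15901 is the raw encoding of 2013-07-15. A quick sanity check (a sketch using java.time, not part of the original report):

import java.time.LocalDate

// Spark DATE / Arrow Date32 values count days since 1970-01-01.
println(LocalDate.ofEpochDay(15901))  // prints 2013-07-15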

MIN/MAX aggregation fails on the DATE column

>>> spark.sql("SELECT MIN(EventDate) FROM hits;").show()
Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
Caused by: org.apache.comet.CometRuntimeException: Internal error: MIN/MAX is not expected to receive scalars of incompatible types (Date32("NULL"), Int32(15901)).
This was likely caused by a bug in DataFusion's code and we would welcome that you file an bug report in our issue tracker
	at org.apache.comet.Native.executePlan(Native Method)
	at org.apache.comet.CometExecIterator.executeNative(CometExecIterator.scala:77)
	at org.apache.comet.CometExecIterator.getNextBatch(CometExecIterator.scala:116)
	at org.apache.comet.CometExecIterator.hasNext(CometExecIterator.scala:166)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at org.apache.spark.shuffle.sort.BypassMergeSortShuffleWriter.write(BypassMergeSortShuffleWriter.java:140)
	at org.apache.spark.shuffle.ShuffleWriteProcessor.write(ShuffleWriteProcessor.scala:59)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:101)
	at org.apache.spark.scheduler.ShuffleMapTask.runTask(ShuffleMapTask.scala:53)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)

Steps to reproduce

  1. Download the ClickBench dataset: https://github.com/ClickHouse/ClickBench?tab=readme-ov-file#data-loading
  2. Create the table with the DDL below.
  3. Run the queries above.
create external table hits
(
    WatchID BIGINT NOT NULL,
    JavaEnable SMALLINT NOT NULL,
    Title STRING NOT NULL,
    GoodEvent SMALLINT NOT NULL,
    EventTime BIGINT NOT NULL,
    EventDate DATE NOT NULL,
    CounterID INT NOT NULL,
    ClientIP INT NOT NULL,
    RegionID INT NOT NULL,
    UserID BIGINT NOT NULL,
    CounterClass SMALLINT NOT NULL,
    OS SMALLINT NOT NULL,
    UserAgent SMALLINT NOT NULL,
    URL STRING NOT NULL,
    Referer STRING NOT NULL,
    IsRefresh SMALLINT NOT NULL,
    RefererCategoryID SMALLINT NOT NULL,
    RefererRegionID INT NOT NULL,
    URLCategoryID SMALLINT NOT NULL,
    URLRegionID INT NOT NULL,
    ResolutionWidth SMALLINT NOT NULL,
    ResolutionHeight SMALLINT NOT NULL,
    ResolutionDepth SMALLINT NOT NULL,
    FlashMajor SMALLINT NOT NULL,
    FlashMinor SMALLINT NOT NULL,
    FlashMinor2 STRING NOT NULL,
    NetMajor SMALLINT NOT NULL,
    NetMinor SMALLINT NOT NULL,
    UserAgentMajor SMALLINT NOT NULL,
    UserAgentMinor STRING NOT NULL,
    CookieEnable SMALLINT NOT NULL,
    JavascriptEnable SMALLINT NOT NULL,
    IsMobile SMALLINT NOT NULL,
    MobilePhone SMALLINT NOT NULL,
    MobilePhoneModel STRING NOT NULL,
    Params STRING NOT NULL,
    IPNetworkID INT NOT NULL,
    TraficSourceID SMALLINT NOT NULL,
    SearchEngineID SMALLINT NOT NULL,
    SearchPhrase STRING NOT NULL,
    AdvEngineID SMALLINT NOT NULL,
    IsArtifical SMALLINT NOT NULL,
    WindowClientWidth SMALLINT NOT NULL,
    WindowClientHeight SMALLINT NOT NULL,
    ClientTimeZone SMALLINT NOT NULL,
    ClientEventTime BIGINT NOT NULL,
    SilverlightVersion1 SMALLINT NOT NULL,
    SilverlightVersion2 SMALLINT NOT NULL,
    SilverlightVersion3 INT NOT NULL,
    SilverlightVersion4 SMALLINT NOT NULL,
    PageCharset STRING NOT NULL,
    CodeVersion INT NOT NULL,
    IsLink SMALLINT NOT NULL,
    IsDownload SMALLINT NOT NULL,
    IsNotBounce SMALLINT NOT NULL,
    FUniqID BIGINT NOT NULL,
    OriginalURL STRING NOT NULL,
    HID INT NOT NULL,
    IsOldCounter SMALLINT NOT NULL,
    IsEvent SMALLINT NOT NULL,
    IsParameter SMALLINT NOT NULL,
    DontCountHits SMALLINT NOT NULL,
    WithHash SMALLINT NOT NULL,
    HitColor STRING NOT NULL,
    LocalEventTime BIGINT NOT NULL,
    Age SMALLINT NOT NULL,
    Sex SMALLINT NOT NULL,
    Income SMALLINT NOT NULL,
    Interests SMALLINT NOT NULL,
    Robotness SMALLINT NOT NULL,
    RemoteIP INT NOT NULL,
    WindowName INT NOT NULL,
    OpenerName INT NOT NULL,
    HistoryLength SMALLINT NOT NULL,
    BrowserLanguage STRING NOT NULL,
    BrowserCountry STRING NOT NULL,
    SocialNetwork STRING NOT NULL,
    SocialAction STRING NOT NULL,
    HTTPError SMALLINT NOT NULL,
    SendTiming INT NOT NULL,
    DNSTiming INT NOT NULL,
    ConnectTiming INT NOT NULL,
    ResponseStartTiming INT NOT NULL,
    ResponseEndTiming INT NOT NULL,
    FetchTiming INT NOT NULL,
    SocialSourceNetworkID SMALLINT NOT NULL,
    SocialSourcePage STRING NOT NULL,
    ParamPrice BIGINT NOT NULL,
    ParamOrderID STRING NOT NULL,
    ParamCurrency STRING NOT NULL,
    ParamCurrencyID SMALLINT NOT NULL,
    OpenstatServiceName STRING NOT NULL,
    OpenstatCampaignID STRING NOT NULL,
    OpenstatAdID STRING NOT NULL,
    OpenstatSourceID STRING NOT NULL,
    UTMSource STRING NOT NULL,
    UTMMedium STRING NOT NULL,
    UTMCampaign STRING NOT NULL,
    UTMContent STRING NOT NULL,
    UTMTerm STRING NOT NULL,
    FromTag STRING NOT NULL,
    HasGCLID SMALLINT NOT NULL,
    RefererHash BIGINT NOT NULL,
    URLHash BIGINT NOT NULL,
    CLID INT NOT NULL
)
using parquet
location './hits/';

Expected behavior

If the DDL specifies the DATE type, the column should be decoded as a date.

Additional context

$ duckdb -c "from parquet_schema('hits.parquet')"
┌──────────────┬───────────────────────┬────────────┬─────────────┬─────────────────┬──────────────┬────────────────┬───────┬───────────┬──────────┬────────────────────────────────┐
│  file_name   │         name          │    type    │ type_length │ repetition_type │ num_children │ converted_type │ scale │ precision │ field_id │          logical_type          │
│   varchar    │        varchar        │  varchar   │   varchar   │     varchar     │    int64     │    varchar     │ int64 │   int64   │  int64   │            varchar             │
├──────────────┼───────────────────────┼────────────┼─────────────┼─────────────────┼──────────────┼────────────────┼───────┼───────────┼──────────┼────────────────────────────────┤
...
│ hits.parquet │ EventDate             │ INT32      │             │ REQUIRED        │              │ UINT_16        │       │           │          │ IntType(bitWidth=, isSigned=0) │
...
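
(Side note: a UINT_16 interpreted as days since 1970-01-01 covers dates up to epoch day 65535, i.e. 2149-06-06, so this compact encoding comfortably covers the dataset; presumably that is why ClickBench's Parquet files store EventDate this way.)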

okue · Feb 19, 2024

Thanks @okue for reporting this

comphead · Feb 19, 2024

Thanks. I was able to reproduce this. Will take a look.

sunchao · Feb 20, 2024

CollectLimit 5
+- *(1) ColumnarToRow
   +- CometScan parquet spark_catalog.default.hits[EventDate#5] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:...], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<EventDate:date>

The query plan is shown above. The read schema is correct, so something seems to be wrong in the scan operator.

viirya · Feb 21, 2024

Oh, I know what the root cause is now.

If you collect the data and dump it out, you will see that the result is correct, but Dataset.show() prints an incorrect result.

What Dataset.show() actually does is project each column to string type, collect the result, and then format it for printing.

So the incorrect result is caused by cast(EventDate#5 as string).

I checked the input array to the Cast expression in Comet, and it is an Int32 array, so Cast performs an Int32-to-String conversion. That's why we see integers there instead of date strings.
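
A rough illustration of this (a sketch in spark-shell, not Spark's actual internal code): show() behaves approximately like casting every column to string before collecting.

val df = spark.sql("SELECT EventDate FROM hits LIMIT 5")
// show() effectively projects each column to string first; with Comet the
// cast receives an Int32 array here, so it renders "15901" instead of a date.
val asStrings = df.select(df.columns.map(c => df.col(c).cast("string")): _*)
asStrings.collect().foreach(println)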

I looked into the EventDate column. Its logical type in Parquet is actually Integer { bit_width: 16, is_signed: false }, not Date. In Spark, when a read schema is specified (from the table definition), it can override the logical type of the Parquet column. I don't see Comet doing this.

viirya · Feb 21, 2024

An easy test to reproduce:

Seq(15901).toDF("dt").write.parquet("/tmp/dt")
spark.read.schema("dt date").parquet("/tmp/dt").createOrReplaceTempView("v")
spark.sql("select min(dt) from v").show(false)
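
This writes the raw integer 15901 to Parquet and then reads it back with an explicitly specified date schema, exercising the same logical-type override path as the ClickBench table.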

comphead · Mar 6, 2024

This issue appears to be resolved, so I will close it. Thanks @okue for reporting it.

scala> Seq(15901).toDF("dt").write.parquet("/tmp/dt")
24/09/19 10:45:43 INFO core/src/lib.rs: Comet native library version 0.3.0 initialized
24/09/19 10:45:43 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 Execute InsertIntoHadoopFsRelationCommand [COMET: Execute InsertIntoHadoopFsRelationCommand is not supported]
+-  WriteFiles [COMET: WriteFiles is not supported]
   +-  LocalTableScan [COMET: LocalTableScan is not supported]

scala> spark.read.schema("dt date").parquet("/tmp/dt").createOrReplaceTempView("v")
24/09/19 10:45:50 WARN CometSparkSessionExtensions$CometExecRule: Comet cannot execute some parts of this plan natively (set spark.comet.explainFallback.enabled=false to disable this logging):
 Execute CreateViewCommand [COMET: Execute CreateViewCommand is not supported]
   +- CreateViewCommand
         +- LogicalRelation

scala> val df = spark.sql("select min(dt) from v")
df: org.apache.spark.sql.DataFrame = [min(dt): date]

scala> df.show(false)
+----------+
|min(dt)   |
+----------+
|2013-07-15|
+----------+


scala> df.explain()
== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=false
+- !CometHashAggregate [min#26], Final, [min(dt#7)]
   +- CometExchange SinglePartition, ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=176]
      +- !CometHashAggregate [dt#7], Partial, [partial_min(dt#7)]
         +- CometScan parquet [dt#7] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/dt], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<dt:date>

andygrove · Sep 19, 2024