
date_trunc incorrect results in non-UTC timezone

Open: andygrove opened this issue 2 months ago • 8 comments

Describe the bug

PR https://github.com/apache/datafusion-comet/pull/2634 fixed some bugs with trunc/date_trunc, but the new tests added as part of that PR uncovered another bug.

The tests pass when the Spark session's timezone is UTC, but fail for other timezones.

When reading from a DataFrame:

org.apache.comet.CometNativeException: Fail to process Arrow array with reason: Invalid argument error: RowConverter column schema mismatch, expected Timestamp(Microsecond, Some("America/Denver")) got Timestamp(Microsecond, Some("UTC")).

When reading from Parquet:


== Physical Plan ==
AdaptiveSparkPlan isFinalPlan=true
+- == Final Plan ==
   *(1) CometColumnarToRow
   +- CometSort [c0#8, date_trunc(quarter, c0)#95], [c0#8 ASC NULLS FIRST]
      +- AQEShuffleRead coalesced
         +- ShuffleQueryStage 0
            +- CometExchange rangepartitioning(c0#8 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1129]
               +- CometProject [c0#8, date_trunc(quarter, c0)#95], [c0#8, date_trunc(quarter, c0#8, Some(America/Denver)) AS date_trunc(quarter, c0)#95]
                  +- CometScan [native_iceberg_compat] parquet [c0#8] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/spark-ec4ccf01-3f14-44b0-8c83-fa87cad8d6df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c0:timestamp>
+- == Initial Plan ==
   CometSort [c0#8, date_trunc(quarter, c0)#95], [c0#8 ASC NULLS FIRST]
   +- CometExchange rangepartitioning(c0#8 ASC NULLS FIRST, 10), ENSURE_REQUIREMENTS, CometNativeShuffle, [plan_id=1018]
      +- CometProject [c0#8, date_trunc(quarter, c0)#95], [c0#8, date_trunc(quarter, c0#8, Some(America/Denver)) AS date_trunc(quarter, c0)#95]
         +- CometScan [native_iceberg_compat] parquet [c0#8] Batched: true, DataFilters: [], Format: CometParquet, Location: InMemoryFileIndex(1 paths)[file:/tmp/spark-ec4ccf01-3f14-44b0-8c83-fa87cad8d6df], PartitionFilters: [], PushedFilters: [], ReadSchema: struct<c0:timestamp>

== Results ==
!== Correct Answer - 1000 ==                              == Spark Answer - 1000 ==
 struct<c0:timestamp,date_trunc(quarter, c0):timestamp>   struct<c0:timestamp,date_trunc(quarter, c0):timestamp>
![3332-12-03 10:00:59.158,3332-09-30 23:00:00.0]          [3332-12-03 10:00:59.158,3332-10-01 00:00:00.0]
![3332-12-03 10:04:41.722,3332-09-30 23:00:00.0]          [3332-12-03 10:04:41.722,3332-10-01 00:00:00.0]
![3332-12-03 10:26:05.153,3332-09-30 23:00:00.0]          [3332-12-03 10:26:05.153,3332-10-01 00:00:00.0]
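
For context, a minimal sketch of the failing scenario, assuming an active SparkSession named spark with Comet enabled (the timestamp literal is taken from the diff above):

// Run date_trunc under a non-UTC session timezone.
spark.conf.set("spark.sql.session.timeZone", "America/Denver")
spark.sql("SELECT date_trunc('quarter', timestamp'3332-12-03 10:00:59.158')")
  .show(truncate = false)
// The two sides disagree on the truncated value here:
// 3332-09-30 23:00:00.0 vs 3332-10-01 00:00:00.0 (see the diff above).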

Steps to reproduce

No response

Expected behavior

No response

Additional context

No response

andygrove · Oct 25 '25 15:10

DataFusion regression on date_trunc: https://github.com/apache/datafusion/issues/18334

comphead · Oct 30 '25 17:10

take

hsiang-c · Oct 30 '25 17:10

I have a smaller repro:

test("sort on timestamp after changing session timezone") {
    // create data in specific timezone
    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "Europe/London") {
      createTimestampTestData.createOrReplaceTempView("tbl")
    }
    // read data in a different timezone
    withSQLConf(SQLConf.SESSION_LOCAL_TIMEZONE.key -> "America/Denver") {
      checkSparkAnswerAndOperator("SELECT c0 FROM tbl ORDER BY c0")
    }
  }

andygrove · Nov 07 '25 18:11

just FYI the workaround is

CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false"

This happens in RangeRepartition, in the RowConverter. I'm digging into this to see what the correct behavior should be.
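
A sketch of applying that workaround in a test, assuming the withSQLConf helper and tbl view from the repro above:

// Disable Comet's native shuffle so RangeRepartition (and its
// RowConverter) is not exercised for the sort.
withSQLConf(CometConf.COMET_EXEC_SHUFFLE_ENABLED.key -> "false") {
  checkSparkAnswerAndOperator("SELECT c0 FROM tbl ORDER BY c0")
}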

comphead · Nov 08 '25 18:11

Here's what I know so far:

In native execution we have always assumed UTC. When we read from Parquet, we convert to UTC.

When creating the Arrow schema from the ~Parquet~ Spark schema, we hard-coded UTC. I have a PR https://github.com/apache/datafusion-comet/pull/2734 to remove that hard-coding, which is a partial fix.

When a native Comet scan reads from a JVM LocalTableScan, the timezone is not always UTC, so we have a mismatch there.
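
An illustrative sketch (not Comet's actual code) of the hard-coding described above, using Arrow's Java API from Scala; toArrowField is a hypothetical helper:

import org.apache.arrow.vector.types.TimeUnit
import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}

// Hypothetical helper: build the Arrow field for a Spark timestamp column.
def toArrowField(name: String, sessionTz: String): Field = {
  // Before PR 2734 the timezone was effectively hard-coded:
  //   val tz = "UTC"
  // The partial fix carries the session timezone through instead:
  val tz = sessionTz
  new Field(
    name,
    FieldType.nullable(new ArrowType.Timestamp(TimeUnit.MICROSECOND, tz)),
    null)
}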

andygrove · Nov 08 '25 19:11

A short-term fix may be to add some casting to UTC.
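
One way that could look (a hedged sketch, not the actual fix): Spark stores timestamps as microseconds since the UTC epoch, so rewriting the Arrow field's timezone label to UTC leaves the stored values untouched; relabelToUtc is a hypothetical helper:

import org.apache.arrow.vector.types.pojo.{ArrowType, Field, FieldType}

// Hypothetical helper: rewrite a field's timezone metadata to UTC
// without touching the underlying microsecond values.
def relabelToUtc(field: Field): Field = field.getType match {
  case ts: ArrowType.Timestamp =>
    new Field(
      field.getName,
      new FieldType(field.isNullable, new ArrowType.Timestamp(ts.getUnit, "UTC"), null),
      field.getChildren)
  case _ => field
}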

andygrove · Nov 08 '25 19:11

I think I have convinced myself that this is only an issue when using spark.comet.sparkToColumnar.enabled or other uses of SparkToColumnar, so I don't think it is urgent to fix for 0.12.0, but we should at least document this somewhere.
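
For reference, the conversion path in question is enabled via:

// Feed JVM row-based plans (e.g. LocalTableScan) into Comet's
// native operators via SparkToColumnar.
spark.conf.set("spark.comet.sparkToColumnar.enabled", "true")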

andygrove · Nov 08 '25 20:11

docs PR: https://github.com/apache/datafusion-comet/pull/2740

andygrove · Nov 08 '25 20:11