delta icon indicating copy to clipboard operation
delta copied to clipboard

[BUG] Querying on timestamp for table with generated date partition column returns incorrect results

Open Jefffrey opened this issue 1 year ago • 0 comments

Bug

Which Delta project/connector is this regarding?

  • [x] Spark
  • [ ] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Describe the problem

Given a delta table like so:

CREATE TABLE kumachan (
  DTE TIMESTAMP,
  P_DTE DATE GENERATED ALWAYS AS (CAST(DTE AS DATE))
)
USING DELTA
PARTITIONED BY (P_DTE);

When we insert data using session timezone UTC, but then attempt to read data back using session timezone Australia/Melbourne, we get unexpected results.

Steps to reproduce

Running in spark-shell, setup:

// Ensure we write data to table as UTC
spark.conf.set("spark.sql.session.timeZone", "UTC")

import io.delta.tables._
import io.delta.implicits._
import org.apache.spark.sql.types._
import org.apache.spark.sql._
import org.apache.spark.sql.functions._

DeltaTable.createOrReplace(spark)
  .addColumn("dte", "TIMESTAMP")
  .addColumn(
    DeltaTable.columnBuilder("p_dte")
     .dataType(DateType)
     .generatedAlwaysAs("CAST(dte AS DATE)")
     .build())
  .partitionedBy("p_dte")
  .location("/tmp/delta/kumachan")
  .execute()

// Inserting based on Australia/Melbourne timestamp literals
val data = Seq(
  Row("2023-01-10 00:00:00 Australia/Melbourne"),
  Row("2023-01-11 00:00:00 Australia/Melbourne"),
  Row("2023-01-12 00:00:00 Australia/Melbourne")
)

spark.createDataFrame(
  spark.sparkContext.parallelize(data),
  StructType(List(StructField("dte", StringType, true)))
)
  .withColumn("dte", to_timestamp(col("dte")))
  .write
  .mode("overwrite")
  .delta("/tmp/delta/kumachan")

Observed results

Able to query based on specific timestamp with session timezone of UTC:

spark.conf.set("spark.sql.session.timeZone", "UTC")
spark.sql("""
select *, current_timezone() from
delta.`/tmp/delta/kumachan`
where dte = timestamp'2023-01-11 00:00:00 Australia/Melbourne'
""").show(false)

Returns expected:

+-------------------+----------+------------------+
|dte                |p_dte     |current_timezone()|
+-------------------+----------+------------------+
|2023-01-10 13:00:00|2023-01-10|UTC               |
+-------------------+----------+------------------+

However, when querying with session timezone as Australia/Melbourne:

spark.conf.set("spark.sql.session.timeZone", "Australia/Melbourne")
spark.sql("""
select *, current_timezone() from
delta.`/tmp/delta/kumachan`
where dte = timestamp'2023-01-11 00:00:00 Australia/Melbourne'
""").show(false)

No results are returned

+---+-----+------------------+
|dte|p_dte|current_timezone()|
+---+-----+------------------+
+---+-----+------------------+

Expected results

Expect to get 1 row, similar to when querying with UTC session timezone.

Further details

Due to partition filters being inserted. If we first see how the columns look to each other with Australia/Melbourne timezone:

spark.conf.set("spark.sql.session.timeZone", "Australia/Melbourne")
spark.sql("""
select *, current_timezone() from
delta.`/tmp/delta/kumachan`
order by 1
""").show(false)

Can see:

+-------------------+----------+-------------------+
|dte                |p_dte     |current_timezone() |
+-------------------+----------+-------------------+
|2023-01-10 00:00:00|2023-01-09|Australia/Melbourne|
|2023-01-11 00:00:00|2023-01-10|Australia/Melbourne|
|2023-01-12 00:00:00|2023-01-11|Australia/Melbourne|
+-------------------+----------+-------------------+

Notice how looking at only the days, they are offset by 1.

Checking the explain plan for the problem query:

spark.conf.set("spark.sql.session.timeZone", "Australia/Melbourne")
spark.sql("""
select *, current_timezone() from
delta.`/tmp/delta/kumachan`
where dte = timestamp'2023-01-11 00:00:00 Australia/Melbourne'
""").explain(true)

Output:

== Parsed Logical Plan ==
'Project [*, unresolvedalias('current_timezone(), None)]
+- 'Filter ('dte = 2023-01-11 00:00:00)
   +- 'UnresolvedRelation [delta, /tmp/delta/kumachan], [], false

== Analyzed Logical Plan ==
dte: timestamp, p_dte: date, current_timezone(): string
Project [dte#1640, p_dte#1641, current_timezone() AS current_timezone()#1642]
+- Filter (dte#1640 = 2023-01-11 00:00:00)
   +- SubqueryAlias spark_catalog.delta.`/tmp/delta/kumachan`
      +- Relation [dte#1640,p_dte#1641] parquet

== Optimized Logical Plan ==
Project [dte#1640, p_dte#1641, Australia/Melbourne AS current_timezone()#1642]
+- Filter (((p_dte#1641 = cast(2023-01-11 00:00:00 as date)) OR isnull((p_dte#1641 = cast(2023-01-11 00:00:00 as date)))) AND (isnotnull(dte#1640) AND (dte#1640 = 2023-01-11 00:00:00)))
   +- Relation [dte#1640,p_dte#1641] parquet

== Physical Plan ==
*(1) Project [dte#1640, p_dte#1641, Australia/Melbourne AS current_timezone()#1642]
+- *(1) Filter (isnotnull(dte#1640) AND (dte#1640 = 2023-01-11 00:00:00))
   +- *(1) ColumnarToRow
      +- FileScan parquet [dte#1640,p_dte#1641] Batched: true, DataFilters: [isnotnull(dte#1640), (dte#1640 = 2023-01-11 00:00:00)], Format: Parquet, Location: PreparedDeltaFileIndex(1 paths)[file:/tmp/delta/kumachan], PartitionFilters: [((p_dte#1641 = cast(2023-01-11 00:00:00 as date)) OR isnull((p_dte#1641 = cast(2023-01-11 00:00:..., PushedFilters: [IsNotNull(dte), EqualTo(dte,2023-01-11 00:00:00.0)], ReadSchema: struct<dte:timestamp>

In the optimized plan, it introduces filter on p_dte = cast(2023-01-11 00:00:00 as date) which would evaluate to p_dte = 2023-01-11. Can see this would return the row for:

+-------------------+----------+-------------------+
|dte                |p_dte     |current_timezone() |
+-------------------+----------+-------------------+
|2023-01-12 00:00:00|2023-01-11|Australia/Melbourne|
+-------------------+----------+-------------------+

However the subsequent filter dte = 2023-01-11 00:00:00 would then fail.

Environment information

  • Delta Lake version: 3.0.0
  • Spark version: 3.5.0
  • Scala version: 2.12.18

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [x] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [ ] No. I cannot contribute a bug fix at this time.

Jefffrey avatar Nov 30 '23 10:11 Jefffrey