
PXF hive/hdfs:parquet - unwanted timestamp conversion

Open akotuc opened this issue 2 years ago • 1 comments

Hello, I have a Parquet file on HDFS with a timestamp column, and a Hive table over it. If I query the data via PXF Hive or PXF HDFS, I get a different value for the timestamp column compared to the direct Hive query (some time zone shift).

The PXF JVM is set to operate in the UTC zone via the -Duser.timezone parameter in pxf-env.sh. I can't change this setting, because other JDBC connections depend on it to work correctly.

The question is why this unwanted conversion happens for PXF Hive/HDFS, and whether there is any way to avoid it.

Thank you

Ales

Greenplum version: 6.19.1
PXF version: 6.2.3
Hadoop: CDH 7.1.6
Hive version: 3.1.3000.7.1.6.5-2

Parquet & Hive table:

create external table adhoc.ts (col timestamp)
STORED AS PARQUET
LOCATION '/folder/ts';

insert into adhoc.ts (col) values (current_timestamp());

select * from adhoc.ts;
-- 2022-05-05 15:14:20.802

Greenplum:

create external table adhoc.ts_hive
    (
       col timestamp
        )
    location ('pxf://adhoc.ts?PROFILE=Hive&SERVER=hadoop')
    format 'custom' (formatter = 'pxfwritable_import');

select * from adhoc.ts_hive;
-- 2022-05-05 13:14:20.802000

create external table adhoc.ak_ts_hdfs
    (
       col timestamp
        )
    location ('pxf://folder/ts?PROFILE=hdfs:parquet&SERVER=hadoop')
    format 'custom' (formatter = 'pxfwritable_import');

select * from adhoc.ak_ts_hdfs;
-- 2022-05-05 13:14:20.802000

pxf-env.sh:

#!/bin/bash

##############################################################################
# This file contains PXF properties that can be specified by users           #
# to customize their deployments. This file is sourced by PXF Server control #
# scripts upon initialization, start and stop of the PXF Server.             #
#                                                                            #
# To update a property, uncomment the line and provide a new value.          #
##############################################################################

# Path to JAVA
# export JAVA_HOME=/usr/java/default

# Path to Log directory
# export PXF_LOGDIR="${PXF_BASE}/logs"

# Path to Run directory
# export PXF_RUNDIR=${PXF_RUNDIR:=${PXF_BASE}/run}

# Memory
# export PXF_JVM_OPTS="-Xmx2g -Xms1g"

# Kill PXF on OutOfMemoryError, set to false to disable
# export PXF_OOM_KILL=true

# Dump heap on OutOfMemoryError, set to dump path to enable
# export PXF_OOM_DUMP_PATH=${PXF_BASE}/run/pxf_heap_dump

# Additional locations to be class-loaded by PXF
# export PXF_LOADER_PATH=

# Additional native libraries to be loaded by PXF
# export LD_LIBRARY_PATH=


######################################################
# The properties below were added by the pxf migrate
# tool on Fri Jun 18 17:50:49 CEST 2021
######################################################

export JAVA_HOME="/usr/lib/jvm/java-openjdk/jre"
export PXF_JVM_OPTS="-Xmx8g -Xms2g -Dlog4j2.formatMsgNoLookups=true -Duser.timezone=UTC"

hive-site.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
  <property>
    <name>hive.metastore.uris</name>
    <value>thrift://server-004:9083,thrift://server-005:9083</value>
  </property>
  <property>
    <name>hive.metastore.client.socket.timeout</name>
    <value>300</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.dir</name>
    <value>/warehouse/tablespace/managed/hive</value>
  </property>
  <property>
    <name>hive.metastore.warehouse.external.dir</name>
    <value>/warehouse/tablespace/external/hive</value>
  </property>
  <property>
    <name>hive.auto.convert.join</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.auto.convert.join.noconditionaltask.size</name>
    <value>52428800</value>
  </property>
  <property>
    <name>hive.optimize.bucketmapjoin.sortedmerge</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.smbjoin.cache.rows</name>
    <value>10000</value>
  </property>
  <property>
    <name>hive.server2.logging.operation.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.server2.logging.operation.log.location</name>
    <value>/var/log/hive/operation_logs</value>
  </property>
  <property>
    <name>mapred.reduce.tasks</name>
    <value>-1</value>
  </property>
  <property>
    <name>hive.exec.reducers.bytes.per.reducer</name>
    <value>67108864</value>
  </property>
  <property>
    <name>hive.exec.copyfile.maxsize</name>
    <value>33554432</value>
  </property>
  <property>
    <name>hive.exec.reducers.max</name>
    <value>1009</value>
  </property>
  <property>
    <name>hive.vectorized.groupby.checkinterval</name>
    <value>4096</value>
  </property>
  <property>
    <name>hive.vectorized.groupby.flush.percent</name>
    <value>0.1</value>
  </property>
  <property>
    <name>hive.compute.query.using.stats</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.vectorized.execution.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.vectorized.execution.reduce.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.vectorized.use.vectorized.input.format</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.vectorized.use.checked.expressions</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.vectorized.use.vector.serde.deserialize</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.vectorized.adaptor.usage.mode</name>
    <value>chosen</value>
  </property>
  <property>
    <name>hive.vectorized.input.format.excludes</name>
    <value></value>
  </property>
  <property>
    <name>hive.merge.mapfiles</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.merge.mapredfiles</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.cbo.enable</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.fetch.task.conversion</name>
    <value>more</value>
  </property>
  <property>
    <name>hive.fetch.task.conversion.threshold</name>
    <value>1073741824</value>
  </property>
  <property>
    <name>hive.limit.pushdown.memory.usage</name>
    <value>0.04</value>
  </property>
  <property>
    <name>hive.merge.smallfiles.avgsize</name>
    <value>16777216</value>
  </property>
  <property>
    <name>hive.merge.size.per.task</name>
    <value>268435456</value>
  </property>
  <property>
    <name>hive.optimize.reducededuplication</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.optimize.reducededuplication.min.reducer</name>
    <value>4</value>
  </property>
  <property>
    <name>hive.map.aggr</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.map.aggr.hash.percentmemory</name>
    <value>0.5</value>
  </property>
  <property>
    <name>hive.optimize.sort.dynamic.partition</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.metastore.execute.setugi</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.dml.events</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.support.concurrency</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.zookeeper.quorum</name>
    <value>server-003,server-005,server-004</value>
  </property>
  <property>
    <name>hive.zookeeper.client.port</name>
    <value>2181</value>
  </property>
  <property>
    <name>hive.cluster.delegation.token.store.class</name>
    <value>org.apache.hadoop.hive.thrift.DBTokenStore</value>
  </property>
  <property>
    <name>hive.server2.enable.doAs</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.metastore.sasl.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.server2.authentication</name>
    <value>kerberos</value>
  </property>
  <property>
    <name>hive.metastore.kerberos.principal</name>
    <value>hive/[email protected]</value>
  </property>
  <property>
    <name>spark.shuffle.service.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.exec.pre.hooks</name>
    <value>org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook</value>
  </property>
  <property>
    <name>hive.exec.failure.hooks</name>
    <value>org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook</value>
  </property>
  <property>
    <name>hive.strict.checks.orderby.no.limit</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.strict.checks.no.partition.filter</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.strict.checks.type.safety</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.strict.checks.cartesian.product</name>
    <value>false</value>
  </property>
  <property>
    <name>hive.strict.checks.bucketing</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.async.log.enabled</name>
    <value>false</value>
  </property>
  <property>
    <name>parquet.compression</name>
    <value>GZIP</value>
  </property>
  <property>
    <name>hive.metastore.integral.jdo.pushdown</name>
    <value>true</value>
  </property>
  <property>
    <name>hive.parquet.timestamp.skip.conversion</name>
    <value>true</value>
    <description>Current Hive implementation of parquet stores timestamps to UTC, this flag allows skipping of the conversion on reading parquet files from other tools</description>
  </property>
</configuration>

hdfs-site.xml

<?xml version="1.0" encoding="UTF-8"?>

<!--Autogenerated by Cloudera Manager-->
<configuration>
  <property>
    <name>dfs.nameservices</name>
    <value>nameservice1</value>
  </property>
  <property>
    <name>dfs.client.failover.proxy.provider.nameservice1</name>
    <value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
  </property>
  <property>
    <name>dfs.ha.automatic-failover.enabled.nameservice1</name>
    <value>true</value>
  </property>
  <property>
    <name>ha.zookeeper.quorum</name>
    <value>server-003:2181,server-004:2181,server-005:2181</value>
  </property>
  <property>
    <name>dfs.ha.namenodes.nameservice1</name>
    <value>namenode187,namenode169</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice1.namenode187</name>
    <value>server-001:8020</value>
  </property>
  <property>
    <name>dfs.namenode.servicerpc-address.nameservice1.namenode187</name>
    <value>server-001:8022</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.nameservice1.namenode187</name>
    <value>server-001:9870</value>
  </property>
  <property>
    <name>dfs.namenode.https-address.nameservice1.namenode187</name>
    <value>server-001:9871</value>
  </property>
  <property>
    <name>dfs.namenode.rpc-address.nameservice1.namenode169</name>
    <value>server-001:8020</value>
  </property>
  <property>
    <name>dfs.namenode.servicerpc-address.nameservice1.namenode169</name>
    <value>server-001:8022</value>
  </property>
  <property>
    <name>dfs.namenode.http-address.nameservice1.namenode169</name>
    <value>server-001:9870</value>
  </property>
  <property>
    <name>dfs.namenode.https-address.nameservice1.namenode169</name>
    <value>server-001:9871</value>
  </property>
  <property>
    <name>dfs.replication</name>
    <value>2</value>
  </property>
  <property>
    <name>dfs.blocksize</name>
    <value>134217728</value>
  </property>
  <property>
    <name>dfs.client.use.datanode.hostname</name>
    <value>false</value>
  </property>
  <property>
    <name>fs.permissions.umask-mode</name>
    <value>002</value>
  </property>
  <property>
    <name>dfs.client.block.write.locateFollowingBlock.retries</name>
    <value>7</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.algorithm</name>
    <value>3des</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.cipher.suites</name>
    <value>AES/CTR/NoPadding</value>
  </property>
  <property>
    <name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
    <value>256</value>
  </property>
  <property>
    <name>dfs.namenode.acls.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit.streams.cache.size</name>
    <value>4096</value>
  </property>
  <property>
    <name>dfs.domain.socket.path</name>
    <value>/var/run/hdfs-sockets/dn</value>
  </property>
  <property>
    <name>dfs.client.read.shortcircuit.skip.checksum</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.client.domain.socket.data.traffic</name>
    <value>false</value>
  </property>
  <property>
    <name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.block.access.token.enable</name>
    <value>true</value>
  </property>
  <property>
    <name>dfs.namenode.kerberos.principal</name>
    <value>hdfs/[email protected]</value>
  </property>
  <property>
    <name>dfs.namenode.kerberos.internal.spnego.principal</name>
    <value>HTTP/[email protected]</value>
  </property>
  <property>
    <name>dfs.datanode.kerberos.principal</name>
    <value>hdfs/[email protected]</value>
  </property>
</configuration>

akotuc, May 05 '22 13:05

Apologies for the delay in responding to this issue. We've RCA'd the issue and wanted to share our findings, including a potential workaround. It is on our roadmap to fix this, but it may be a while before we get to it.

Hive Setup

Hive DDL (NOTE: Hive server is in America/Los_Angeles time zone)

CREATE EXTERNAL TABLE event_times_in_hive (id int, event_time timestamp)
STORED AS PARQUET
LOCATION '/tmp/event_times_in_hive';

INSERT INTO event_times_in_hive (id, event_time) VALUES (1, '2022-05-10 12:34:56.789');

SELECT * FROM event_times_in_hive;
-- +-------------------------+---------------------------------+
-- | event_times_in_hive.id  | event_times_in_hive.event_time  |
-- +-------------------------+---------------------------------+
-- | 1                       | 2022-05-10 12:34:56.789         |
-- +-------------------------+---------------------------------+
-- 1 row selected (0.532 seconds)

Read the Data with Apache Spark

TZ='America/Chicago' $SPARK_HOME/bin/spark-shell

val eventTimes = spark.read.parquet("hdfs://localhost:8020/tmp/event_times_in_hive")
eventTimes.show(1, false)
// +---+-----------------------+
// |id |event_time             |
// +---+-----------------------+
// |1  |2022-05-10 14:34:56.789|
// +---+-----------------------+

The timestamp has been shifted forward by two hours (12:34:56.789 became 14:34:56.789).

Read the Data with Greenplum PXF

Greenplum DDL

NOTE: PXF server is configured with export PXF_JVM_OPTS="-Xmx2g -Xms1g -Duser.timezone=America/Chicago"

CREATE EXTERNAL TABLE pxf_event_times_in_hive (
    id         int,
    event_time timestamp
) LOCATION ('pxf://event_times_in_hive?PROFILE=hive')
FORMAT 'custom' (FORMATTER='pxfwritable_import');

SELECT * FROM pxf_event_times_in_hive ;
--  id |       event_time
-- ----+-------------------------
--   1 | 2022-05-10 14:34:56.789
-- (1 row)

CREATE EXTERNAL TABLE pxf_hdfs_event_times_in_hive (
    id         int,
    event_time timestamp
) LOCATION ('pxf://tmp/event_times_in_hive?PROFILE=hdfs:parquet')
FORMAT 'custom' (FORMATTER='pxfwritable_import');

SELECT * FROM pxf_hdfs_event_times_in_hive ;
--  id |       event_time
-- ----+-------------------------
--   1 | 2022-05-10 14:34:56.789
-- (1 row)

Discussion

This appears to be a known issue with how Hive writes timestamps in Parquet files. Timestamps are normalized from the Hive server's local time zone to UTC and stored using INT96. As an example, the timestamp inserted into the Hive table (2022-05-10 12:34:56.789) is interpreted as the instant 2022-05-10 12:34:56.789 America/Los_Angeles, which is the same instant as 2022-05-10 19:34:56.789 UTC, and that UTC instant is what the Parquet file stores, encoded as a Julian day number plus the nanoseconds elapsed within that day.

In older versions of Hive (before v3.1), Hive would normalize the timestamp from UTC back to the Hive server's local time zone when reading. Hive 3.1 turned off normalization to UTC, but this was later changed in Hive 3.1.2 (see HIVE-21290), which works as follows:

  • timestamps are normalized to UTC before being stored in the Parquet file, and the session-local time zone is stored in the file metadata
  • when reading back files, timestamps are converted back from UTC to the saved time zone (instead of the local time zone)

This can be seen by using parquet-tools to inspect the metadata of the Parquet files (see writer.time.zone in the output):

$ hadoop jar ~/pxf-data/parquet-tools-1.11.2.jar meta /tmp/event_times_in_hive
file:        hdfs://0.0.0.0:8020/tmp/event_times_in_hive/000000_0
creator:     parquet-mr version 1.10.0.3.1.4.0-315 (build c67156e3dd3ff2d37eec781ee56b884c3dd894c1)
extra:       writer.time.zone = America/Los_Angeles

file schema: hive_schema
--------------------------------------------------------------------------------
id:          OPTIONAL INT32 R:0 D:1
event_time:  OPTIONAL INT96 R:0 D:1

row group 1: RC:1 TS:165 OFFSET:4
--------------------------------------------------------------------------------
id:           INT32 UNCOMPRESSED DO:0 FPO:4 SZ:55/55/1.00 VC:1 ENC:BIT_PACKED,PLAIN,RLE ST:[min: 1, max: 1, num_nulls: 0]
event_time:   INT96 UNCOMPRESSED DO:0 FPO:59 SZ:110/110/1.00 VC:1 ENC:PLAIN_DICTIONARY,BIT_PACKED,RLE ST:[min: 0x40EF10D01D4000003E882500, max: 0x40EF10D01D4000003E882500, num_nulls: 0]

The stored value for event_time is 0x40EF10D01D4000003E882500, which is the INT96 representation of 2022-05-10 12:34:56.789. Read as little-endian integers, this represents 70496789000000 nanoseconds (first 8 bytes) into Julian day 2459710 (remaining 4 bytes). Julian day 2459710 corresponds to 2022-05-10, while 70496789000000 nanoseconds comes out to 19:34:56.789. Treating this as an instant in the UTC time zone, we have 2022-05-10 19:34:56.789 UTC, which is 2022-05-10 12:34:56.789 America/Los_Angeles. Dropping the America/Los_Angeles time zone, we recover the originally inserted timestamp.
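The decoding described above can be reproduced with a few lines of Python. This is only an illustrative sketch, not code from Hive or PXF: the function name decode_int96 is made up for this example, and it assumes the usual INT96 layout (8 little-endian bytes of nanoseconds within the day followed by 4 little-endian bytes of Julian day number, with 1970-01-01 being Julian day 2440588).

```python
import struct
from datetime import datetime, timedelta, timezone

# Julian day number assigned to 1970-01-01, the Unix epoch.
JULIAN_DAY_OF_UNIX_EPOCH = 2440588
UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)


def decode_int96(hex_value: str) -> datetime:
    """Decode a Parquet INT96 timestamp, as printed by parquet-tools, to UTC.

    Hypothetical helper written for this example only.
    """
    digits = hex_value[2:] if hex_value.lower().startswith("0x") else hex_value
    raw = bytes.fromhex(digits)
    # Little-endian: 8 bytes of nanoseconds within the day, then 4 bytes of Julian day.
    nanos_of_day, julian_day = struct.unpack("<qi", raw)
    days_since_epoch = julian_day - JULIAN_DAY_OF_UNIX_EPOCH
    # datetime has microsecond resolution, so sub-microsecond digits are dropped.
    return UNIX_EPOCH + timedelta(days=days_since_epoch,
                                  microseconds=nanos_of_day // 1000)


print(decode_int96("0x40EF10D01D4000003E882500"))
# 2022-05-10 19:34:56.789000+00:00
```

The result is a UTC instant; which wall-clock time a reader shows for it depends on the time zone the reader applies afterwards.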

If we inspect the metadata of a Parquet file written with an older version of Hive (e.g., 1.2.1), we see that the writer's time zone is not recorded in the Parquet file

$ ./bin/hadoop jar ~/pxf-data/parquet-tools-1.11.2.jar meta /tmp/event_times_in_hive
22/05/11 12:46:27 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
22/05/11 12:46:27 INFO hadoop.ParquetFileReader: reading another 1 footers
22/05/11 12:46:27 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
file:        hdfs://0.0.0.0:8020/tmp/event_times_in_hive/000000_0
creator:     parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)

file schema: hive_schema
--------------------------------------------------------------------------------
id:          OPTIONAL INT32 R:0 D:1
event_time:  OPTIONAL INT96 R:0 D:1

row group 1: RC:1 TS:125 OFFSET:4
--------------------------------------------------------------------------------
id:           INT32 UNCOMPRESSED DO:0 FPO:4 SZ:43/43/1.00 VC:1 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 1, max: 1, num_nulls: 0]
event_time:   INT96 UNCOMPRESSED DO:0 FPO:47 SZ:82/82/1.00 VC:1 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0x40EF10D01D4000003E882500, max: 0x40EF10D01D4000003E882500, num_nulls: 0]

What Metadata Does Spark Include in Parquet Files?

export HADOOP_CONF_DIR="${HOME}/workspace/singlecluster-HDP3/hadoop/etc/hadoop"
$SPARK_HOME/bin/spark-shell

println(java.time.ZoneId.systemDefault())
// America/Los_Angeles

val eventTime = new java.sql.Timestamp(1652211296789L)
println(eventTime)
// 2022-05-10 12:34:56.789

val df = Seq((1, eventTime)).toDF("id", "event_time")
df.write.parquet("hdfs://localhost:8020/tmp/spark-3.0.3")

$ hadoop jar ./parquet-tools-1.11.2.jar meta /tmp/spark-3.0.3
file:        hdfs://0.0.0.0:8020/tmp/spark-3.0.3/part-00000-c73dbb93-dce2-4721-b71c-e45bfcd834e9-c000.snappy.parquet
creator:     parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra:       org.apache.spark.version = 3.0.3
extra:       org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"event_time","type":"timestamp","nullable":true,"metadata":{}}]}

file schema: spark_schema
--------------------------------------------------------------------------------
id:          REQUIRED INT32 R:0 D:0
event_time:  OPTIONAL INT96 R:0 D:1

row group 1: RC:1 TS:159 OFFSET:4
--------------------------------------------------------------------------------
id:           INT32 SNAPPY DO:0 FPO:4 SZ:51/49/0.96 VC:1 ENC:BIT_PACKED,PLAIN ST:[min: 1, max: 1, num_nulls: 0]
event_time:   INT96 SNAPPY DO:0 FPO:55 SZ:114/110/0.96 VC:1 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: 0x40EF10D01D4000003E882500, max: 0x40EF10D01D4000003E882500, num_nulls: 0]

Here, Spark was used to write a Parquet file with the same data as the Hive example and was running with the same timezone as the Hive server (America/Los_Angeles). From the parquet-tools output, we observe two things:

  1. The stored INT96 value (0x40EF10D01D4000003E882500) is exactly the same as what was stored in the Parquet file written by Hive
  2. The writer's timezone was not recorded in the file metadata

This example was for Spark 3.0.3, but the same result holds for Spark 3.2.1 (which is the latest release of Apache Spark as of 2022-05-13).

What Metadata Does PXF Write in Parquet Files?

We can look at what metadata (if any) is included in the Parquet files written by PXF; for this example, the PXF server is running in America/Los_Angeles time zone.

CREATE WRITABLE EXTERNAL TABLE pxf_hdfs_parquet_event_times (
    id         int,
    event_time timestamp
) LOCATION ('pxf://tmp/pxf_event_times?PROFILE=hdfs:parquet')
FORMAT 'custom' (FORMATTER='pxfwritable_export');

INSERT INTO pxf_hdfs_parquet_event_times (id, event_time) VALUES (1, '2022-05-10 12:34:56.789');

Inspecting the written Parquet file with parquet-tools

$ hadoop jar ~/pxf-data/parquet-tools-1.11.2.jar meta /tmp/pxf_event_times
22/05/11 14:18:28 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
22/05/11 14:18:28 INFO hadoop.ParquetFileReader: reading another 1 footers
22/05/11 14:18:28 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
file:        hdfs://0.0.0.0:8020/tmp/pxf_event_times/23-0000000002_0.snappy.parquet
creator:     parquet-mr version 1.11.1 (build 765bd5cd7fdef2af1cecd0755000694b992bfadd)
extra:       writer.model.name = example

file schema: hive_schema
--------------------------------------------------------------------------------
id:          OPTIONAL INT32 R:0 D:1
event_time:  OPTIONAL INT96 R:0 D:1

row group 1: RC:1 TS:95 OFFSET:4
--------------------------------------------------------------------------------
id:           INT32 SNAPPY DO:0 FPO:4 SZ:35/33/0.94 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 1, num_nulls: 0]
event_time:   INT96 SNAPPY DO:0 FPO:39 SZ:66/62/0.94 VC:1 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[min: 0x40EF10D01D4000003E882500, max: 0x40EF10D01D4000003E882500, num_nulls: 0]

We see that PXF is normalizing from the local system's time zone to UTC (the hexadecimal INT96 value is the same as what is written by Hive), but the session-local time zone is not stored in the file metadata.
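To illustrate why Hive, Spark and PXF all end up with the same bytes, here is a rough Python sketch of the write side. It is not PXF's implementation: encode_int96 is a hypothetical helper, and the fixed UTC offsets (-7 and -5 hours, valid on 2022-05-10 while daylight saving time was in effect) are assumptions standing in for America/Los_Angeles and America/Chicago.

```python
import struct
from datetime import datetime, timedelta, timezone

JULIAN_DAY_OF_UNIX_EPOCH = 2440588
UNIX_EPOCH = datetime(1970, 1, 1, tzinfo=timezone.utc)
# Assumed fixed offsets for 2022-05-10 (PDT and CDT).
LOS_ANGELES = timezone(timedelta(hours=-7))
CHICAGO = timezone(timedelta(hours=-5))


def encode_int96(local_ts: datetime, writer_zone: timezone) -> str:
    """Normalize a zone-less timestamp from the writer's zone to UTC and encode it.

    Hypothetical helper written for this example only.
    """
    # Subtracting two zone-aware datetimes compares the underlying instants.
    since_epoch = local_ts.replace(tzinfo=writer_zone) - UNIX_EPOCH
    julian_day = since_epoch.days + JULIAN_DAY_OF_UNIX_EPOCH
    nanos_of_day = (since_epoch.seconds * 1_000_000 + since_epoch.microseconds) * 1_000
    # 8 little-endian bytes of nanoseconds, then 4 little-endian bytes of Julian day.
    return struct.pack("<qi", nanos_of_day, julian_day).hex().upper()


inserted = datetime(2022, 5, 10, 12, 34, 56, 789000)
print(encode_int96(inserted, LOS_ANGELES))
# 40EF10D01D4000003E882500
```

A writer in a different zone would produce different bytes for the same literal, which is why a reader cannot recover the original wall-clock time unless it knows the writer's zone.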

Summary

When using Parquet files, older versions of Hive treated timestamps with instant semantics. This is what Apache Spark and Greenplum PXF currently do as well. In newer versions of Hive (3.1.2 and later), timestamps are treated with local date-time semantics, with the writer's time zone recorded in the file's metadata in order to maintain backwards compatibility.
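The difference between the two semantics can be sketched in Python. This is a simplified model rather than code from any of the engines: the function names are invented, and fixed UTC offsets (valid on 2022-05-10) are assumed in place of the named time zones.

```python
from datetime import datetime, timedelta, timezone

# Assumed fixed offsets for 2022-05-10 (daylight saving time in effect).
LOS_ANGELES = timezone(timedelta(hours=-7), "America/Los_Angeles")
CHICAGO = timezone(timedelta(hours=-5), "America/Chicago")


def write_normalized(local_ts: datetime, writer_zone: timezone) -> datetime:
    """Interpret a zone-less timestamp in the writer's zone and normalize to UTC."""
    return local_ts.replace(tzinfo=writer_zone).astimezone(timezone.utc)


def read_back(stored_utc: datetime, zone: timezone) -> datetime:
    """Convert the stored UTC instant to `zone` and drop the zone again."""
    return stored_utc.astimezone(zone).replace(tzinfo=None)


inserted = datetime(2022, 5, 10, 12, 34, 56, 789000)
stored = write_normalized(inserted, LOS_ANGELES)

# Local date-time semantics: the reader applies the writer's saved zone.
print(read_back(stored, LOS_ANGELES))  # 2022-05-10 12:34:56.789000
# Instant semantics: the reader applies its own zone.
print(read_back(stored, CHICAGO))      # 2022-05-10 14:34:56.789000
```

With instant semantics the displayed value depends on where the reader runs; with local date-time semantics every reader sees the value that was originally inserted.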

Possible Workaround

Use the AT TIME ZONE operator to attach the time zone of the PXF server to the timestamp column, then use the operator again to convert back to a timestamp without a time zone, as the time would appear in the writer's time zone. This should work when reading Parquet files with either the hive profile or the hdfs:parquet profile, but it requires that you know the time zone of the writer of the Parquet files and that all files were written in the same time zone. Here is an example:

-- PXF & GPDB server are running in America/Chicago time zone
-- Hive server is America/Los_Angeles
SELECT event_time FROM pxf_event_times_in_hive ;
--        event_time
-- -------------------------
--  2022-05-10 14:34:56.789
-- (1 row)

SELECT (event_time AT TIME ZONE 'America/Chicago') AT TIME ZONE 'America/Los_Angeles' AS event_time FROM pxf_event_times_in_hive;
--        event_time
-- -------------------------
--  2022-05-10 12:34:56.789
-- (1 row)
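For readers less familiar with how AT TIME ZONE behaves when applied twice, the same two steps can be modelled in Python. This is only an analogy to the SQL above: undo_reader_shift is a hypothetical name, and the fixed UTC offsets (valid on 2022-05-10) are assumptions standing in for the named zones.

```python
from datetime import datetime, timedelta, timezone

# Assumed fixed offsets for 2022-05-10 (daylight saving time in effect).
CHICAGO = timezone(timedelta(hours=-5))
LOS_ANGELES = timezone(timedelta(hours=-7))


def undo_reader_shift(shifted: datetime, reader_zone: timezone,
                      writer_zone: timezone) -> datetime:
    """Model of (ts AT TIME ZONE reader) AT TIME ZONE writer."""
    # First AT TIME ZONE: treat the zone-less value as wall-clock time in the reader's zone.
    instant = shifted.replace(tzinfo=reader_zone)
    # Second AT TIME ZONE: render that instant in the writer's zone and drop the zone.
    return instant.astimezone(writer_zone).replace(tzinfo=None)


from_pxf = datetime(2022, 5, 10, 14, 34, 56, 789000)
print(undo_reader_shift(from_pxf, CHICAGO, LOS_ANGELES))
# 2022-05-10 12:34:56.789000
```

Unlike this sketch, the SQL version uses named zones, so the database applies the correct offset for each row's date, including across daylight saving transitions.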

Appendix

Spark Configuration for INT96 Timestamp Conversion

Spark added spark.sql.parquet.int96TimestampConversion (SPARK-12297).

This controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different time zone offset than Hive and Spark.

Hive Configuration for Skipping Timestamp Conversion

Hive added hive.parquet.timestamp.skip.conversion (HIVE-9482).

From its description: "Current Hive implementation of parquet stores timestamps to UTC, this flag allows skipping of the conversion on reading parquet files from other tools"

PXF Code Path

When using the hive profile, we end up in NanoTimeUtils.getTimestamp

  • in Hive 2.3.8, we don't take into account the writer's time zone
  • HIVE-21290

When using the hdfs:parquet profile, we read the 12 bytes that make up the INT96 timestamp and convert them to a timestamp with ParquetTypeConverter.bytesToTimestamp, where we shift according to PXF's local time zone.

See the following Parquet technical paper: "Consistent timestamp types in Hadoop SQL engines".

bradfordb-vmware, May 19 '22 22:05