pxf
pxf copied to clipboard
PXF hive/hdfs:parquet - unwanted timestamp conversion
Hello, I have a parquet file on HDFS with a timestamp column and a Hive table over it. If I query the data via PXF Hive or PXF HDFS, I get a different value for timestamp columns compared to the direct Hive query (some timezone shift).
The PXF JVM is set to operate in the UTC zone via the -Duser.timezone parameter in pxf-env.sh. I can't change this setting, because it ensures that other JDBC connections work correctly.
The question is why this unwanted conversion happens for PXF Hive/HDFS, and whether there is any way to avoid it.
Thank you
Ales
Greenplum Version: 6.19.1 PXF version: 6.2.3 Hadoop: CDH 7.1.6 Hive versions: 3.1.3000.7.1.6.5-2 Parquet & hive table:
create external table adhoc.ts (col timestamp)
STORED AS PARQUET
LOCATION '/folder/ts';
insert into adhoc.ts (col) values (current_timestamp());
select * from adhoc.ts;
-- 2022-05-05 15:14:20.802
Greenplum:
create external table adhoc.ts_hive
(
col timestamp
)
location ('pxf://adhoc.ts?PROFILE=Hive&SERVER=hadoop')
format 'custom' (formatter = 'pxfwritable_import');
select * from adhoc.ts_hive
-- 2022-05-05 13:14:20.802000
create external table adhoc.ak_ts_hdfs
(
col timestamp
)
location ('pxf://folder/ts?PROFILE=hdfs:parquet&SERVER=hadoop')
format 'custom' (formatter = 'pxfwritable_import');
select * from adhoc.ts_hdfs
-- 2022-05-05 13:14:20.802000
pxf-env.sh:
#!/bin/bash
##############################################################################
# This file contains PXF properties that can be specified by users #
# to customize their deployments. This file is sourced by PXF Server control #
# scripts upon initialization, start and stop of the PXF Server. #
# #
# To update a property, uncomment the line and provide a new value. #
##############################################################################
# Path to JAVA
# export JAVA_HOME=/usr/java/default
# Path to Log directory
# export PXF_LOGDIR="${PXF_BASE}/logs"
# Path to Run directory
# export PXF_RUNDIR=${PXF_RUNDIR:=${PXF_BASE}/run}
# Memory
# export PXF_JVM_OPTS="-Xmx2g -Xms1g"
# Kill PXF on OutOfMemoryError, set to false to disable
# export PXF_OOM_KILL=true
# Dump heap on OutOfMemoryError, set to dump path to enable
# export PXF_OOM_DUMP_PATH=${PXF_BASE}/run/pxf_heap_dump
# Additional locations to be class-loaded by PXF
# export PXF_LOADER_PATH=
# Additional native libraries to be loaded by PXF
# export LD_LIBRARY_PATH=
######################################################
# The properties below were added by the pxf migrate
# tool on Fri Jun 18 17:50:49 CEST 2021
######################################################
export JAVA_HOME="/usr/lib/jvm/java-openjdk/jre"
export PXF_JVM_OPTS="-Xmx8g -Xms2g -Dlog4j2.formatMsgNoLookups=true -Duser.timezone=UTC"
hive-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>hive.metastore.uris</name>
<value>thrift://server-004:9083,thrift://server-005:9083</value>
</property>
<property>
<name>hive.metastore.client.socket.timeout</name>
<value>300</value>
</property>
<property>
<name>hive.metastore.warehouse.dir</name>
<value>/warehouse/tablespace/managed/hive</value>
</property>
<property>
<name>hive.metastore.warehouse.external.dir</name>
<value>/warehouse/tablespace/external/hive</value>
</property>
<property>
<name>hive.auto.convert.join</name>
<value>true</value>
</property>
<property>
<name>hive.auto.convert.join.noconditionaltask.size</name>
<value>52428800</value>
</property>
<property>
<name>hive.optimize.bucketmapjoin.sortedmerge</name>
<value>false</value>
</property>
<property>
<name>hive.smbjoin.cache.rows</name>
<value>10000</value>
</property>
<property>
<name>hive.server2.logging.operation.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.server2.logging.operation.log.location</name>
<value>/var/log/hive/operation_logs</value>
</property>
<property>
<name>mapred.reduce.tasks</name>
<value>-1</value>
</property>
<property>
<name>hive.exec.reducers.bytes.per.reducer</name>
<value>67108864</value>
</property>
<property>
<name>hive.exec.copyfile.maxsize</name>
<value>33554432</value>
</property>
<property>
<name>hive.exec.reducers.max</name>
<value>1009</value>
</property>
<property>
<name>hive.vectorized.groupby.checkinterval</name>
<value>4096</value>
</property>
<property>
<name>hive.vectorized.groupby.flush.percent</name>
<value>0.1</value>
</property>
<property>
<name>hive.compute.query.using.stats</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.execution.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.execution.reduce.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.use.vectorized.input.format</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.use.checked.expressions</name>
<value>true</value>
</property>
<property>
<name>hive.vectorized.use.vector.serde.deserialize</name>
<value>false</value>
</property>
<property>
<name>hive.vectorized.adaptor.usage.mode</name>
<value>chosen</value>
</property>
<property>
<name>hive.vectorized.input.format.excludes</name>
<value></value>
</property>
<property>
<name>hive.merge.mapfiles</name>
<value>true</value>
</property>
<property>
<name>hive.merge.mapredfiles</name>
<value>false</value>
</property>
<property>
<name>hive.cbo.enable</name>
<value>true</value>
</property>
<property>
<name>hive.fetch.task.conversion</name>
<value>more</value>
</property>
<property>
<name>hive.fetch.task.conversion.threshold</name>
<value>1073741824</value>
</property>
<property>
<name>hive.limit.pushdown.memory.usage</name>
<value>0.04</value>
</property>
<property>
<name>hive.merge.smallfiles.avgsize</name>
<value>16777216</value>
</property>
<property>
<name>hive.merge.size.per.task</name>
<value>268435456</value>
</property>
<property>
<name>hive.optimize.reducededuplication</name>
<value>true</value>
</property>
<property>
<name>hive.optimize.reducededuplication.min.reducer</name>
<value>4</value>
</property>
<property>
<name>hive.map.aggr</name>
<value>true</value>
</property>
<property>
<name>hive.map.aggr.hash.percentmemory</name>
<value>0.5</value>
</property>
<property>
<name>hive.optimize.sort.dynamic.partition</name>
<value>false</value>
</property>
<property>
<name>hive.metastore.execute.setugi</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.dml.events</name>
<value>true</value>
</property>
<property>
<name>hive.support.concurrency</name>
<value>true</value>
</property>
<property>
<name>hive.zookeeper.quorum</name>
<value>server-003,server-005,server-004</value>
</property>
<property>
<name>hive.zookeeper.client.port</name>
<value>2181</value>
</property>
<property>
<name>hive.cluster.delegation.token.store.class</name>
<value>org.apache.hadoop.hive.thrift.DBTokenStore</value>
</property>
<property>
<name>hive.server2.enable.doAs</name>
<value>true</value>
</property>
<property>
<name>hive.metastore.sasl.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.server2.authentication</name>
<value>kerberos</value>
</property>
<property>
<name>hive.metastore.kerberos.principal</name>
<value>hive/[email protected]</value>
</property>
<property>
<name>spark.shuffle.service.enabled</name>
<value>true</value>
</property>
<property>
<name>hive.exec.pre.hooks</name>
<value>org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook</value>
</property>
<property>
<name>hive.exec.failure.hooks</name>
<value>org.apache.hadoop.hive.ql.hooks.HiveProtoLoggingHook</value>
</property>
<property>
<name>hive.strict.checks.orderby.no.limit</name>
<value>false</value>
</property>
<property>
<name>hive.strict.checks.no.partition.filter</name>
<value>false</value>
</property>
<property>
<name>hive.strict.checks.type.safety</name>
<value>true</value>
</property>
<property>
<name>hive.strict.checks.cartesian.product</name>
<value>false</value>
</property>
<property>
<name>hive.strict.checks.bucketing</name>
<value>true</value>
</property>
<property>
<name>hive.async.log.enabled</name>
<value>false</value>
</property>
<property>
<name>parquet.compression</name>
<value>GZIP</value>
</property>
<property>
<name>hive.metastore.integral.jdo.pushdown</name>
<value>true</value>
</property>
<property>
<name>hive.parquet.timestamp.skip.conversion</name>
<value>true</value>
<description>Current Hive implementation of parquet stores timestamps to UTC, this flag allows skipping of the conversion on reading parquet files from other tools</description>
</property>
</configuration>
hdfs-site.xml
<?xml version="1.0" encoding="UTF-8"?>
<!--Autogenerated by Cloudera Manager-->
<configuration>
<property>
<name>dfs.nameservices</name>
<value>nameservice1</value>
</property>
<property>
<name>dfs.client.failover.proxy.provider.nameservice1</name>
<value>org.apache.hadoop.hdfs.server.namenode.ha.ConfiguredFailoverProxyProvider</value>
</property>
<property>
<name>dfs.ha.automatic-failover.enabled.nameservice1</name>
<value>true</value>
</property>
<property>
<name>ha.zookeeper.quorum</name>
<value>server-003:2181,server-004:2181,server-005:2181</value>
</property>
<property>
<name>dfs.ha.namenodes.nameservice1</name>
<value>namenode187,namenode169</value>
</property>
<property>
<name>dfs.namenode.rpc-address.nameservice1.namenode187</name>
<value>server-001:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.nameservice1.namenode187</name>
<value>server-001:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.nameservice1.namenode187</name>
<value>server-001:9870</value>
</property>
<property>
<name>dfs.namenode.https-address.nameservice1.namenode187</name>
<value>server-001:9871</value>
</property>
<property>
<name>dfs.namenode.rpc-address.nameservice1.namenode169</name>
<value>server-001:8020</value>
</property>
<property>
<name>dfs.namenode.servicerpc-address.nameservice1.namenode169</name>
<value>server-001:8022</value>
</property>
<property>
<name>dfs.namenode.http-address.nameservice1.namenode169</name>
<value>server-001:9870</value>
</property>
<property>
<name>dfs.namenode.https-address.nameservice1.namenode169</name>
<value>server-001:9871</value>
</property>
<property>
<name>dfs.replication</name>
<value>2</value>
</property>
<property>
<name>dfs.blocksize</name>
<value>134217728</value>
</property>
<property>
<name>dfs.client.use.datanode.hostname</name>
<value>false</value>
</property>
<property>
<name>fs.permissions.umask-mode</name>
<value>002</value>
</property>
<property>
<name>dfs.client.block.write.locateFollowingBlock.retries</name>
<value>7</value>
</property>
<property>
<name>dfs.encrypt.data.transfer.algorithm</name>
<value>3des</value>
</property>
<property>
<name>dfs.encrypt.data.transfer.cipher.suites</name>
<value>AES/CTR/NoPadding</value>
</property>
<property>
<name>dfs.encrypt.data.transfer.cipher.key.bitlength</name>
<value>256</value>
</property>
<property>
<name>dfs.namenode.acls.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.client.read.shortcircuit</name>
<value>true</value>
</property>
<property>
<name>dfs.client.read.shortcircuit.streams.cache.size</name>
<value>4096</value>
</property>
<property>
<name>dfs.domain.socket.path</name>
<value>/var/run/hdfs-sockets/dn</value>
</property>
<property>
<name>dfs.client.read.shortcircuit.skip.checksum</name>
<value>false</value>
</property>
<property>
<name>dfs.client.domain.socket.data.traffic</name>
<value>false</value>
</property>
<property>
<name>dfs.datanode.hdfs-blocks-metadata.enabled</name>
<value>true</value>
</property>
<property>
<name>dfs.block.access.token.enable</name>
<value>true</value>
</property>
<property>
<name>dfs.namenode.kerberos.principal</name>
<value>hdfs/[email protected]</value>
</property>
<property>
<name>dfs.namenode.kerberos.internal.spnego.principal</name>
<value>HTTP/[email protected]</value>
</property>
<property>
<name>dfs.datanode.kerberos.principal</name>
<value>hdfs/[email protected]</value>
</property>
</configuration>
Apologies for the delay in responding to this issue. We've RCA'd the issue and wanted to share our findings, including a potential workaround. It is on our roadmap to fix this, but it may be a while before we get to it.
Hive Setup
Hive DDL (NOTE: Hive server is in America/Los_Angeles
time zone)
CREATE EXTERNAL TABLE event_times_in_hive (id int, event_time timestamp)
STORED AS PARQUET
LOCATION '/tmp/event_times_in_hive';
INSERT INTO event_times_in_hive (id, event_time) VALUES (1, '2022-05-10 12:34:56.789');
SELECT * FROM event_times_in_hive;
-- +-------------------------+---------------------------------+
-- | event_times_in_hive.id | event_times_in_hive.event_time |
-- +-------------------------+---------------------------------+
-- | 1 | 2022-05-10 12:34:56.789 |
-- +-------------------------+---------------------------------+
-- 1 row selected (0.532 seconds)
Read the Data with Apache Spark
TZ='America/Chicago' $SPARK_HOME/bin/spark-shell
val eventTimes = spark.read.parquet("hdfs://localhost:8020/tmp/event_times_in_hive")
eventTimes.show(1, false)
// +---+-----------------------+
// |id |event_time |
// +---+-----------------------+
// |1 |2022-05-10 14:34:56.789|
// +---+-----------------------+
The timestamp has been shifted two hours.
Read the Data with Greenplum PXF
Greenplum DDL
NOTE: PXF server is configured with export PXF_JVM_OPTS="-Xmx2g -Xms1g -Duser.timezone=America/Chicago"
CREATE EXTERNAL TABLE pxf_event_times_in_hive (
id int,
event_time timestamp
) LOCATION ('pxf://event_times_in_hive?PROFILE=hive')
FORMAT 'custom' (FORMATTER='pxfwritable_import');
SELECT * FROM pxf_event_times_in_hive ;
-- id | event_time
-- ----+-------------------------
-- 1 | 2022-05-10 14:34:56.789
-- (1 row)
CREATE EXTERNAL TABLE pxf_hdfs_event_times_in_hive (
id int,
event_time timestamp
) LOCATION ('pxf://tmp/event_times_in_hive?PROFILE=hdfs:parquet')
FORMAT 'custom' (FORMATTER='pxfwritable_import');
SELECT * FROM pxf_hdfs_event_times_in_hive ;
-- id | event_time
-- ----+-------------------------
-- 1 | 2022-05-10 14:34:56.789
-- (1 row)
Discussion
This appears to be a known issue with how Hive writes timestamps in Parquet files. Timestamps are normalized from the Hive server's local time zone to UTC and stored using INT96. As an example, the timestamp inserted into the Hive table (2022-05-10 12:34:56.789
) is converted to the instant 2022-05-10 12:34:56.789 America/Los_Angeles
which is the same instant as 2022-05-10 19:34:56.789 UTC
and the number of milliseconds since epoch (1970-01-01 00:00:00.000 UTC
) are stored in the Parquet file. In older versions of Hive (before v3.1), Hive would normalize the timestamp from UTC back to the Hive server's local time zone. Hive 3.1 turned off normalization to UTC but this was later changed in Hive 3.1.2 (see HIVE-21290).
- timestamps are normalized to UTC before storing in Parquet file and the session-local time zone is stored in the file metadata
- when reading back files, timestamps are converted back from UTC to the saved time zone (instead of the local time zone)
This can be seen by using parquet-tools to inspect the metadata of the Parquet files (see the writer.time.zone
)
$ hadoop jar ~/pxf-data/parquet-tools-1.11.2.jar meta /tmp/event_times_in_hive
file: hdfs://0.0.0.0:8020/tmp/event_times_in_hive/000000_0
creator: parquet-mr version 1.10.0.3.1.4.0-315 (build c67156e3dd3ff2d37eec781ee56b884c3dd894c1)
extra: writer.time.zone = America/Los_Angeles
file schema: hive_schema
--------------------------------------------------------------------------------
id: OPTIONAL INT32 R:0 D:1
event_time: OPTIONAL INT96 R:0 D:1
row group 1: RC:1 TS:165 OFFSET:4
--------------------------------------------------------------------------------
id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:55/55/1.00 VC:1 ENC:BIT_PACKED,PLAIN,RLE ST:[min: 1, max: 1, num_nulls: 0]
event_time: INT96 UNCOMPRESSED DO:0 FPO:59 SZ:110/110/1.00 VC:1 ENC:PLAIN_DICTIONARY,BIT_PACKED,RLE ST:[min: 0x40EF10D01D4000003E882500, max: 0x40EF10D01D4000003E882500, num_nulls: 0]
The stored value for event_time
is 0x40EF10D01D4000003E882500
which is the INT96 representation of 2022-05-10 12:34:56.789
. This represents 70496789000000
nanoseconds (first 8-bytes) into Julian day 2459710
(remaining 4-bytes).
Julian day 2459710
corresponds to 2022-05-10
while 70496789000000
comes out to 19:34:56.789
.
Treating this as an instant in UTC timezone, we have 2022-05-10 19:34:56.789 UTC
which is 2022-05-10 12:34:56.789 America/Los_Angeles
. Dropping the America/Los_Angeles
timezone, we recover the originally inserted timestamp.
If we inspect the metadata of a Parquet file written with an older version of Hive (e.g., 1.2.1), we see that the writer's time zone is not recorded in the Parquet file
$ ./bin/hadoop jar ~/pxf-data/parquet-tools-1.11.2.jar meta /tmp/event_times_in_hive
22/05/11 12:46:27 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
22/05/11 12:46:27 INFO hadoop.ParquetFileReader: reading another 1 footers
22/05/11 12:46:27 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
file: hdfs://0.0.0.0:8020/tmp/event_times_in_hive/000000_0
creator: parquet-mr version 1.8.1 (build 4aba4dae7bb0d4edbcf7923ae1339f28fd3f7fcf)
file schema: hive_schema
--------------------------------------------------------------------------------
id: OPTIONAL INT32 R:0 D:1
event_time: OPTIONAL INT96 R:0 D:1
row group 1: RC:1 TS:125 OFFSET:4
--------------------------------------------------------------------------------
id: INT32 UNCOMPRESSED DO:0 FPO:4 SZ:43/43/1.00 VC:1 ENC:PLAIN,RLE,BIT_PACKED ST:[min: 1, max: 1, num_nulls: 0]
event_time: INT96 UNCOMPRESSED DO:0 FPO:47 SZ:82/82/1.00 VC:1 ENC:RLE,PLAIN_DICTIONARY,BIT_PACKED ST:[min: 0x40EF10D01D4000003E882500, max: 0x40EF10D01D4000003E882500, num_nulls: 0]
What Metadata Does Spark Include in Parquet Files?
export HADOOP_CONF_DIR="${HOME}/workspace/singlecluster-HDP3/hadoop/etc/hadoop"
$SPARK_HOME/bin/spark-shell
println(java.time.ZoneId.systemDefault())
// America/Los_Angeles
val eventTime = new java.sql.Timestamp(1652211296789L)
println(eventTime)
// 2022-05-10 12:34:56.789
val df = Seq((1, eventTime)).toDF("id", "event_time")
df.write.parquet("hdfs://localhost:8020/tmp/spark-3.0.3")
$ hadoop jar ./parquet-tools-1.11.2.jar meta /tmp/spark-3.0.3
file: hdfs://0.0.0.0:8020/tmp/spark-3.0.3/part-00000-c73dbb93-dce2-4721-b71c-e45bfcd834e9-c000.snappy.parquet
creator: parquet-mr version 1.10.1 (build a89df8f9932b6ef6633d06069e50c9b7970bebd1)
extra: org.apache.spark.version = 3.0.3
extra: org.apache.spark.sql.parquet.row.metadata = {"type":"struct","fields":[{"name":"id","type":"integer","nullable":false,"metadata":{}},{"name":"event_time","type":"timestamp","nullable":true,"metadata":{}}]}
file schema: spark_schema
--------------------------------------------------------------------------------
id: REQUIRED INT32 R:0 D:0
event_time: OPTIONAL INT96 R:0 D:1
row group 1: RC:1 TS:159 OFFSET:4
--------------------------------------------------------------------------------
id: INT32 SNAPPY DO:0 FPO:4 SZ:51/49/0.96 VC:1 ENC:BIT_PACKED,PLAIN ST:[min: 1, max: 1, num_nulls: 0]
event_time: INT96 SNAPPY DO:0 FPO:55 SZ:114/110/0.96 VC:1 ENC:RLE,BIT_PACKED,PLAIN_DICTIONARY ST:[min: 0x40EF10D01D4000003E882500, max: 0x40EF10D01D4000003E882500, num_nulls: 0]
Here, Spark was used to write a Parquet file with the same data as the Hive example and was running with the same timezone as the Hive server (America/Los_Angeles
). From the parquet-tools output, we observe two things:
- The stored INT96 value (
0x40EF10D01D4000003E882500
) is exactly the same as what was stored in the Parquet file written by Hive
- The writer's timezone was not recorded in the file metadata
This example was for Spark 3.0.3, but the same result holds for Spark 3.2.1 (which is the latest release of Apache Spark as of 2022-05-13).
What Metadata Does PXF Write in Parquet Files?
We can look at what metadata (if any) is included in the Parquet files written by PXF; for this example, the PXF server is running in America/Los_Angeles
time zone.
CREATE WRITABLE EXTERNAL TABLE pxf_hdfs_parquet_event_times (
id int,
event_time timestamp
) LOCATION ('pxf://tmp/pxf_event_times?PROFILE=hdfs:parquet')
FORMAT 'custom' (FORMATTER='pxfwritable_export');
INSERT INTO pxf_hdfs_parquet_event_times (id, event_time) VALUES (1, '2022-05-10 12:34:56.789');
Inspecting the written Parquet file with parquet-tools
$ hadoop jar ~/pxf-data/parquet-tools-1.11.2.jar meta /tmp/pxf_event_times
22/05/11 14:18:28 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
22/05/11 14:18:28 INFO hadoop.ParquetFileReader: reading another 1 footers
22/05/11 14:18:28 INFO hadoop.ParquetFileReader: Initiating action with parallelism: 5
file: hdfs://0.0.0.0:8020/tmp/pxf_event_times/23-0000000002_0.snappy.parquet
creator: parquet-mr version 1.11.1 (build 765bd5cd7fdef2af1cecd0755000694b992bfadd)
extra: writer.model.name = example
file schema: hive_schema
--------------------------------------------------------------------------------
id: OPTIONAL INT32 R:0 D:1
event_time: OPTIONAL INT96 R:0 D:1
row group 1: RC:1 TS:95 OFFSET:4
--------------------------------------------------------------------------------
id: INT32 SNAPPY DO:0 FPO:4 SZ:35/33/0.94 VC:1 ENC:BIT_PACKED,RLE,PLAIN ST:[min: 1, max: 1, num_nulls: 0]
event_time: INT96 SNAPPY DO:0 FPO:39 SZ:66/62/0.94 VC:1 ENC:BIT_PACKED,RLE,PLAIN_DICTIONARY ST:[min: 0x40EF10D01D4000003E882500, max: 0x40EF10D01D4000003E882500, num_nulls: 0]
We see that PXF is normalizing from the local system's time zone to UTC (the hexadecimal INT96 value is the same as what is written by Hive) but the session-local time zone is not stored in the file metadata.
Summary
When using Parquet files, older versions of Hive treated timestamps with instant semantics. This is what Apache Spark and Greenplum PXF currently do as well. In newer versions of Hive (3.1.2), timestamps are treated with local date time semantics with the writer's time zone recorded in the file's metadata in order to maintain backwards compatibility.
Possible Workaround
Use AT TIME ZONE
operator to add the time zone of the PXF server to the timestamp column and then use the operator again to convert back to a timestamp without a time zone, as the time would appear in that time zone. This should work when reading Parquet files with either the hive
profile or the hdfs:parquet
profile, but it requires that you know the time zone of the writer of the Parquet files and that all files were written in the same time zone. Here is an example:
-- PXF & GPDB server are running in America/Chicago time zone
-- Hive server is America/Los_Angeles
SELECT event_time FROM pxf_event_times_in_hive ;
-- event_time
-- -------------------------
-- 2022-05-10 14:34:56.789
-- (1 row)
SELECT (event_time AT TIME ZONE 'America/Chicago') AT TIME ZONE 'America/Los_Angeles' AS event_time FROM pxf_event_times_in_hive;
-- event_time
-- -------------------------
-- 2022-05-10 12:34:56.789
-- (1 row)
Appendix
Spark Configuration for INT96 Timestamp Conversion
Spark added spark.sql.parquet.int96TimestampConversion
SPARK-12297
This controls whether timestamp adjustments should be applied to INT96 data when converting to timestamps, for data written by Impala. This is necessary because Impala stores INT96 data with a different time zone offset than Hive & Spark
-
SQLConf.PARQUET_INT96_TIMESTAMP_CONVERSION
-
SQLConf.isParquetINT96TimestampConversion
-
ParquetFileFormat
Hive Configuration for Skipping Timestamp Conversion
Hive added hive.parquet.timestamp.skip.conversion
HIVE-9482
Current Hive implementation of parquet stores timestamps to UTC, this flag allows skipping of the conversion on reading parquet files from other tools
-
ParquetRecordReaderBase
- conversion is skipped only if the Parquet file was written by something other than
parquet-mr
- Hive Configuration Properties
PXF Code Path
When using the hive
profile, we end up in NanoTimeUtils.getTimestamp
- in Hive 2.3.8, we don't take into account the writer's time zone
-
HIVE-21290
- See commit
When using the hdfs:parquet
profile, we read the 12-bytes that make up the INT96 timestamp and convert that to a timestamp with ParquetTypeConverter.bytesToTimestamp
where we shift according to PXF's local time zone.
See the following Parquet technical paper Consistent timestamp types in Hadoop SQL engines