Parquet.jl
Date type columns are being read as integers.
I can't use Parquet.jl because there is a problem reading Date-typed columns: they are read into Julia DataFrames as Int32. I'm pretty sure parquet files are supposed to define the schema and data types -- when I'm working in Spark I don't have to pass explicit schema/type definitions.
Here is a simple example from parquet files written by Spark, probably a very common use-case.
Read a small DataFrame into Spark
// scala>
val df2 = spark.read.parquet("test_date_column_data")
df2.show
+---+----------+-----+---------+---+------+
| id| date_col|group| item| ts|amount|
+---+----------+-----+---------+---+------+
| 1|2020-03-11| A|BOO00OXXX| 1| 1.1|
| 2|2020-03-11| B|BOO00OXXY| 2| 2.1|
| 3|2020-03-12| B|BOO00OXXZ| 3| 17.08|
| 4|2020-03-12| D|BOO00OXXA| 4| 9.3|
| 5|2020-03-13| E|BOO00OXXB| 5| 1.99|
| 6|2020-03-13| A|BOO00OXXY| 1| 0.1|
| 7|2020-03-13| C|BOO00OXXY| 2| 2.0|
+---+----------+-----+---------+---+------+
df2.printSchema
// root
// |-- id: integer (nullable = true)
// |-- date_col: date (nullable = true)
// |-- group: string (nullable = true)
// |-- item: string (nullable = true)
// |-- ts: integer (nullable = true)
// |-- amount: double (nullable = true)
// write to single parquet file
df2.coalesce(1).write.parquet("test2_date_column_data")
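As a sanity check that the DATE logical type really is recorded in the file footer (not just known to Spark), the schema can be dumped with the parquet-tools CLI (assuming it is installed); date_col should show up as an int32 column annotated with the DATE logical type:
// shell
$> parquet-tools schema test2_date_column_data/part-00000-*.snappy.parquet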
Read DataFrame using Parquet.jl
# julia>
using Parquet, DataFrames
dff = DataFrame(read_parquet("test2_date_column_data"))
7×6 DataFrame
Row │ id date_col group item ts amount
│ Int32? Int32? String? String? Int32? Float64?
─────┼────────────────────────────────────────────────────────
1 │ 1 18332 A BOO00OXXX 1 1.1
2 │ 2 18332 B BOO00OXXY 2 2.1
3 │ 3 18333 B BOO00OXXZ 3 17.08
4 │ 4 18333 D BOO00OXXA 4 9.3
5 │ 5 18334 E BOO00OXXB 5 1.99
6 │ 6 18334 A BOO00OXXY 1 0.1
7 │ 7 18334 C BOO00OXXY 2 2.0
eltype.(eachcol(dff))
# 6-element Vector{Union}:
# Union{Missing, Int32}
# Union{Missing, Int32}
# Union{Missing, String}
# Union{Missing, String}
# Union{Missing, Int32}
# Union{Missing, Float64}
The dates in date_col are all messed up -- they come back as plain integers instead of dates.
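The raw values do line up with days since the Unix epoch (1970-01-01 + 18332 days = 2020-03-11), which is how the parquet DATE type encodes dates, so a manual conversion works as a stop-gap (a minimal sketch; assumes the epoch-days interpretation and reuses dff from above):
# julia>
using Dates
# assumed encoding: Int32 count of days since 1970-01-01 (parquet DATE)
to_date(d) = ismissing(d) ? missing : Date(1970, 1, 1) + Day(d)
dff.date_col = to_date.(dff.date_col)
dff.date_col[1]   # 2020-03-11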
Addendum
This is perhaps related to the root cause of the dates problem, but not a priority for me.
When parquet columns are written as non-nullable, the Int/Float columns are read in as garbage.
First, create a Spark DataFrame with a date-typed column and write it to parquet
val dfq = List(
(1,"2020-03-11","A","BOO00OXXX",1, 1.10),
(2,"2020-03-11","B","BOO00OXXY",2, 2.10),
(3,"2020-03-12","B","BOO00OXXZ",3, 17.08),
(4,"2020-03-12","D","BOO00OXXA",4, 9.3),
(5,"2020-03-13","E","BOO00OXXB",5, 1.99),
(6,"2020-03-13","A","BOO00OXXY",1, 0.10),
(7,"2020-03-13","C","BOO00OXXY",2, 2.0)
)
.toDF("id","da_te","group","item","ts","amount")
.select($"id"
, to_date($"da_te","yyyy-MM-dd").as("date_col")
, $"group"
, $"item"
, $"ts"
, $"amount")
dfq.printSchema
// root
// |-- id: integer (nullable = false)
// |-- date_col: date (nullable = true)
// |-- group: string (nullable = true)
// |-- item: string (nullable = true)
// |-- ts: integer (nullable = false)
// |-- amount: double (nullable = false)
dfq.show
+---+----------+-----+---------+---+------+
| id| date_col|group| item| ts|amount|
+---+----------+-----+---------+---+------+
| 1|2020-03-11| A|BOO00OXXX| 1| 1.1|
| 2|2020-03-11| B|BOO00OXXY| 2| 2.1|
| 3|2020-03-12| B|BOO00OXXZ| 3| 17.08|
| 4|2020-03-12| D|BOO00OXXA| 4| 9.3|
| 5|2020-03-13| E|BOO00OXXB| 5| 1.99|
| 6|2020-03-13| A|BOO00OXXY| 1| 0.1|
| 7|2020-03-13| C|BOO00OXXY| 2| 2.0|
+---+----------+-----+---------+---+------+
dfq.coalesce(1).write.parquet("test_date_column_data")
Notice the data is the same as the original set, but some columns are nullable = false; this is the default behavior of toDF() for Int/Double.
Read parquet data as Julia DataFrame
(v1.6) pkg> generate pq_test_date
(v1.6) pkg> activate .
(pq_test_date) pkg> add DataFrames, Parquet
(pq_test_date) pkg> st
# Project pq_test_date v0.1.0
# Status `~/../pq_test_date/Project.toml`
# [a93c6f00] DataFrames v1.2.0
# [626c502c] Parquet v0.8.3
using Parquet, DataFrames
dfq = DataFrame(read_parquet("test_date_column_data"))
7×6 DataFrame
Row │ id date_col group item ts amount
│ Int32 Int32? String? String? Int32 Float64
─────┼──────────────────────────────────────────────────────────────────
1 │ 0 18332 A BOO00OXXX 0 2.22659e-314
2 │ 13 18332 B BOO00OXXY 150 3.0e-323
3 │ 0 18333 B BOO00OXXZ 0 2.24929e-314
4 │ 263668488 18333 D BOO00OXXA 322394659 2.25631e-314
5 │ 1 18334 E BOO00OXXB 1 2.24929e-314
6 │ 28 18334 A BOO00OXXY 3 2.24916e-314
7 │ 0 18334 C BOO00OXXY 0 2.24916e-314
eltype.(eachcol(dfq))
# 6-element Vector{Type}:
# Int32
# Union{Missing, Int32}
# Union{Missing, String}
# Union{Missing, String}
# Int32
# Float64
# reading it twice gives different numbers
dfq = DataFrame(read_parquet("test_date_column_data"))
7×6 DataFrame
Row │ id date_col group item ts amount
│ Int32 Int32? String? String? Int32 Float64
─────┼──────────────────────────────────────────────────────────────────
1 │ 0 18332 A BOO00OXXX 0 2.25523e-314
2 │ 894 18332 B BOO00OXXY 914 2.27273e-314
3 │ 0 18333 B BOO00OXXZ 0 2.21165e-314
4 │ 267651619 18333 D BOO00OXXA 863662672 2.218e-314
5 │ 1 18334 E BOO00OXXB 1 2.21052e-314
6 │ 4 18334 A BOO00OXXY 877323664 2.21052e-314
7 │ 0 18334 C BOO00OXXY 1 5.0e-324
I think the Int/Float columns are affected by the nullable attribute; the values changing between reads look like uninitialized memory.
Notes:
- I can work around this problem; data engineer outputs are always nullable columns on my team
- I can read this parquet file back into Spark and it is correctly typed; in fact this is how I generated df2 in the first note above.
Any chance you know whether it is writing Parquet's Int96 or Int64 timestamps? Trying to figure out what the status quo is here. Rather to my surprise, Int96 seems to work for me, but Int64 does not; I was expecting it to be the other way around.
Oh, part of my problem is that I'm only testing with full timestamps and these are dates, so maybe we just can't read any dates right now :disappointed:.
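For context on the two encodings: an INT96 timestamp packs nanoseconds-within-day plus a Julian day number, while the INT64 variant is annotated as milliseconds/microseconds/nanoseconds since the Unix epoch, so a raw INT64 value can be decoded by hand (a sketch; assumes the TIMESTAMP_MICROS annotation, and the value below is just an illustration):
# julia>
using Dates
# assumed encoding: INT64 microseconds since 1970-01-01T00:00:00 (TIMESTAMP_MICROS)
ts_micros = 1_583_884_800_000_000               # illustrative raw value
DateTime(1970, 1, 1) + Microsecond(ts_micros)   # 2020-03-11T00:00:00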
The date columns are Spark DateType. I can attach the sample data to verify; I'm pretty ignorant about how this is represented in the binary encoding of the physical file.
I can change the date column to timestamp in my production sources, so perhaps this is a workaround.
Attaching parquet output from Spark, with a DateType date column and all columns nullable
ls -l test2_date_column_data
total 4
0 Jul 18 22:46 _SUCCESS
staff 1679 Jul 18 22:46 part-00000-dee59adb-aa01-46ea-9215-fa3f296bfd5b-c000.snappy.parquet
It looks like timestamps are working. Julia's DateTime doesn't support timezones without TimeZones.jl, so I'm ignoring the shift of the timestamp hours from 00:00 to 19:00.
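If the zone shift mattered, TimeZones.jl could re-attach a zone to the naive DateTime values after reading; a small sketch, assuming the stored instants are UTC (the target zone below is only an example):
# julia>
using Dates, TimeZones
# assumption: the parquet timestamps are UTC instants read back as naive DateTimes
utc = ZonedDateTime(DateTime(2020, 3, 11, 0, 0, 0), tz"UTC")
astimezone(utc, tz"America/New_York")   # same instant, rendered in an example local zone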
Spark write to parquet with TimestampType column
Test if DateType is not supported vs. TimeStamp
I guess if I had to choose, I would support TimeStamp columns ahead of DateType.
// scala>
val df_ts = spark.read.parquet("test_date_column_data")
.withColumn("timestamp_col", $"date_col".cast("timestamp"))
scala> df_ts.show
+---+----------+-----+---------+---+------+-------------------+
| id| date_col|group| item| ts|amount| timestamp_col|
+---+----------+-----+---------+---+------+-------------------+
| 1|2020-03-11| A|BOO00OXXX| 1| 1.1|2020-03-11 00:00:00|
| 2|2020-03-11| B|BOO00OXXY| 2| 2.1|2020-03-11 00:00:00|
| 3|2020-03-12| B|BOO00OXXZ| 3| 17.08|2020-03-12 00:00:00|
| 4|2020-03-12| D|BOO00OXXA| 4| 9.3|2020-03-12 00:00:00|
| 5|2020-03-13| E|BOO00OXXB| 5| 1.99|2020-03-13 00:00:00|
| 6|2020-03-13| A|BOO00OXXY| 1| 0.1|2020-03-13 00:00:00|
| 7|2020-03-13| C|BOO00OXXY| 2| 2.0|2020-03-13 00:00:00|
+---+----------+-----+---------+---+------+-------------------+
df_ts.printSchema
// root
// |-- id: integer (nullable = true)
// |-- date_col: date (nullable = true)
// |-- group: string (nullable = true)
// |-- item: string (nullable = true)
// |-- ts: integer (nullable = true)
// |-- amount: double (nullable = true)
// |-- timestamp_col: timestamp (nullable = true)
df_ts.coalesce(1).write.parquet("test_timestamp_column_data")
// shell
$> ls -l test_timestamp_column_data
total 4
-rw-r--r-- 1 user staff 0 Jul 20 15:40 _SUCCESS
-rw-r--r-- 1 user staff 1917 Jul 20 15:40 part-00000-7938c7c8-cead-410f-b001-5c0d9301880c-c000.snappy.parquet
Now read as a Julia DataFrame via Parquet.jl
# julia>
using Parquet, DataFrames
dfts = DataFrame(read_parquet("test_timestamp_column_data"))
7×7 DataFrame
Row │ id date_col group item ts amount timestamp_col
│ Int32? Int32? String? String? Int32? Float64? DateTime…?
─────┼─────────────────────────────────────────────────────────────────────────────
1 │ 1 18332 A BOO00OXXX 1 1.1 2020-03-11T19:00:00
2 │ 2 18332 B BOO00OXXY 2 2.1 2020-03-11T19:00:00
3 │ 3 18333 B BOO00OXXZ 3 17.08 2020-03-12T19:00:00
4 │ 4 18333 D BOO00OXXA 4 9.3 2020-03-12T19:00:00
5 │ 5 18334 E BOO00OXXB 5 1.99 2020-03-13T19:00:00
6 │ 6 18334 A BOO00OXXY 1 0.1 2020-03-13T19:00:00
7 │ 7 18334 C BOO00OXXY 2 2.0 2020-03-13T19:00:00
eltype.(eachcol(dfts))
# 7-element Vector{Union}:
# Union{Missing, Int32}
# Union{Missing, Int32}
# Union{Missing, String}
# Union{Missing, String}
# Union{Missing, Int32}
# Union{Missing, Float64}
# Union{Missing, Dates.DateTime}
Attaching parquet output from the Spark write above
Thanks for the detailed report! Yes, Parquet.jl does not support all data types yet. From what I recollect regarding dates and timestamps, it is missing support for (each again with some underlying variants):
- Date (need support from both reader and writer)
- DateTime
  - via INT96 (supported by reader, need writer support)
  - via INT64 (need support from both reader and writer)
As there are quite a few other issues raised regarding DateTime support for reader (#133) and writer (#102), I was wondering if you have any plans to include DateTime support in Parquet.jl?