Cannot read decimal values whose physical type is INT32 or INT64
Issue
I am saving a Parquet file with Spark where one of the columns is a decimal. The physical type of this column becomes INT32 or INT64 depending on its precision. When I then read the file with AvroParquetReader, the logical type comes back as a plain long and the value is wrong: for example, an original value of 23.4 is read as 234.
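For context, Parquet stores an INT32/INT64-backed decimal as the unscaled integer, with the scale carried only in the logical type annotation, so 23.4 at scale 1 is physically stored as 234. A minimal plain-JDK sketch of the conversion the reader ought to be applying (class name is illustrative):

import java.math.BigDecimal;

public class UnscaledDecimalDemo {
    public static void main(String[] args) {
        // Parquet stores an INT32/INT64-backed decimal as its unscaled value;
        // the scale lives only in the logical type annotation.
        long unscaled = 234L; // the raw long AvroParquetReader currently returns
        int scale = 1;        // from Decimal(precision=10, scale=1)
        BigDecimal value = BigDecimal.valueOf(unscaled, scale);
        System.out.println(value); // prints 23.4
    }
}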
Spark side
If I enable spark.sql.parquet.writeLegacyFormat in Spark (see SPARK-20297), Spark no longer uses INT32/INT64 as the physical type (it writes the decimal as FIXED_LEN_BYTE_ARRAY instead), and I can then read the Parquet file successfully. However, this is not the default option, and according to the decimal documentation in this repo, INT32 and INT64 are supposed to be valid physical types for decimals.
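For completeness, the flag can be set on the session before writing. A minimal sketch, in Java to match the reader code below (the output path is a placeholder):

import org.apache.spark.sql.SparkSession;

public class LegacyDecimalWrite {
    public static void main(String[] args) {
        SparkSession spark = SparkSession.builder()
                .master("local[*]")
                .appName("legacy-decimal-write")
                .getOrCreate();
        // With this flag on, Spark writes decimals as FIXED_LEN_BYTE_ARRAY
        // instead of INT32/INT64, which AvroParquetReader reads correctly.
        spark.conf().set("spark.sql.parquet.writeLegacyFormat", "true");
        spark.sql("SELECT CAST(120.321 AS DECIMAL(10,1)) AS decimal_salary")
                .write().parquet("my_path_legacy"); // placeholder path
        spark.stop();
    }
}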
How to reproduce
- Writing with Spark (version: 3.3.0)
df_temp = spark.createDataFrame(
    [(120.321, "Alex"), (24.45, "John")],
    schema=["salary", "name"]
)
df_temp.createOrReplaceTempView("companyTable")
df = spark.sql("SELECT *, CAST(salary as DECIMAL(10,1)) as decimal_salary FROM companyTable")
df.show()
df.write.parquet("my_path")
+-------+----+--------------+
| salary|name|decimal_salary|
+-------+----+--------------+
|120.321|Alex| 120.3|
| 24.45|John| 24.5|
+-------+----+--------------+
- Confirming the schema
Running parquet-tools on the written file:
parquet-tools inspect github_example.parquet
############ file meta data ############
created_by: parquet-mr version 1.12.2 (build ${buildNumber})
num_columns: 3
num_rows: 1
num_row_groups: 1
format_version: 1.0
serialized_size: 757
############ Columns ############
salary
name
decimal_salary
############ Column(salary) ############
name: salary
path: salary
max_definition_level: 1
max_repetition_level: 0
physical_type: DOUBLE
logical_type: None
converted_type (legacy): NONE
compression: SNAPPY (space_saved: -5%)
############ Column(name) ############
name: name
path: name
max_definition_level: 1
max_repetition_level: 0
physical_type: BYTE_ARRAY
logical_type: String
converted_type (legacy): UTF8
compression: SNAPPY (space_saved: -5%)
############ Column(decimal_salary) ############
name: decimal_salary
path: decimal_salary
max_definition_level: 1
max_repetition_level: 0
physical_type: INT64
logical_type: Decimal(precision=10, scale=1)
converted_type (legacy): DECIMAL
compression: SNAPPY (space_saved: -5%)
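The INT64 choice above matches the Parquet decimal spec: precision up to 9 fits in INT32, precision up to 18 fits in INT64, and anything larger needs FIXED_LEN_BYTE_ARRAY or BINARY. A small illustrative helper mirroring that rule (not an actual parquet-mr method):

public class DecimalPhysicalType {
    // Mirrors the rule in the Parquet format spec; not a parquet-mr API.
    static String physicalTypeForDecimal(int precision) {
        if (precision <= 9)  return "INT32";
        if (precision <= 18) return "INT64"; // DECIMAL(10,1) lands here
        return "FIXED_LEN_BYTE_ARRAY";       // or BINARY for larger precision
    }

    public static void main(String[] args) {
        System.out.println(physicalTypeForDecimal(10)); // prints INT64
    }
}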
- Reading with AvroParquetReader
import java.io.File;
import java.io.IOException;

import org.apache.avro.Conversions;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.parquet.avro.AvroParquetReader;
import org.apache.parquet.hadoop.ParquetReader;
import org.apache.parquet.hadoop.util.HadoopInputFile;

// Wrapper class added so the snippet compiles standalone
public class ReadDecimal {
    public static void main(String[] args) {
        String filePath = "my_path";
        // Check that the path points to an existing file
        File file = new File(filePath);
        if (!file.exists() || file.isDirectory()) {
            System.err.println("Invalid file path");
            return;
        }
        // Register the decimal conversion so Avro can materialize BigDecimal values
        GenericData genericData = new GenericData();
        genericData.addLogicalTypeConversion(new Conversions.DecimalConversion());
        Path path = new Path(filePath);
        try (ParquetReader<GenericRecord> reader = AvroParquetReader
                .<GenericRecord>builder(HadoopInputFile.fromPath(path, new Configuration()))
                .withDataModel(genericData)
                .build()) {
            GenericRecord record;
            while ((record = reader.read()) != null) {
                // Print each record and the Avro schema derived for the file
                System.out.println(record.toString());
                System.out.println(record.getSchema());
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }
}
{"salary": 120.321, "name": "Alex", "decimal_salary": 1203}
{"type":"record","name":"spark_schema","fields":[{"name":"salary","type":["null","double"],"default":null},{"name":"name","type":["null","string"],"default":null},{"name":"decimal_salary","type":["null","long"],"default":null}]}
Dependencies
<dependencies>
  <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-common</artifactId>
    <version>1.15.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-encoding</artifactId>
    <version>1.15.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-column</artifactId>
    <version>1.15.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-hadoop</artifactId>
    <version>1.15.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.parquet</groupId>
    <artifactId>parquet-avro</artifactId>
    <version>1.15.0</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-common</artifactId>
    <version>3.4.1</version>
  </dependency>
  <dependency>
    <groupId>org.apache.hadoop</groupId>
    <artifactId>hadoop-mapreduce-client-core</artifactId>
    <version>3.4.1</version>
  </dependency>
</dependencies>
Component(s)
Avro
It looks like AvroParquetReader doesn't handle decimal logical types.
@ConeyLiu IIUC, parquet-cli (which uses AvroParquetReader) might also hit this issue?
Yes, it should have the same problem. I searched the code in AvroParquetReader, and there is no handling for decimals.
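For context, a fix on the read path would presumably need the Avro schema converter to carry the Parquet decimal annotation into the generated Avro schema, so that a registered DecimalConversion can fire. A sketch of the schema shape that would make that work, built by hand with the Avro API (this is not what the converter currently produces):

import org.apache.avro.LogicalTypes;
import org.apache.avro.Schema;

public class ExpectedAvroSchema {
    public static void main(String[] args) {
        // An Avro long annotated with decimal(10, 1): the shape the converted
        // schema would need for Conversions.DecimalConversion to turn the
        // unscaled long 1203 back into BigDecimal 120.3.
        Schema decimalLong = LogicalTypes.decimal(10, 1)
                .addToSchema(Schema.create(Schema.Type.LONG));
        System.out.println(decimalLong);
        // {"type":"long","logicalType":"decimal","precision":10,"scale":1}
    }
}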