[BUG][Standalone] Issues while reading decimals
Bug
Which Delta project/connector is this regarding?
- [ ] Spark
- [x] Standalone
- [ ] Flink
- [ ] Kernel
- [ ] Other (fill in here)
Describe the problem
I am facing two issues while reading decimal values using the delta-standalone library:
- Precision and scale are 0 for the BigDecimal returned by RowRecord.getBigDecimal when the decimal column has precision <= 18.
- Decimals with more than 24 digits throw shadedelta.org.apache.parquet.io.ParquetDecodingException: Can not read value at 2 in block 0 in file.
Steps to reproduce
The complete code that can be used to reproduce this behaviour is attached in the Further details section. Below is a short snippet taken from it.
org.apache.spark.sql.types.StructType schema = new org.apache.spark.sql.types.StructType(new StructField[] {
    new StructField("decimal_18_3_Key", DataTypes.createDecimalType(18, 3), true, org.apache.spark.sql.types.Metadata.empty()),
    new StructField("decimal_18_3_Value", DataTypes.createDecimalType(18, 3), true, org.apache.spark.sql.types.Metadata.empty()),
    new StructField("decimal_28_3_Key", DataTypes.createDecimalType(28, 3), true, org.apache.spark.sql.types.Metadata.empty()),
    new StructField("decimal_28_3_Value", DataTypes.createDecimalType(28, 3), true, org.apache.spark.sql.types.Metadata.empty()),
    new StructField("decimal_38_3_Key", DataTypes.createDecimalType(38, 3), true, org.apache.spark.sql.types.Metadata.empty()),
    new StructField("decimal_38_3_Value", DataTypes.createDecimalType(38, 3), true, org.apache.spark.sql.types.Metadata.empty()),
});

Row row1 = RowFactory.create(
    new BigDecimal("12345678.009").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("12345678.009").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("-123456780000.123").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("123445680000.123").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("12345678901000.123").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("12345678901000.123").setScale(3, RoundingMode.HALF_UP)
);

Row row2 = RowFactory.create(
    new BigDecimal("12345678901234.568").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("-12345678901234.568").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("12345678901200123.456").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("-12345678901200123.456").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("321245678901200000001.456").setScale(3, RoundingMode.HALF_UP),
    new BigDecimal("32124567890120000000.456").setScale(3, RoundingMode.HALF_UP)
);
Observed results
Issue1: row.getBigDecimal("decimal_18_3_Key") returns a BigDecimal whose precision and scale are 0, whereas row.getBigDecimal("decimal_28_3_Key") returns a BigDecimal with the expected precision and scale.
Issue2: Since row2's "decimal_38_3_Key" holds a decimal of 25 digits (321245678901200000001.456), the read fails with shadedelta.org.apache.parquet.io.ParquetDecodingException: Can not read value at 2 in block 0 in file. Up to 24 digits it worked; as soon as I used 25 digits the read started failing.
Expected results
Issue1: This could be because decimals with precision <= 18 are stored as INT64 while those with precision > 18 are stored as FIXED_LEN_BYTE_ARRAY (see the sketch below). However, the BigDecimal that is returned should be consistent regardless of how the value is stored in the underlying Parquet file.
Issue2: Reads should work seamlessly irrespective of the number of digits in the decimal.
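For reference, Parquet keeps only the unscaled integer of a decimal in the data pages and carries the scale in the schema: smaller precisions are typically stored as INT32/INT64 and larger ones as FIXED_LEN_BYTE_ARRAY. The sketch below (illustrative helper names, not delta-standalone APIs) shows how a reader can rebuild the same BigDecimal from either physical representation, which is what the expected behaviour above amounts to.

import java.math.BigDecimal;
import java.math.BigInteger;

public class ParquetDecimalDecodingSketch {

    // Decimal stored as INT64: the long is the unscaled value; the scale comes from the schema.
    static BigDecimal fromInt64(long unscaled, int scale) {
        return BigDecimal.valueOf(unscaled, scale);
    }

    // Decimal stored as FIXED_LEN_BYTE_ARRAY: big-endian two's-complement unscaled value.
    static BigDecimal fromFixedLenByteArray(byte[] unscaledBytes, int scale) {
        return new BigDecimal(new BigInteger(unscaledBytes), scale);
    }

    public static void main(String[] args) {
        // Both representations of 12345678.009 (unscaled 12345678009, scale 3) must decode identically.
        BigDecimal fromLong = fromInt64(12345678009L, 3);
        BigDecimal fromBytes = fromFixedLenByteArray(new BigInteger("12345678009").toByteArray(), 3);
        System.out.println(fromLong + " " + fromBytes + " equal=" + fromLong.equals(fromBytes));
    }
}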
Further details
Below is the complete code that creates the Parquet file, creates the Delta log, and reads it back.
import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.Snapshot;
import io.delta.standalone.VersionLog;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.data.RowRecord;
import io.delta.standalone.types.StructType;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;
import java.io.File;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
public class DeltaStandaloneTest {

    public static void main(String[] args) throws Exception {
        deltaEndToEnd_spark("C:\\Temp\\deltaLake\\decimalValues", "decimalValues.parquet");
    }

    private static void deltaEndToEnd_spark(String deltaDir, String fileName) throws Exception {
        Files.createDirectories(Paths.get(deltaDir));
        String filePath = deltaDir + "\\" + fileName;
        String schema = createParquet(filePath);
        DeltaLog log = DeltaLog.forTable(new Configuration(), deltaDir);
        OptimisticTransaction txn = log.startTransaction();
        Metadata metaData = txn.metadata().copyBuilder()
                .partitionColumns(new ArrayList<>())
                .schema((StructType) StructType.fromJson(schema))
                .build();
        List<Action> actions = Arrays.asList(new AddFile(filePath, new HashMap<>(), new File(filePath).length(),
                System.currentTimeMillis(), true, null, null));
        txn.updateMetadata(metaData);
        txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "delta_standalone_tester");
        testRead(deltaDir);
    }

    public static String createParquet(String path) {
        org.apache.spark.sql.types.StructType schema = new org.apache.spark.sql.types.StructType(new StructField[] {
            new StructField("decimal_18_3_Key", DataTypes.createDecimalType(18, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_18_3_Value", DataTypes.createDecimalType(18, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_28_3_Key", DataTypes.createDecimalType(28, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_28_3_Value", DataTypes.createDecimalType(28, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_38_3_Key", DataTypes.createDecimalType(38, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_38_3_Value", DataTypes.createDecimalType(38, 3), true, org.apache.spark.sql.types.Metadata.empty()),
        });

        Row row1 = RowFactory.create(
            new BigDecimal("12345678.009").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678.009").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("-123456780000.123").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("123445680000.123").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678901000.123").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678901000.123").setScale(3, RoundingMode.HALF_UP)
        );

        Row row2 = RowFactory.create(
            new BigDecimal("12345678901234.568").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("-12345678901234.568").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678901200123.456").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("-12345678901200123.456").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("321245678901200000001.456").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("32124567890120000000.456").setScale(3, RoundingMode.HALF_UP)
        );

        SparkSession spark = SparkSession.builder()
                .appName("WriteParquetExample")
                .master("local[*]")
                .getOrCreate();

        List<Row> rows = Arrays.asList(row1, row2);

        // Create DataFrame
        Dataset<Row> df = spark.createDataFrame(rows, schema);

        // Repartition DataFrame to a single partition
        df = df.repartition(1);

        // Write DataFrame to Parquet file
        df.write().mode(SaveMode.Overwrite).parquet(path);

        // Stop SparkSession
        spark.stop();

        System.out.println("Parquet file written to: " + path);
        return schema.json();
    }

    public static void testRead(String path) {
        Configuration configuration = new Configuration();
        DeltaLog log = DeltaLog.forTable(configuration, path);
        Iterator<VersionLog> it = log.getChanges(0, false);
        while (it.hasNext()) {
            VersionLog ver = it.next();
            System.out.println("Version " + ver.getVersion());
        }
        printSnapshotDetails("current snapshot", log.snapshot());
    }

    public static void printSnapshotDetails(String title, Snapshot snapshot) {
        System.out.println("===== " + title + " =====");
        System.out.println("version: " + snapshot.getVersion());
        System.out.println("schema: " + snapshot.getMetadata().getSchema().toJson());
        System.out.println("number data files: " + snapshot.getAllFiles().size());
        System.out.println("data files:");
        snapshot.getAllFiles().forEach(file -> System.out.println(file.getPath()));

        CloseableIterator<RowRecord> iter = snapshot.open();
        System.out.println("\ndata rows:");
        RowRecord row = null;
        int numRows = 0;
        while (iter.hasNext()) {
            row = iter.next();
            numRows++;
            BigDecimal c1 = row.isNullAt("decimal_18_3_Key") ? null : row.getBigDecimal("decimal_18_3_Key").setScale(3, RoundingMode.HALF_UP);
            BigDecimal c2 = row.isNullAt("decimal_18_3_Value") ? null : row.getBigDecimal("decimal_18_3_Value").setScale(3, RoundingMode.HALF_UP);
            BigDecimal c3 = row.isNullAt("decimal_28_3_Key") ? null : row.getBigDecimal("decimal_28_3_Key").setScale(3, RoundingMode.HALF_UP);
            BigDecimal c4 = row.isNullAt("decimal_28_3_Value") ? null : row.getBigDecimal("decimal_28_3_Value").setScale(3, RoundingMode.HALF_UP);
            BigDecimal c5 = row.isNullAt("decimal_38_3_Key") ? null : row.getBigDecimal("decimal_38_3_Key").setScale(3, RoundingMode.HALF_UP);
            BigDecimal c6 = row.isNullAt("decimal_38_3_Value") ? null : row.getBigDecimal("decimal_38_3_Value").setScale(3, RoundingMode.HALF_UP);
            System.out.println(c1 + " " + c2 + " " + c3 + " " + c4 + " " + c5 + " " + c6);
        }

        System.out.println("\nnumber rows: " + numRows);
        System.out.println("data schema:");
        System.out.println(row.getSchema().getTreeString());
        System.out.println("\n");
    }
}
Environment information
- Delta Lake version: delta-standalone_2.12 3.2.0
- Spark version: 3.5.1
- Scala version:
Willingness to contribute
The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?
- [ ] Yes. I can contribute a fix for this bug independently.
- [x] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
- [ ] No. I cannot contribute a bug fix at this time.
@Sandy3094 Standalone uses the parquet4s library to read data from Parquet data files, and it may have some limitations. We have been working on Delta Kernel to replace Standalone and support higher Delta protocol versions with simpler APIs.
I just tried your test above with Kernel, and the results returned by the Spark reader and the Delta Kernel reader are the same. You can check out the test details here. Issue 2 (the Parquet decoding exception) should be solved by using Kernel. For issue 1, I checked in the debugger that the BigDecimal returned by Kernel has the proper scale and precision as set in the schema.
Give it a try and let us know if any specific functionality you need is missing from Kernel. The page has links to examples that can help you migrate from Standalone.
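For reference, a minimal Kernel read entry point looks roughly like the sketch below. It assumes the 3.2.x delta-kernel-api and delta-kernel-defaults artifacts and a local table path; method signatures may differ in other releases, so treat it as a starting point rather than the definitive API.

import io.delta.kernel.Snapshot;
import io.delta.kernel.Table;
import io.delta.kernel.defaults.engine.DefaultEngine;
import io.delta.kernel.engine.Engine;
import io.delta.kernel.types.StructType;
import org.apache.hadoop.conf.Configuration;

public class KernelReadSketch {
    public static void main(String[] args) {
        // Default engine implementation shipped with delta-kernel-defaults, backed by Hadoop config.
        Engine engine = DefaultEngine.create(new Configuration());

        // Resolve the table and its latest snapshot (path is just an example).
        Table table = Table.forPath(engine, "C:\\Temp\\deltaLake\\decimalValues");
        Snapshot snapshot = table.getLatestSnapshot(engine);

        // The logical schema, including decimal(p, s) types, comes straight from the snapshot.
        StructType schema = snapshot.getSchema(engine);
        System.out.println("schema: " + schema);

        // Reading the data itself goes through snapshot.getScanBuilder(engine).build();
        // see the linked Kernel examples for the full scan loop.
    }
}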
Yeah, with Delta Kernel the decimal read works fine. However, I noticed an issue while writing decimals using Delta Kernel; it could be a corner case.
StructType schema = new StructType()
    .add("decimal_18_3_Key", new DecimalType(18, 3))
    .add("decimal_18_3_Value", new DecimalType(18, 3))
    .add("decimal_28_3_Key", new DecimalType(28, 3))
    .add("decimal_28_3_Value", new DecimalType(28, 3))
    .add("decimal_38_3_Key", new DecimalType(38, 3))
    .add("decimal_38_3_Value", new DecimalType(38, 3));
For the above schema, I got the exception below:
Exception in thread "main" java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: Index: 38, Size: 38
    at io.delta.kernel.utils.CloseableIterable.inMemoryIterable(CloseableIterable.java:80)
    at infa.DeltaKernelTest.commitRow(DeltaKernelTest.java:175)
    at infa.DeltaKernelTest.createTableWithSampleData(DeltaKernelTest.java:95)
    at infa.DeltaKernelTest.deltaEndToEnd_kernel(DeltaKernelTest.java:66)
    at infa.DeltaKernelTest.main(DeltaKernelTest.java:62)
Caused by: java.lang.IndexOutOfBoundsException: Index: 38, Size: 38
    at java.util.ArrayList.rangeCheck(ArrayList.java:659)
    at java.util.ArrayList.get(ArrayList.java:435)
    at java.util.Collections$UnmodifiableList.get(Collections.java:1311)
    at io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.toParquetType(ParquetSchemaUtils.java:220)
    at io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.toParquetSchema(ParquetSchemaUtils.java:149)
    at io.delta.kernel.defaults.internal.parquet.ParquetFileWriter$1.createOrGetWriteSupport(ParquetFileWriter.java:237)
    at io.delta.kernel.defaults.internal.parquet.ParquetFileWriter$1.hasNextRow(ParquetFileWriter.java:205)
    at io.delta.kernel.defaults.internal.parquet.ParquetFileWriter$1.writeNextFile(ParquetFileWriter.java:152)
    at io.delta.kernel.defaults.internal.parquet.ParquetFileWriter$1.hasNext(ParquetFileWriter.java:136)
    at io.delta.kernel.utils.CloseableIterator$1.hasNext(CloseableIterator.java:53)
    at io.delta.kernel.utils.CloseableIterable.inMemoryIterable(CloseableIterable.java:71)
    ... 4 more
@Sandy3094 That is indeed a bug on the default Parquet writer side. I made a PR here to fix it. Thanks for reporting.
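For anyone hitting the same trace: an IndexOutOfBoundsException with Index: 38, Size: 38 inside toParquetType matches the classic off-by-one where a per-precision lookup list sized for precisions 1..38 is indexed with the precision itself instead of precision - 1. The sketch below is purely illustrative (hypothetical table, not the actual Kernel code) and reproduces that failure mode.

import java.util.ArrayList;
import java.util.List;

public class PrecisionLookupSketch {
    public static void main(String[] args) {
        // Hypothetical table: minimum FIXED_LEN_BYTE_ARRAY width for each decimal precision 1..38.
        List<Integer> minBytesForPrecision = new ArrayList<>();
        for (int p = 1; p <= 38; p++) {
            minBytesForPrecision.add((int) Math.ceil((p * Math.log(10) / Math.log(2) + 1) / 8));
        }

        int precision = 38;
        // Correct lookup: the list index is precision - 1.
        System.out.println("ok:   " + minBytesForPrecision.get(precision - 1));
        // Off-by-one lookup: throws IndexOutOfBoundsException: Index: 38, Size: 38.
        System.out.println("boom: " + minBytesForPrecision.get(precision));
    }
}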