
[BUG][Standalone] Issues while reading decimals

Open Sandy3094 opened this issue 1 year ago • 1 comments

Bug

Which Delta project/connector is this regarding?

  • [ ] Spark
  • [x] Standalone
  • [ ] Flink
  • [ ] Kernel
  • [ ] Other (fill in here)

Describe the problem

I am facing two issues while reading decimal values using the delta-standalone library:

  1. The precision and scale are 0 for the BigDecimal returned by RowRecord.getBigDecimal when the decimal column has precision <= 18 (see the snippet after this list).
  2. Decimals with more than 24 digits throw shadedelta.org.apache.parquet.io.ParquetDecodingException: Can not read value at 2 in block 0 in file.
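
For reference, a minimal sketch of the check used to observe issue 1; the row here is a RowRecord obtained from the snapshot iterator shown under Further details, and the comments describe what is reported above rather than guaranteed output.

    // Hypothetical check for issue 1: compare the precision/scale reported for a
    // DECIMAL(18,3) column with that of a DECIMAL(28,3) column.
    BigDecimal d18 = row.getBigDecimal("decimal_18_3_Key");
    System.out.println(d18 + " precision=" + d18.precision() + " scale=" + d18.scale());
    // per the report, the DECIMAL(18,3) column comes back without the schema's precision/scale

    BigDecimal d28 = row.getBigDecimal("decimal_28_3_Key");
    System.out.println(d28 + " precision=" + d28.precision() + " scale=" + d28.scale());
    // the DECIMAL(28,3) column comes back with the expected scale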

Steps to reproduce

The complete code that can be used to reproduce this behaviour is attached in the Further details section. Below is a short snippet taken from it.

	org.apache.spark.sql.types.StructType schema = new org.apache.spark.sql.types.StructType(new StructField[] {
            new StructField("decimal_18_3_Key", DataTypes.createDecimalType(18, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_18_3_Value", DataTypes.createDecimalType(18, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_28_3_Key", DataTypes.createDecimalType(28, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_28_3_Value", DataTypes.createDecimalType(28, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_38_3_Key", DataTypes.createDecimalType(38, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_38_3_Value", DataTypes.createDecimalType(38, 3), true, org.apache.spark.sql.types.Metadata.empty()),
    });

    Row row1 = RowFactory.create(
            new BigDecimal("12345678.009").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678.009").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("-123456780000.123").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("123445680000.123").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678901000.123").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678901000.123").setScale(3, RoundingMode.HALF_UP)
    );
    Row row2 = RowFactory.create(
            new BigDecimal("12345678901234.568").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("-12345678901234.568").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678901200123.456").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("-12345678901200123.456").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("321245678901200000001.456").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("32124567890120000000.456").setScale(3, RoundingMode.HALF_UP)
    );

Observed results

Issue 1: row.getBigDecimal("decimal_18_3_Key") returns a BigDecimal whose precision and scale do not match the schema [screenshot omitted],

whereas row.getBigDecimal("decimal_28_3_Key") returns a BigDecimal with the expected precision and scale [screenshot omitted].

Issue 2: Since row2's "decimal_38_3_Key" value (321245678901200000001.456) has 25 digits, the read fails with shadedelta.org.apache.parquet.io.ParquetDecodingException: Can not read value at 2 in block 0 in file. Values with up to 24 digits read fine; as soon as a value has 25 digits, the read starts failing.

Expected results

Issue 1: This could be because decimals with precision <= 18 are stored as INT64 while decimals with precision > 18 are stored as FIXED_LEN_BYTE_ARRAY. However, the BigDecimal that is returned should be consistent irrespective of how the value is stored in the underlying Parquet file (see the sketch below).

Issue 2: Reads should work seamlessly irrespective of the number of digits in the decimal.
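
To illustrate the expectation for issue 1, here is a minimal, self-contained sketch (not the Standalone internals; the method names and sample values are hypothetical) showing that both physical Parquet encodings can be decoded into equivalent BigDecimal values by applying the scale declared in the schema:

    import java.math.BigDecimal;
    import java.math.BigInteger;

    public class DecimalDecodeSketch {
        // DECIMAL(precision <= 18, scale) stored as INT64: the long holds the unscaled value.
        static BigDecimal fromInt64(long unscaled, int scale) {
            return BigDecimal.valueOf(unscaled, scale);
        }

        // DECIMAL(precision > 18, scale) stored as FIXED_LEN_BYTE_ARRAY:
        // big-endian two's-complement bytes hold the unscaled value.
        static BigDecimal fromFixedLenBytes(byte[] unscaledBytes, int scale) {
            return new BigDecimal(new BigInteger(unscaledBytes), scale);
        }

        public static void main(String[] args) {
            // 12345678.009 with scale 3 has unscaled value 12345678009
            System.out.println(fromInt64(12345678009L, 3));
            // 12345678901200123.456 with scale 3 has unscaled value 12345678901200123456
            System.out.println(fromFixedLenBytes(
                    new BigInteger("12345678901200123456").toByteArray(), 3));
        }
    }

Both branches produce a BigDecimal carrying the scale from the schema, which is the consistency the expected result above asks for.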

Further details

Below is the complete code that creates the Parquet file, creates the Delta log, and reads the table back.

import io.delta.standalone.DeltaLog;
import io.delta.standalone.Operation;
import io.delta.standalone.OptimisticTransaction;
import io.delta.standalone.Snapshot;
import io.delta.standalone.VersionLog;
import io.delta.standalone.actions.Action;
import io.delta.standalone.actions.AddFile;
import io.delta.standalone.actions.Metadata;
import io.delta.standalone.data.CloseableIterator;
import io.delta.standalone.data.RowRecord;
import io.delta.standalone.types.StructType;
import org.apache.hadoop.conf.Configuration;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SaveMode;
import org.apache.spark.sql.SparkSession;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructField;

import java.io.File;
import java.math.BigDecimal;
import java.math.RoundingMode;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;

public class DeltaStandaloneTest {
public static void main(String[] args) throws Exception {
    deltaEndToEnd_spark("C:\\Temp\\deltaLake\\decimalValues", "decimalValues.parquet");
}

private static void deltaEndToEnd_spark(String deltaDir, String fileName) throws Exception {
    Files.createDirectories(Paths.get(deltaDir));
    String filePath = deltaDir + "\\" + fileName;
    String schema = createParquet(filePath);

    DeltaLog log = DeltaLog.forTable(new Configuration(), deltaDir);
    OptimisticTransaction txn = log.startTransaction();
    Metadata metaData = txn.metadata().copyBuilder().partitionColumns(new ArrayList<>()).schema((StructType) StructType.fromJson(schema)).build();
    List<Action> actions = Arrays.asList(new AddFile(filePath, new HashMap<>(), new File(filePath).length(),
            System.currentTimeMillis(), true, null, null));
    txn.updateMetadata(metaData);
    txn.commit(actions, new Operation(Operation.Name.CREATE_TABLE), "delta_standalone_tester");

    testRead(deltaDir);
}
public static String createParquet(String path) {
    org.apache.spark.sql.types.StructType schema = new org.apache.spark.sql.types.StructType(new StructField[] {
            new StructField("decimal_18_3_Key", DataTypes.createDecimalType(18, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_18_3_Value", DataTypes.createDecimalType(18, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_28_3_Key", DataTypes.createDecimalType(28, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_28_3_Value", DataTypes.createDecimalType(28, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_38_3_Key", DataTypes.createDecimalType(38, 3), true, org.apache.spark.sql.types.Metadata.empty()),
            new StructField("decimal_38_3_Value", DataTypes.createDecimalType(38, 3), true, org.apache.spark.sql.types.Metadata.empty()),
    });

    Row row1 = RowFactory.create(
            new BigDecimal("12345678.009").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678.009").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("-123456780000.123").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("123445680000.123").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678901000.123").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678901000.123").setScale(3, RoundingMode.HALF_UP)
    );
    Row row2 = RowFactory.create(
            new BigDecimal("12345678901234.568").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("-12345678901234.568").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("12345678901200123.456").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("-12345678901200123.456").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("321245678901200000001.456").setScale(3, RoundingMode.HALF_UP),
            new BigDecimal("32124567890120000000.456").setScale(3, RoundingMode.HALF_UP)
    );

    SparkSession spark = SparkSession.builder()
            .appName("WriteParquetExample")
            .master("local[*]")
            .getOrCreate();

    List<Row> rows = Arrays.asList(row1, row2);

    // Create DataFrame
    Dataset<Row> df = spark.createDataFrame(rows, schema);

    // Repartition DataFrame to a single partition
    df = df.repartition(1);

    // Write DataFrame to Parquet file
    df.write().mode(SaveMode.Overwrite).parquet(path);

    // Stop SparkSession
    spark.stop();

    System.out.println("Parquet file written to: " + path);

    return schema.json();
}
public static void testRead(String path) {
    Configuration configuration = new Configuration();
    DeltaLog log = DeltaLog.forTable(configuration, path);
    Iterator<VersionLog> it = log.getChanges(0, false);
    while (it.hasNext()) {
        VersionLog ver = it.next();
        System.out.println("Version "+ver.getVersion());
    }
    printSnapshotDetails("current snapshot", log.snapshot());
}

public static void printSnapshotDetails(String title, Snapshot snapshot) {
    System.out.println("===== " + title + " =====");
    System.out.println("version: " + snapshot.getVersion());
    System.out.println("schema: "+snapshot.getMetadata().getSchema().toJson());
    System.out.println("number data files: " + snapshot.getAllFiles().size());
    System.out.println("data files:");
    snapshot.getAllFiles().forEach(file -> System.out.println(file.getPath()));

    CloseableIterator<RowRecord> iter = snapshot.open();

    System.out.println("\ndata rows:");
    RowRecord row = null;
    int numRows = 0;
    while (iter.hasNext()) {
        row = iter.next();
        numRows++;
        BigDecimal c1 = row.isNullAt("decimal_18_3_Key") ? null : row.getBigDecimal("decimal_18_3_Key").setScale(3, RoundingMode.HALF_UP);
        BigDecimal c2 = row.isNullAt("decimal_18_3_Value") ? null : row.getBigDecimal("decimal_18_3_Value").setScale(3, RoundingMode.HALF_UP);
        BigDecimal c3 = row.isNullAt("decimal_28_3_Key") ? null : row.getBigDecimal("decimal_28_3_Key").setScale(3, RoundingMode.HALF_UP);
        BigDecimal c4 = row.isNullAt("decimal_28_3_Value") ? null : row.getBigDecimal("decimal_28_3_Value").setScale(3, RoundingMode.HALF_UP);
        BigDecimal c5 = row.isNullAt("decimal_38_3_Key") ? null : row.getBigDecimal("decimal_38_3_Key").setScale(3, RoundingMode.HALF_UP);
        BigDecimal c6 = row.isNullAt("decimal_38_3_Value") ? null : row.getBigDecimal("decimal_38_3_Value").setScale(3, RoundingMode.HALF_UP);
        System.out.println(c1 + " " + c2 + " " + c3 + " " +c4 + " " + c5 + " " + c6);
    }
    System.out.println("\nnumber rows: " + numRows);
    System.out.println("data schema:");
    System.out.println(row.getSchema().getTreeString());
    System.out.println("\n");
}

}

Environment information

  • Delta Lake version: delta-standalone_2.12 3.2.0
  • Spark version: 3.5.1
  • Scala version:

Willingness to contribute

The Delta Lake Community encourages bug fix contributions. Would you or another member of your organization be willing to contribute a fix for this bug to the Delta Lake code base?

  • [ ] Yes. I can contribute a fix for this bug independently.
  • [x] Yes. I would be willing to contribute a fix for this bug with guidance from the Delta Lake community.
  • [ ] No. I cannot contribute a bug fix at this time.

Sandy3094 avatar May 24 '24 06:05 Sandy3094

@Sandy3094 Standalone uses the parquet4s library to read data from Parquet data files, which may have some limitations. We have been working on Delta Kernel to replace Standalone and to support higher Delta protocol versions with simpler APIs.

I just tried your test above with Kernel, and the results returned by the Spark reader and the Delta Kernel reader are the same. You can check out the test details here. Issue 2 (the Parquet decoding exception) should be solved by using Kernel. For issue 1, I checked in the debugger that the BigDecimal returned by Kernel has the proper scale and precision, as set in the schema. [screenshot omitted]

Give it a try and let us know if any specific functionality you need from Kernel is missing. The page has links to examples that can help you migrate from Standalone.
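
For reference, a minimal sketch of reading the table metadata with Kernel, assuming the Delta 3.2 Kernel APIs (Table.forPath, DefaultEngine.create, Snapshot.getVersion/getSchema); the table path is the one from the repro above:

    import io.delta.kernel.Snapshot;
    import io.delta.kernel.Table;
    import io.delta.kernel.defaults.engine.DefaultEngine;
    import io.delta.kernel.engine.Engine;
    import org.apache.hadoop.conf.Configuration;

    public class KernelSnapshotSketch {
        public static void main(String[] args) throws Exception {
            String tablePath = "C:\\Temp\\deltaLake\\decimalValues"; // same table as the repro above

            // The default engine reads and writes through Hadoop and parquet-mr.
            Engine engine = DefaultEngine.create(new Configuration());
            Table table = Table.forPath(engine, tablePath);
            Snapshot snapshot = table.getLatestSnapshot(engine);

            System.out.println("version: " + snapshot.getVersion(engine));
            System.out.println("schema:  " + snapshot.getSchema(engine));
        }
    }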

vkorukanti avatar May 24 '24 16:05 vkorukanti

Yeah, with Delta Kernel the decimal read works fine. However, I noticed an issue while writing decimals using Delta Kernel; it could be a corner case.

StructType schema = new StructType()
                .add("decimal_18_3_Key", new DecimalType(18,3))
                .add("decimal_18_3_Value", new DecimalType(18,3))
                .add("decimal_28_3_Key", new DecimalType(28,3))
                .add("decimal_28_3_Value", new DecimalType(28,3))
                .add("decimal_38_3_Key", new DecimalType(38,3))
                .add("decimal_38_3_Value", new DecimalType(38,3));

For the above schema, I got the exception below:

Exception in thread "main" java.lang.RuntimeException: java.lang.IndexOutOfBoundsException: Index: 38, Size: 38
	at io.delta.kernel.utils.CloseableIterable.inMemoryIterable(CloseableIterable.java:80)
	at infa.DeltaKernelTest.commitRow(DeltaKernelTest.java:175)
	at infa.DeltaKernelTest.createTableWithSampleData(DeltaKernelTest.java:95)
	at infa.DeltaKernelTest.deltaEndToEnd_kernel(DeltaKernelTest.java:66)
	at infa.DeltaKernelTest.main(DeltaKernelTest.java:62)
Caused by: java.lang.IndexOutOfBoundsException: Index: 38, Size: 38
	at java.util.ArrayList.rangeCheck(ArrayList.java:659)
	at java.util.ArrayList.get(ArrayList.java:435)
	at java.util.Collections$UnmodifiableList.get(Collections.java:1311)
	at io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.toParquetType(ParquetSchemaUtils.java:220)
	at io.delta.kernel.defaults.internal.parquet.ParquetSchemaUtils.toParquetSchema(ParquetSchemaUtils.java:149)
	at io.delta.kernel.defaults.internal.parquet.ParquetFileWriter$1.createOrGetWriteSupport(ParquetFileWriter.java:237)
	at io.delta.kernel.defaults.internal.parquet.ParquetFileWriter$1.hasNextRow(ParquetFileWriter.java:205)
	at io.delta.kernel.defaults.internal.parquet.ParquetFileWriter$1.writeNextFile(ParquetFileWriter.java:152)
	at io.delta.kernel.defaults.internal.parquet.ParquetFileWriter$1.hasNext(ParquetFileWriter.java:136)
	at io.delta.kernel.utils.CloseableIterator$1.hasNext(CloseableIterator.java:53)
	at io.delta.kernel.utils.CloseableIterable.inMemoryIterable(CloseableIterable.java:71)
	... 4 more
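
The "Index: 38, Size: 38" is consistent with an off-by-one in a precision-indexed lookup: a 38-entry table indexed with the precision itself only overflows at precision 38. The sketch below is purely illustrative of that failure mode and is not the actual Kernel code:

    import java.util.ArrayList;
    import java.util.List;

    public class PrecisionLookupSketch {
        public static void main(String[] args) {
            // Hypothetical table: entry i holds the minimum FIXED_LEN_BYTE_ARRAY width
            // for precision i + 1, so valid indices are 0..37 for precisions 1..38.
            List<Integer> minBytesForPrecision = new ArrayList<>();
            for (int p = 1; p <= 38; p++) {
                minBytesForPrecision.add((int) Math.ceil((p * Math.log(10) / Math.log(2) + 1) / 8));
            }

            int precision = 38;
            // Indexing with the precision itself overflows the 38-element list:
            // minBytesForPrecision.get(precision); // IndexOutOfBoundsException: Index: 38, Size: 38
            // whereas a lookup by precision - 1 stays in bounds:
            System.out.println(minBytesForPrecision.get(precision - 1) + " bytes"); // 16 bytes
        }
    }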

Sandy3094 avatar Jun 07 '24 05:06 Sandy3094

@Sandy3094 That is indeed a bug on the default Parquet writer side. Made a PR here to fix it. Thanks for reporting.

vkorukanti avatar Jun 07 '24 17:06 vkorukanti