
`integer overflow` in CapacityByteArrayOutputStream

Open manuzhang opened this issue 4 months ago • 5 comments

Describe the bug, including details regarding any error messages, version, and platform.

The following exception was thrown when we read a column of ARRAY<STRING> with Spark 3.5.0 and Parquet 1.15.2:

Caused by: java.lang.ArithmeticException: integer overflow
	at java.base/java.lang.Math.addExact(Math.java:883)
	at org.apache.parquet.bytes.CapacityByteArrayOutputStream.addSlab(CapacityByteArrayOutputStream.java:198)
	at org.apache.parquet.bytes.CapacityByteArrayOutputStream.write(CapacityByteArrayOutputStream.java:220)
	at org.apache.parquet.bytes.LittleEndianDataOutputStream.write(LittleEndianDataOutputStream.java:76)
	at java.base/java.io.OutputStream.write(OutputStream.java:127)
	at org.apache.parquet.io.api.Binary$ByteArrayBackedBinary.writeTo(Binary.java:319)
	at org.apache.parquet.column.values.plain.PlainValuesWriter.writeBytes(PlainValuesWriter.java:55)
	at org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeBytes(FallbackValuesWriter.java:178)
	at org.apache.parquet.column.impl.ColumnWriterBase.write(ColumnWriterBase.java:196)
	at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:473)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9(ParquetWriteSupport.scala:212)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9$adapted(ParquetWriteSupport.scala:210)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$5(ParquetWriteSupport.scala:354)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:490)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$4(ParquetWriteSupport.scala:354)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeGroup(ParquetWriteSupport.scala:484)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$3(ParquetWriteSupport.scala:352)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:490)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$2(ParquetWriteSupport.scala:347)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeGroup(ParquetWriteSupport.scala:484)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$1(ParquetWriteSupport.scala:346)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$1$adapted(ParquetWriteSupport.scala:342)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$writeFields$1(ParquetWriteSupport.scala:168)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:490)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.writeFields(ParquetWriteSupport.scala:168)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$write$1(ParquetWriteSupport.scala:158)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:478)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:158)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:54)
	at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:152)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:240)
	at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:41)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:39)

The issue can be worked around by increasing spark.sql.shuffle.partitions to split the data into smaller partitions (see the sketch below). Can it be solved on the Parquet side?
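
For reference, a minimal sketch of that workaround applied programmatically; the partition count of 2000 is purely illustrative, and in practice the setting is just as often passed via spark-submit or SQL configuration:

  import org.apache.spark.sql.SparkSession;

  public class ShufflePartitionWorkaround {
    public static void main(String[] args) {
      SparkSession spark = SparkSession.builder().getOrCreate();
      // Raise the shuffle partition count so that each write task handles a
      // smaller slice of the data and no single page grows as large.
      spark.conf().set("spark.sql.shuffle.partitions", "2000");
    }
  }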

Component(s)

Core

manuzhang avatar Aug 05 '25 02:08 manuzhang

cc @steveloughran @wgtmac

wangyum avatar Aug 05 '25 03:08 wangyum

It might be that there are too many binary values in the array column. Perhaps you can tune the page size check to be more aggressive. See https://github.com/apache/parquet-java/blob/master/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java and look for the following properties (a tuning sketch follows the list):

  • DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK
  • DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK
  • DEFAULT_PAGE_VALUE_COUNT_THRESHOLD
  • DEFAULT_PAGE_ROW_COUNT_LIMIT
  • DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT
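
For Hadoop-based writers (including Spark's Parquet writer), these defaults can be overridden through the Hadoop configuration. A minimal sketch, assuming the ParquetOutputFormat constants below correspond to the defaults listed above in the Parquet version in use; the numeric values are illustrative, not recommendations:

  import org.apache.hadoop.conf.Configuration;
  import org.apache.parquet.hadoop.ParquetOutputFormat;

  public class PageCheckTuning {
    // Build a Hadoop configuration that makes the page size check fire earlier,
    // so a single page cannot keep growing toward Integer.MAX_VALUE before it is flushed.
    public static Configuration tunedConf() {
      Configuration conf = new Configuration();
      // Start checking the page size after fewer records (default is 100).
      conf.setInt(ParquetOutputFormat.MIN_ROW_COUNT_FOR_PAGE_SIZE_CHECK, 10);
      // Never defer the check beyond this many records (default is 10,000).
      conf.setInt(ParquetOutputFormat.MAX_ROW_COUNT_FOR_PAGE_SIZE_CHECK, 1000);
      // Cap the number of rows per page regardless of the estimated byte size.
      conf.setInt(ParquetOutputFormat.PAGE_ROW_COUNT_LIMIT, 5000);
      return conf;
    }
  }

In Spark these would typically be passed as spark.hadoop.-prefixed properties so they reach the underlying ParquetOutputFormat.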

wgtmac avatar Aug 05 '25 05:08 wgtmac

It'd be good to know what the state of the column was. Maybe there were too many values for compression to work with, but does it have to fail this way?

steveloughran avatar Oct 13 '25 16:10 steveloughran

We recently hit this as well (with a slightly different stack trace). I think it's simply a bug in how CapacityByteArrayOutputStream tries to catch this error.

The check for overflow is based on bytesUsed (https://github.com/apache/parquet-java/blob/master/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java#L171-L177):

  private void addSlab(int minimumSize) {
    int nextSlabSize;

    // check for overflow
    try {
      Math.addExact(bytesUsed, minimumSize);
    } catch (ArithmeticException e) {
      // This is interpreted as a request for a value greater than Integer.MAX_VALUE
      // We throw OOM because that is what java.io.ByteArrayOutputStream also does
      throw new OutOfMemoryError("Size of data exceeded Integer.MAX_VALUE (" + e.getMessage() + ")");
    }

But this error is happening at the end, when bytesAllocated is updated (https://github.com/apache/parquet-java/blob/master/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java#L198C5-L198C76):

  this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize);

And if you look at the write method (https://github.com/apache/parquet-java/blob/master/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java#L211-L225):

  public void write(byte b[], int off, int len) {
    if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
      throw new IndexOutOfBoundsException(String.format(
          "Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
    }
    if (len > currentSlab.remaining()) {
      final int length1 = currentSlab.remaining();
      currentSlab.put(b, off, length1);
      final int length2 = len - length1;
      addSlab(length2);
      currentSlab.put(b, off + length1, length2);
    } else {
      currentSlab.put(b, off, len);
    }
    bytesUsed = Math.addExact(bytesUsed, len);
  }

The current slab is filled, a new slab is added, and the rest of the data goes into the new slab; bytesUsed isn't updated until the end of write. So when addSlab runs its overflow check, bytesUsed isn't actually up to date, and in certain edge cases the intended OOM isn't thrown before bytesAllocated overflows.

I think you could also hit a similar issue if nextSlabSize ends up being larger than minimumSize, since minimumSize is all that the check uses.
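
To make that mismatch concrete, here is a small standalone toy (all names are invented; this is not the real class) where the overflow guard consults a counter that lags behind the one actually being incremented, mirroring bytesUsed vs. bytesAllocated above:

  public class SlabCounterToy {
    private int bytesUsed = 0;       // bytes actually written so far
    private int bytesAllocated = 0;  // bytes reserved in slabs; can run well ahead of bytesUsed

    void addSlab(int minimumSize) {
      // Guard in the same spirit as CapacityByteArrayOutputStream.addSlab:
      // it checks the lagging bytesUsed counter...
      try {
        Math.addExact(bytesUsed, minimumSize);
      } catch (ArithmeticException e) {
        throw new OutOfMemoryError("Size of data exceeded Integer.MAX_VALUE");
      }
      // ...but the slab is sized with headroom (a fixed 64 MiB here for illustration),
      // so bytesAllocated grows much faster and is the addition that actually overflows.
      int nextSlabSize = Math.max(minimumSize, 64 * 1024 * 1024);
      bytesAllocated = Math.addExact(bytesAllocated, nextSlabSize); // uncaught ArithmeticException
    }

    public static void main(String[] args) {
      SlabCounterToy toy = new SlabCounterToy();
      for (int i = 0; i < 40; i++) {
        toy.addSlab(1);     // the bytesUsed-based guard passes every time
        toy.bytesUsed += 1; // barely any data is "written" between slab allocations
      }
      // Around the 32nd iteration bytesAllocated exceeds Integer.MAX_VALUE and
      // Math.addExact throws ArithmeticException instead of the intended OutOfMemoryError.
    }
  }

In the real class the headroom comes from the slab sizing policy and from only length2 being passed to addSlab, but the effect is the same: the pre-check and the failing addition track different quantities, so the intended OutOfMemoryError is never thrown.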

Now, why a page is getting this big in the first place I assume is just a weird data problem? We're still trying to figure that part out for our issue.

Kimahriman avatar Nov 26 '25 14:11 Kimahriman

The properties that @wgtmac mentions should help. See https://github.com/apache/parquet-java/issues/2697 and https://github.com/apache/parquet-java/pull/1032

parthchandra avatar Nov 28 '25 00:11 parthchandra