`integer overflow` in CapacityByteArrayOutputStream
Describe the bug, including details regarding any error messages, version, and platform.
The following exception was thrown when we read a column of ARRAY<STRING> in Spark 3.5.0 with Parquet 1.15.2:
Caused by: java.lang.ArithmeticException: integer overflow
at java.base/java.lang.Math.addExact(Math.java:883)
at org.apache.parquet.bytes.CapacityByteArrayOutputStream.addSlab(CapacityByteArrayOutputStream.java:198)
at org.apache.parquet.bytes.CapacityByteArrayOutputStream.write(CapacityByteArrayOutputStream.java:220)
at org.apache.parquet.bytes.LittleEndianDataOutputStream.write(LittleEndianDataOutputStream.java:76)
at java.base/java.io.OutputStream.write(OutputStream.java:127)
at org.apache.parquet.io.api.Binary$ByteArrayBackedBinary.writeTo(Binary.java:319)
at org.apache.parquet.column.values.plain.PlainValuesWriter.writeBytes(PlainValuesWriter.java:55)
at org.apache.parquet.column.values.fallback.FallbackValuesWriter.writeBytes(FallbackValuesWriter.java:178)
at org.apache.parquet.column.impl.ColumnWriterBase.write(ColumnWriterBase.java:196)
at org.apache.parquet.io.MessageColumnIO$MessageColumnIORecordConsumer.addBinary(MessageColumnIO.java:473)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9(ParquetWriteSupport.scala:212)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeWriter$9$adapted(ParquetWriteSupport.scala:210)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$5(ParquetWriteSupport.scala:354)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:490)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$4(ParquetWriteSupport.scala:354)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeGroup(ParquetWriteSupport.scala:484)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$3(ParquetWriteSupport.scala:352)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:490)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$2(ParquetWriteSupport.scala:347)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeGroup(ParquetWriteSupport.scala:484)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$1(ParquetWriteSupport.scala:346)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$makeArrayWriter$1$adapted(ParquetWriteSupport.scala:342)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$writeFields$1(ParquetWriteSupport.scala:168)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeField(ParquetWriteSupport.scala:490)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.writeFields(ParquetWriteSupport.scala:168)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.$anonfun$write$1(ParquetWriteSupport.scala:158)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.consumeMessage(ParquetWriteSupport.scala:478)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:158)
at org.apache.spark.sql.execution.datasources.parquet.ParquetWriteSupport.write(ParquetWriteSupport.scala:54)
at org.apache.parquet.hadoop.InternalParquetRecordWriter.write(InternalParquetRecordWriter.java:152)
at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:240)
at org.apache.parquet.hadoop.ParquetRecordWriter.write(ParquetRecordWriter.java:41)
at org.apache.spark.sql.execution.datasources.parquet.ParquetOutputWriter.write(ParquetOutputWriter.scala:39)
The issue can be worked around by increasing spark.sql.shuffle.partitions to divide data into smaller partitions.
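For example (a minimal sketch using the Java API; 2000 is just an illustrative value, not a recommendation):

```java
import org.apache.spark.sql.SparkSession;

// Split the shuffle output into more (and therefore smaller) partitions
// before the Parquet write; pick the value based on your data volume.
SparkSession spark = SparkSession.builder().getOrCreate();
spark.conf().set("spark.sql.shuffle.partitions", "2000");
```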
Can this be solved on the Parquet side?
Component(s)
Core
cc @steveloughran @wgtmac
It might be that there are too many binary values in the array column. Perhaps you can tune the page size check to be more aggressive. See https://github.com/apache/parquet-java/blob/master/parquet-column/src/main/java/org/apache/parquet/column/ParquetProperties.java and look for the following properties (a hedged sketch of setting them follows the list):
- DEFAULT_MINIMUM_RECORD_COUNT_FOR_CHECK
- DEFAULT_MAXIMUM_RECORD_COUNT_FOR_CHECK
- DEFAULT_PAGE_VALUE_COUNT_THRESHOLD
- DEFAULT_PAGE_ROW_COUNT_LIMIT
- DEFAULT_ROW_GROUP_ROW_COUNT_LIMIT
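For example (a rough sketch only: the Hadoop configuration keys below are my best guess at the ones in ParquetOutputFormat that correspond to these defaults, and the values are purely illustrative; please verify both against the Parquet version you run):

```java
import org.apache.hadoop.conf.Configuration;

// Tighten the page size check so pages are flushed earlier.
// Key names are unverified guesses taken from ParquetOutputFormat;
// the defaults are 100 / 10000 / 20000 respectively, if I remember correctly.
Configuration conf = new Configuration();
conf.setInt("parquet.page.size.row.check.min", 10);
conf.setInt("parquet.page.size.row.check.max", 1000);
conf.setInt("parquet.page.row.count.limit", 5000);
```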
It'd be good to know what the state of the column was. Maybe there were too many values for compression to work with, but does it have to fail this way?
We recently hit this as well (with a slightly different stack trace). I think it's simply a bug in how CapacityByteArrayOutputStream tries to catch this error.
The check for overflow is based on bytesUsed (https://github.com/apache/parquet-java/blob/master/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java#L171-L177):
private void addSlab(int minimumSize) {
  int nextSlabSize;
  // check for overflow
  try {
    Math.addExact(bytesUsed, minimumSize);
  } catch (ArithmeticException e) {
    // This is interpreted as a request for a value greater than Integer.MAX_VALUE
    // We throw OOM because that is what java.io.ByteArrayOutputStream also does
    throw new OutOfMemoryError("Size of data exceeded Integer.MAX_VALUE (" + e.getMessage() + ")");
  }
But the error in the stack trace is actually thrown at the end of addSlab, when bytesAllocated is updated (https://github.com/apache/parquet-java/blob/master/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java#L198C5-L198C76):
this.bytesAllocated = Math.addExact(this.bytesAllocated, nextSlabSize);
And if you look at the write method (https://github.com/apache/parquet-java/blob/master/parquet-common/src/main/java/org/apache/parquet/bytes/CapacityByteArrayOutputStream.java#L211-L225):
public void write(byte b[], int off, int len) {
  if ((off < 0) || (off > b.length) || (len < 0) || ((off + len) - b.length > 0)) {
    throw new IndexOutOfBoundsException(String.format(
        "Given byte array of size %d, with requested length(%d) and offset(%d)", b.length, len, off));
  }
  if (len > currentSlab.remaining()) {
    final int length1 = currentSlab.remaining();
    currentSlab.put(b, off, length1);
    final int length2 = len - length1;
    addSlab(length2);
    currentSlab.put(b, off + length1, length2);
  } else {
    currentSlab.put(b, off, len);
  }
  bytesUsed = Math.addExact(bytesUsed, len);
}
The current slab is filled, a new slab is added, the rest of the data goes into the new slab, and bytesUsed isn't updated until the very end of write. So at the moment addSlab runs, bytesUsed is not actually up to date, and in certain edge cases the overflow check misses the condition it is supposed to turn into an OOM.
I think you could also hit a similar issue if nextSlabSize ends up being larger than minimumSize, since minimumSize is all the overflow check looks at.
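A minimal sketch of the idea (my own names, not a patch against parquet-java): if the guard checked the addition that is actually about to be performed on bytesAllocated with the real nextSlabSize, the overflow would surface as the intended OutOfMemoryError instead of an uncaught ArithmeticException.

```java
// Sketch only; SlabMath and addOrThrowOom are illustrative names.
final class SlabMath {
  static int addOrThrowOom(int bytesAllocated, int nextSlabSize) {
    try {
      // Validate the exact addition that will be stored back into bytesAllocated.
      return Math.addExact(bytesAllocated, nextSlabSize);
    } catch (ArithmeticException e) {
      // Keep the existing convention (mirroring java.io.ByteArrayOutputStream)
      // of reporting the overflow as an OOM.
      throw new OutOfMemoryError("Size of data exceeded Integer.MAX_VALUE (" + e.getMessage() + ")");
    }
  }
}
```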
As for why a page gets this big in the first place, I assume it's just a weird data problem? We're still trying to figure that part out for our issue.
The properties that @wgtmac mentions should help. See https://github.com/apache/parquet-java/issues/2697 and https://github.com/apache/parquet-java/pull/1032.
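For the Spark case in this issue, something along these lines might be a way to pass those settings through (a sketch only: the key names are the same unverified guesses as above, the values are illustrative, and the per-write variant assumes Spark copies parquet.* write options into the Hadoop configuration used by the Parquet writer):

```java
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

SparkSession spark = SparkSession.builder().getOrCreate();

// Session-wide: set the keys on the Hadoop configuration used for writes.
spark.sparkContext().hadoopConfiguration().setInt("parquet.page.size.row.check.max", 1000);
spark.sparkContext().hadoopConfiguration().setInt("parquet.page.row.count.limit", 5000);

// Per write (assuming parquet.* options are forwarded to the Hadoop conf).
Dataset<Row> df = spark.table("some_table");   // placeholder input
df.write()
    .option("parquet.page.row.count.limit", "5000")
    .parquet("/tmp/out");                       // placeholder path
```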