[SPARK-54489][CORE] Avoid returning corrupted Kryo objects
What changes were proposed in this pull request?
When Kryo.writeClassAndObject throws a NegativeArraySizeException, the Kryo instance is no longer returned to the pool.
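A sketch of the intended pool behavior (illustrative names only, not the actual Spark diff): an instance is released back to the pool only after a successful write; on NegativeArraySizeException it is dropped, so a fresh instance is created on the next borrow.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.function.Function;
import java.util.function.Supplier;

// Illustrative pool; Spark's real fix lives in KryoSerializer, not this toy class.
final class BorrowPool<T> {
    private final Deque<T> free = new ArrayDeque<>();
    private final Supplier<T> factory;

    BorrowPool(Supplier<T> factory) { this.factory = factory; }

    int idle() { return free.size(); }

    <R> R withBorrowed(Function<T, R> work) {
        T inst = free.isEmpty() ? factory.get() : free.pop();
        R result;
        try {
            result = work.apply(inst);
        } catch (NegativeArraySizeException e) {
            // The instance's internal maps may be corrupted: drop it, never release.
            throw e;
        }
        free.push(inst); // success path only: safe to reuse
        return result;
    }
}
```

After a failed write the pool shrinks by one instance; the factory supplies a clean replacement on the next borrow, which is exactly the property the PR is after.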
Why are the changes needed?
https://github.com/EsotericSoftware/kryo/pull/1198
When Kryo.writeClassAndObject fails to resize an internal identity map, it throws a NegativeArraySizeException.
Spark then returns this Kryo instance to the pool anyway. The next time the corrupted instance is reused, it always throws an ArrayIndexOutOfBoundsException. The initial failure:
java.lang.NegativeArraySizeException: -2147483645
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:542)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:306)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:300)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:162)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:307)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:300)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:162)
at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:41)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:681)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:269)
at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$4(TorrentBroadcast.scala:358)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1510)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:360)
When the corrupted instance is later taken from the pool and reused:
java.lang.ArrayIndexOutOfBoundsException: Index 688291177 out of bounds for length 3
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.get(IdentityObjectIntMap.java:322)
at com.esotericsoftware.kryo.util.MapReferenceResolver.getWrittenId(MapReferenceResolver.java:46)
at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:671)
at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:269)
at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$4(TorrentBroadcast.scala:358)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1510)
at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:360)
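The root cause behind both traces is plain int overflow: doubling a capacity of 2^30 wraps to a negative value, and allocating an array with a negative length throws NegativeArraySizeException. A Kryo-free illustration (the "+ 3" is only chosen to mirror the stash offset visible in the trace message, not Kryo's exact arithmetic):

```java
public class OverflowDemo {
    public static void main(String[] args) {
        int capacity = 1073741824;           // 2^30, as in the reproduction below
        int doubled = capacity << 1;         // wraps to Integer.MIN_VALUE
        System.out.println(doubled);         // -2147483648
        try {
            Object[] table = new Object[doubled + 3]; // negative length
            System.out.println(table.length);         // never reached
        } catch (NegativeArraySizeException e) {
            // On recent JDKs the message is the negative size, matching the
            // "-2147483645" seen in the trace above.
            System.out.println(e);
        }
    }
}
```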
A simple reproduction:

IdentityObjectIntMap<String> identityObjectIntMap = new IdentityObjectIntMap<>(1073741824, 0.8f);
try {
    identityObjectIntMap.put("k1", 1);
    // 1073741824 << 1 overflows to -2147483648, so clear() resizes and throws
    identityObjectIntMap.clear(1073741824 << 1);
} catch (NegativeArraySizeException e) {
    e.printStackTrace(); // expected: the map is now corrupted
}
identityObjectIntMap.clear(2048);
identityObjectIntMap.put("k1", 1); // throws ArrayIndexOutOfBoundsException
Does this PR introduce any user-facing change?
No
How was this patch tested?
Was this patch authored or co-authored using generative AI tooling?
No
Is it possible to add a test here?
We do a reset, which should have cleared this map. Is this still an issue? That is unfortunate.
Hmm, I can't seem to reproduce it with the test you have provided, @cxzl25 ...
$ ./bin/spark-shell --driver-memory 64g
<snip>
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/ '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/
Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.17)
<snip>
Spark session available as 'spark'.
scala> import com.esotericsoftware.kryo.util._
import com.esotericsoftware.kryo.util._
scala> val identityObjectIntMap = new IdentityObjectIntMap[String](1073741824, 0.8f)
val identityObjectIntMap: com.esotericsoftware.kryo.util.IdentityObjectIntMap[String] = {}
scala> identityObjectIntMap.put("k1", 1)
scala> identityObjectIntMap.clear((1073741824) << 1)
java.lang.NegativeArraySizeException: -2147483645
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:542)
at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:461)
... 42 elided
scala> identityObjectIntMap.clear(2048)
scala> identityObjectIntMap.put("k1", 1)
scala>
We do a reset, which should have cleared this map. Is this still an issue?
The clear method only sets the entries of the keyTable array to null; it does not rebuild keyTable or repair the internal state left behind by the failed resize, so the next put still triggers the problem.
com.esotericsoftware.kryo.util.IdentityObjectIntMap#clear(int)

public void clear (int maximumCapacity) {
    if (capacity <= maximumCapacity) {
        clear();
        return;
    }
    // remainder omitted: shrinks the table via resize(maximumCapacity),
    // which is where the NegativeArraySizeException above originates
}

public void clear () {
    K[] keyTable = this.keyTable;
    for (int i = capacity + stashSize; i-- > 0;)
        keyTable[i] = null;
    size = 0;
    stashSize = 0;
}
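A minimal, self-contained model of why clear() is not enough (illustrative only, not Kryo's actual code): if resize updates the indexing bookkeeping before the allocation throws, the map's view of its own table is stale, and a clear() that only nulls entries repairs nothing.

```java
// ToyMap deliberately mimics the failure mode: fields first, allocation second.
public class ToyMap {
    String[] table = new String[4];
    int mask = 3; // capacity - 1, used to index the table

    void resize(int newCapacity) {
        mask = newCapacity - 1;          // bookkeeping updated first...
        table = new String[newCapacity]; // ...throws for a negative capacity
    }

    void clear() {                       // like Kryo's clear(): nulls entries only
        for (int i = 0; i < table.length; i++) table[i] = null;
    }

    void put(String key) {
        table[key.hashCode() & mask] = key; // stale mask indexes past table.length
    }

    public static void main(String[] args) {
        ToyMap m = new ToyMap();
        try {
            m.resize(-8);                // fails, leaving mask = -9
        } catch (NegativeArraySizeException e) {
            System.out.println("resize failed: " + e);
        }
        m.clear();                       // "succeeds" but repairs nothing
        try {
            m.put("k1");
        } catch (ArrayIndexOutOfBoundsException e) {
            System.out.println("put after clear: " + e);
        }
    }
}
```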
I can't seem to reproduce it with the test you have provided

IdentityObjectIntMap uses System.identityHashCode to compute hashes, so different environments may produce different results. I modified the reproduction code below; it may be easier to reproduce.
import com.esotericsoftware.kryo.util._

val identityObjectIntMap = new IdentityObjectIntMap[String](1073741824, 0.8f)
identityObjectIntMap.put("k1", 1)
// 1073741824 << 1 overflows to -2147483648; clear() resizes and throws
identityObjectIntMap.clear(1073741824 << 1)
identityObjectIntMap.clear(2048)
for (i <- 0 until 10000) {
  val s = new String("k_" + i)                     // new instance => fresh identity hash
  val i1 = System.identityHashCode(s) & 2147483647 // drop the sign bit
  if (i1 > 1073741866) {
    println(s"Found one: $s, hash: $i1")
    identityObjectIntMap.put(s, 1)                 // throws ArrayIndexOutOfBoundsException
  }
}