
[SPARK-54489][CORE] Avoid returning corrupted Kryo objects

Open cxzl25 opened this issue 1 month ago • 4 comments

What changes were proposed in this pull request?

When Kryo.writeClassAndObject throws a NegativeArraySizeException, the Kryo instance shall no longer be returned to the pool.
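The idea can be illustrated with a minimal sketch, assuming a generic object pool. `DiscardOnFailurePool`, `use`, and `idleCount` are hypothetical names chosen for illustration; this is not Spark's serializer code or Kryo's pool API, only the "do not return a possibly corrupted instance" pattern.

```java
import java.util.ArrayDeque;
import java.util.function.Consumer;
import java.util.function.Supplier;

// Hypothetical stand-in for a pool of serializer instances (not Spark or Kryo code).
final class DiscardOnFailurePool<T> {
    private final ArrayDeque<T> idle = new ArrayDeque<>();
    private final Supplier<T> factory;

    DiscardOnFailurePool(Supplier<T> factory) {
        this.factory = factory;
    }

    int idleCount() {
        return idle.size();
    }

    // Borrow an instance, run the action, and put the instance back
    // unless the action failed with NegativeArraySizeException.
    void use(Consumer<T> action) {
        T instance = idle.isEmpty() ? factory.get() : idle.pop();
        boolean reusable = true;
        try {
            action.accept(instance);
        } catch (NegativeArraySizeException e) {
            reusable = false; // internal state may be corrupted: drop the instance
            throw e;
        } finally {
            if (reusable) {
                idle.push(instance);
            }
        }
    }
}
```

With this shape, a healthy instance goes back to the pool after use, while an instance that failed with NegativeArraySizeException is dropped and the next caller gets a fresh one from the factory.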

Why are the changes needed?

https://github.com/EsotericSoftware/kryo/pull/1198

When Kryo.writeClassAndObject fails to resize, it throws a NegativeArraySizeException. Spark then returns this Kryo instance to the pool. The next time this object is reused, it will always throw an ArrayIndexOutOfBoundsException.

java.lang.NegativeArraySizeException: -2147483645
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:542)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:306)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:300)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:162)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.putStash(IdentityObjectIntMap.java:307)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.push(IdentityObjectIntMap.java:300)
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.put(IdentityObjectIntMap.java:162)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.addWrittenObject(MapReferenceResolver.java:41)
        at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:681)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:361)
        at com.esotericsoftware.kryo.serializers.DefaultArraySerializers$ObjectArraySerializer.write(DefaultArraySerializers.java:302)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:651)
        at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:269)
        at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$4(TorrentBroadcast.scala:358)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1510)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:360)

java.lang.ArrayIndexOutOfBoundsException: Index 688291177 out of bounds for length 3
        at com.esotericsoftware.kryo.util.IdentityObjectIntMap.get(IdentityObjectIntMap.java:322)
        at com.esotericsoftware.kryo.util.MapReferenceResolver.getWrittenId(MapReferenceResolver.java:46)
        at com.esotericsoftware.kryo.Kryo.writeReferenceOrNull(Kryo.java:671)
        at com.esotericsoftware.kryo.Kryo.writeClassAndObject(Kryo.java:646)
        at org.apache.spark.serializer.KryoSerializationStream.writeObject(KryoSerializer.scala:269)
        at org.apache.spark.broadcast.TorrentBroadcast$.$anonfun$blockifyObject$4(TorrentBroadcast.scala:358)
        at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1510)
        at org.apache.spark.broadcast.TorrentBroadcast$.blockifyObject(TorrentBroadcast.scala:360)

Simple test

    IdentityObjectIntMap<String> identityObjectIntMap = new IdentityObjectIntMap<>(1073741824, 0.8f);
    try {
      identityObjectIntMap.put("k1", 1);
      identityObjectIntMap.clear((1073741824) << 1); // Simulate resize
    } catch (NegativeArraySizeException e) {  
      e.printStackTrace(); // Expected
    }
    identityObjectIntMap.clear(2048);
    identityObjectIntMap.put("k1", 1); // ArrayIndexOutOfBoundsException
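The negative size in the stack trace follows from plain int arithmetic, which the sketch below works through. The `+ 3` term is an assumption: it is inferred from the difference between `1073741824 << 1` and the reported `-2147483645`, and is not taken from Kryo's source. `ResizeOverflow` and its methods are hypothetical names.

```java
final class ResizeOverflow {
    // Doubling the capacity, as the "Simulate resize" line in the test does.
    static int doubled(int capacity) {
        return capacity << 1;
    }

    // Returns true when allocating an array of the given length is rejected.
    static boolean allocationFails(int length) {
        try {
            Object[] table = new Object[length];
            return false;
        } catch (NegativeArraySizeException e) {
            return true;
        }
    }

    public static void main(String[] args) {
        int capacity = 1073741824;        // 2^30, the capacity used in the test
        int newSize = doubled(capacity);  // wraps around to Integer.MIN_VALUE
        System.out.println(newSize);      // -2147483648

        // ASSUMPTION: a few extra slots (3 here) are added to the requested size.
        int requested = newSize + 3;
        System.out.println(requested);    // -2147483645, the value in the stack trace

        System.out.println(allocationFails(requested)); // true
    }
}
```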

Does this PR introduce any user-facing change?

No

How was this patch tested?

Was this patch authored or co-authored using generative AI tooling?

No

cxzl25, Nov 24 '25 12:11

Is it possible to add a test here?

holdenk, Nov 24 '25 21:11

We do a reset, which should have cleared this map. Is this still an issue? That is unfortunate.

mridulm, Dec 06 '25 10:12

Hmm, I can't seem to reproduce it with the test you provided, @cxzl25:

$ ./bin/spark-shell --driver-memory 64g
<snip>
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 4.1.0-SNAPSHOT
      /_/
         
Using Scala version 2.13.16 (OpenJDK 64-Bit Server VM, Java 17.0.17)
<snip>
Spark session available as 'spark'.

scala> import com.esotericsoftware.kryo.util._
import com.esotericsoftware.kryo.util._

scala> val identityObjectIntMap = new IdentityObjectIntMap[String](1073741824, 0.8f)
val identityObjectIntMap: com.esotericsoftware.kryo.util.IdentityObjectIntMap[String] = {}

scala> identityObjectIntMap.put("k1", 1)

scala> identityObjectIntMap.clear((1073741824) << 1)
java.lang.NegativeArraySizeException: -2147483645
  at com.esotericsoftware.kryo.util.IdentityObjectIntMap.resize(IdentityObjectIntMap.java:542)
  at com.esotericsoftware.kryo.util.IdentityObjectIntMap.clear(IdentityObjectIntMap.java:461)
  ... 42 elided

scala> identityObjectIntMap.clear(2048)

scala> identityObjectIntMap.put("k1", 1)

scala> 

mridulm, Dec 06 '25 10:12

> We do a reset, which should have cleared this map - still an issue?

The clear method only sets the entries of the keyTable array to null; it does not reallocate keyTable, so the next put triggers this problem.

com.esotericsoftware.kryo.util.IdentityObjectIntMap#clear(int)

public void clear (int maximumCapacity) {
		if (capacity <= maximumCapacity) {
			clear();
			return;
		}

	public void clear () {
		K[] keyTable = this.keyTable;
		for (int i = capacity + stashSize; i-- > 0;)
			keyTable[i] = null;
		size = 0;
		stashSize = 0;
	}
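To make the failure mode concrete, here is a simplified stand-in for such a map. It is not Kryo's `IdentityObjectIntMap`: `StaleTableModel`, the fixed `STASH` of 3, and the caller-supplied hash are all hypothetical. The key assumption, flagged in the code, is that `resize` updates its bookkeeping fields before allocating the new array, so a failed allocation leaves those fields out of sync with `keyTable`.

```java
// Simplified stand-in for an open-addressing map; NOT Kryo's IdentityObjectIntMap.
final class StaleTableModel {
    static final int STASH = 3; // hypothetical fixed number of extra slots
    int capacity;
    int mask;
    Object[] keyTable;

    StaleTableModel(int initialCapacity) {
        capacity = initialCapacity;
        mask = initialCapacity - 1;
        keyTable = new Object[initialCapacity + STASH];
    }

    // ASSUMPTION: fields are updated before the allocation, so a failed
    // allocation leaves capacity and mask describing a table that does not exist.
    void resize(int newSize) {
        capacity = newSize;
        mask = newSize - 1;
        keyTable = new Object[newSize + STASH]; // throws for a negative size
    }

    // Same shape as the clear() quoted above: nulls entries, never reallocates.
    void clear() {
        for (int i = capacity; i-- > 0;) {
            keyTable[i] = null;
        }
    }

    // The hash is passed in so the outcome does not depend on identityHashCode.
    void put(Object key, int hash) {
        keyTable[hash & mask] = key;
    }

    public static void main(String[] args) {
        StaleTableModel map = new StaleTableModel(4);
        try {
            map.resize(1073741824 << 1); // negative size: allocation fails
        } catch (NegativeArraySizeException expected) {
            // capacity and mask are already overwritten at this point
        }
        map.clear();             // loop body never runs: capacity is negative
        map.put("k1", 5);        // small hash still lands inside the old array
        map.put("k1", 688291177); // throws ArrayIndexOutOfBoundsException
    }
}
```

In this model `clear()` cannot repair the instance because it only walks the existing array using the stale `capacity`; whether a later `put` fails depends on the key's hash.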

> I can't seem to reproduce it with the test you have provided

IdentityObjectIntMap computes the hash with System.identityHashCode, so the result can differ between environments.

I modified the reproduction code; this version may be easier to reproduce with.

import com.esotericsoftware.kryo.util._
val identityObjectIntMap = new IdentityObjectIntMap[String](1073741824, 0.8f)
identityObjectIntMap.put("k1", 1)
identityObjectIntMap.clear((1073741824) << 1)
identityObjectIntMap.clear(2048)
for (i <- 0 until 10000) {
  val s = new String("k_" + i)
  val i1 = System.identityHashCode(s) & 2147483647
  if (i1 > 1073741866) {
    println(s"Found one: $s, hash: $i1")
    identityObjectIntMap.put(s, 1)
  }
}
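The filter in the loop can be read as a bounds check, sketched here in Java. The constants 2147483647 and 1073741866 are taken from the loop above; treating 1073741866 as the length of the array left behind by the failed resize is an interpretation, not something confirmed from Kryo's source. `StaleIndexCheck` and `landsOutOfBounds` are hypothetical names.

```java
final class StaleIndexCheck {
    static final int STALE_MASK = 2147483647;   // mask applied in the loop above
    static final int TABLE_LENGTH = 1073741866; // threshold used in the loop above

    // True when the masked hash cannot be used as an index into an array
    // of TABLE_LENGTH elements.
    static boolean landsOutOfBounds(int identityHash) {
        return (identityHash & STALE_MASK) >= TABLE_LENGTH;
    }

    public static void main(String[] args) {
        Object key = new Object();
        int hash = System.identityHashCode(key); // varies between runs and JVMs
        System.out.println(hash + " out of bounds: " + landsOutOfBounds(hash));
    }
}
```

The sketch uses `>=` because an index equal to the array length is already out of bounds; the loop's `>` skips only that single boundary value.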

cxzl25, Dec 07 '25 11:12