Concurrency issue in subclasses of TypeInformation
Hi, I stumbled upon race conditions in an app using this library.
Description
As far as I know, a TypeInformation instance is safe to use as a singleton, whereas a TypeSerializer is not. The JavaDoc of TypeSerializer reads:

> The methods in this class are not necessarily thread safe. To avoid unpredictable side effects, it is recommended to call duplicate() method and use one serializer instance per thread.
However, the TypeInformation classes in this library hand out a single TypeSerializer instance, so the same instance ends up being used from multiple threads:
https://github.com/flink-extended/flink-scala-api/blob/892bd718b4fb0f9c43d815095ce47ddab965b196/src/main/scala/org/apache/flinkx/api/typeinfo/ProductTypeInformation.scala#L18
Therefore, this can lead to data inconsistency when the underlying TypeSerializer is not thread-safe, e.g. CaseClassSerializer, which keeps a mutable field that is used during deserialization.
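To see why shared mutable state breaks here, consider a toy two-step deserializer (a deliberately simplified sketch, not CaseClassSerializer's actual code): when two threads interleave on the same instance, one thread's second step can observe the other thread's first step.

```scala
// A toy deserializer mirroring the shape of a serializer that keeps mutable
// scratch state between internal deserialization steps (names are illustrative).
final class ToyDeserializer:
  private var scratch: String = "" // shared mutable state

  def readField(s: String): Unit = scratch = s // step 1: fill the buffer
  def build(): String = scratch                // step 2: read it back

@main def demo(): Unit =
  val shared = new ToyDeserializer
  // Thread A begins deserializing "a"...
  shared.readField("a")
  // ...thread B interleaves on the same instance and overwrites the state...
  shared.readField("b")
  // ...so A finishes with B's data: silent corruption.
  assert(shared.build() == "b") // A expected "a"

  // One instance per thread (the duplicate() pattern) removes the sharing.
  val forA = new ToyDeserializer
  val forB = new ToyDeserializer
  forA.readField("a")
  forB.readField("b")
  assert(forA.build() == "a")
```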
Steps to reproduce
The following example shows that data inconsistency can occur when run in a multicore environment:
```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.apache.flinkx.api.serializers.*
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.parallel.CollectionConverters.given
import scala.collection.parallel.ParSeq

object ExampleApp:
  case class TestClass(a: String, b: String)

  val typeInfo: TypeInformation[TestClass] = deriveTypeInformation

  def randomString: String = scala.util.Random.alphanumeric.take(100).mkString

  @main
  def main(): Unit =
    // Make fields 'a' and 'b' always have the same value
    val values: ParSeq[TestClass] = (1 to 1000).map { _ =>
      val s = randomString
      TestClass(s, s)
    }.par
    values.foreach { value =>
      val serializer = typeInfo.createSerializer(null)
      val out = new ByteArrayOutputStream()
      val outView = new DataOutputViewStreamWrapper(out)
      serializer.serialize(value, outView)
      val bytes = out.toByteArray
      val in = new ByteArrayInputStream(bytes)
      val inView = new DataInputViewStreamWrapper(in)
      val value2 = serializer.deserialize(inView)
      // The assertion fails randomly
      assert(value2.a == value2.b)
    }
```
Suggested fix
Looking at the createSerializer() implementation of PojoTypeInfo in flink-core, it creates a new TypeSerializer instance on every call.
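A library-side fix could mirror that behavior: have the TypeInformation hand out an independent copy instead of the cached instance. A minimal self-contained sketch of the idea (the classes and method names below are toy stand-ins, not the library's actual API):

```scala
// Toy stand-ins for TypeSerializer/TypeInformation to show the shape of the fix.
trait ToySerializer[T]:
  def duplicate(): ToySerializer[T] // fresh instance with independent mutable state

final class ToyCaseClassSerializer[T] extends ToySerializer[T]:
  private var scratch: Any = null // mutable deserialization state
  def duplicate(): ToySerializer[T] = new ToyCaseClassSerializer[T]

final class ToyTypeInformation[T](cached: ToySerializer[T]):
  // Before: `def createSerializer() = cached` shares the mutable instance.
  // After: return a duplicate, as PojoTypeInfo effectively does in flink-core.
  def createSerializer(): ToySerializer[T] = cached.duplicate()
```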
In the example above, if the serializer is obtained like this:

```scala
val serializer = typeInfo.createSerializer(null).duplicate()
```

the data inconsistency no longer occurs.
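Until the library is fixed, callers can also work around the problem by caching one duplicated serializer per thread. A sketch of that pattern using a ThreadLocal (the `PerThread` helper is mine, not part of any library):

```scala
// Per-thread instance cache: each thread lazily gets its own copy, so mutable
// deserialization state is never shared across threads.
final class PerThread[T](makeCopy: () => T):
  private val local = ThreadLocal.withInitial(() => makeCopy())
  def get: T = local.get()
```

In the repro above, this would be used as `val perThread = new PerThread(() => typeInfo.createSerializer(null).duplicate())` once, with `perThread.get` inside the parallel `foreach` replacing the shared serializer.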
I'll submit a pull request to fix the issue. Feel free to ask if you have any questions about the issue or the fix.