flink-scala-api

Concurrency issue in subclasses of TypeInformation

Open sonowz opened this issue 1 year ago • 0 comments

Hi, I ran into race conditions in an app that uses this library.

Description

As far as I know, a TypeInformation instance is safe to use as a singleton, whereas a TypeSerializer instance is not. The Javadoc of the TypeSerializer class reads:

The methods in this class are not necessarily thread safe. To avoid unpredictable side effects, it is recommended to call duplicate() method and use one serializer instance per thread.
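One way to follow the Javadoc's "one serializer instance per thread" advice is to cache a duplicate per thread in a `ThreadLocal`. The stdlib-only sketch below uses hypothetical names (`Dup`, `StatefulSerializer`, `PerThread`, `PerThreadDemo`) as stand-ins for a serializer and its `duplicate()` method; none of them are Flink or flink-scala-api APIs.

```scala
import java.util.concurrent.{Callable, Executors}

// Hypothetical minimal contract: anything that can produce an independent copy of itself.
trait Dup[S] { def duplicate(): S }

// Hypothetical stand-in for a stateful serializer; its only state is a usage counter.
final class StatefulSerializer extends Dup[StatefulSerializer] {
  var calls: Int = 0 // must never be mutated by two threads at once
  def duplicate(): StatefulSerializer = new StatefulSerializer
}

// Gives every thread its own duplicate of `template`, created on first use in that thread.
final class PerThread[S <: Dup[S]](template: S) {
  private val local = new ThreadLocal[S] {
    override protected def initialValue(): S = template.duplicate()
  }
  def get: S = local.get()
}

object PerThreadDemo {
  // Runs `tasks` increments on `threads` workers and returns the distinct instances used.
  def run(threads: Int, tasks: Int): Seq[StatefulSerializer] = {
    val perThread = new PerThread[StatefulSerializer](new StatefulSerializer)
    val pool = Executors.newFixedThreadPool(threads)
    try {
      val futures = (1 to tasks).map { _ =>
        pool.submit(new Callable[StatefulSerializer] {
          def call(): StatefulSerializer = {
            val ser = perThread.get // this thread's private instance
            ser.calls += 1
            ser
          }
        })
      }
      // Default equals is reference identity, so this keeps one entry per instance.
      futures.map(_.get()).distinct
    } finally pool.shutdown()
  }
}
```

Compared with calling `duplicate()` on every use, a per-thread cache creates at most one copy per worker thread, at the cost of each copy living as long as its thread.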

However, the TypeInformation classes in this library simply hand back the TypeSerializer instance they were constructed with, so that single instance ends up being used by multiple threads: https://github.com/flink-extended/flink-scala-api/blob/892bd718b4fb0f9c43d815095ce47ddab965b196/src/main/scala/org/apache/flinkx/api/typeinfo/ProductTypeInformation.scala#L18

As a result, data can become inconsistent when the shared serializer is not thread-safe, as is the case with CaseClassSerializer, which has a mutable variable that is used during deserialization.
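To illustrate why a mutable field inside a serializer is a problem, here is a toy, stdlib-only model. `Pair`, `PairSerializer` and `PairSerializerDemo` are hypothetical; they only mimic, in spirit, a serializer that stores intermediate field values in an instance variable during deserialization, and they are not the actual CaseClassSerializer code.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}
import java.util.concurrent.{Callable, Executors}

// Hypothetical toy types; not Flink's TypeSerializer or the library's CaseClassSerializer.
final case class Pair(a: String, b: String)

final class PairSerializer {
  // Scratch buffer reused by deserialize(): this instance field makes sharing unsafe.
  private val fields = new Array[String](2)

  def serialize(value: Pair, out: DataOutputStream): Unit = {
    out.writeUTF(value.a)
    out.writeUTF(value.b)
  }

  def deserialize(in: DataInputStream): Pair = {
    fields(0) = in.readUTF() // a concurrent caller may overwrite fields(0) here...
    fields(1) = in.readUTF() // ...before both slots are read back below
    Pair(fields(0), fields(1))
  }

  // The instance is stateful, so a duplicate must be a fresh object, not `this`.
  def duplicate(): PairSerializer = new PairSerializer
}

object PairSerializerDemo {
  def roundTrip(ser: PairSerializer, value: Pair): Pair = {
    val bytes = new ByteArrayOutputStream()
    ser.serialize(value, new DataOutputStream(bytes))
    ser.deserialize(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray)))
  }

  // Runs `n` round trips on a thread pool. Each task uses its own duplicate, so a == b
  // holds for every result; passing `shared` itself instead can intermittently break it.
  def allConsistent(shared: PairSerializer, n: Int): Boolean = {
    val pool = Executors.newFixedThreadPool(8)
    try {
      val futures = (1 to n).map { i =>
        pool.submit(new Callable[Boolean] {
          def call(): Boolean = {
            val s = "value-" + i
            val result = roundTrip(shared.duplicate(), Pair(s, s))
            result.a == result.b
          }
        })
      }
      futures.forall(_.get())
    } finally pool.shutdown()
  }
}
```

Because every task calls `duplicate()`, no two threads ever write to the same `fields` array.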

Steps to reproduce

The following example shows that data inconsistency can occur when it runs in a multicore environment:

```scala
import org.apache.flink.api.common.typeinfo.TypeInformation
import org.apache.flink.core.memory.{DataInputViewStreamWrapper, DataOutputViewStreamWrapper}
import org.apache.flinkx.api.serializers.*
import java.io.{ByteArrayInputStream, ByteArrayOutputStream}
import scala.collection.parallel.CollectionConverters.given
import scala.collection.parallel.ParSeq


object ExampleApp:

  case class TestClass(a: String, b: String)

  val typeInfo: TypeInformation[TestClass] = deriveTypeInformation

  def randomString: String = scala.util.Random.alphanumeric.take(100).mkString

  @main
  def main(): Unit =

    // Make fields 'a' and 'b' always hold the same value
    val values: ParSeq[TestClass] = (1 to 1000).map { _ =>
      val s = randomString
      TestClass(s, s)
    }.par

    values.foreach { value =>
      // createSerializer returns the same shared instance on every call,
      // so all parallel tasks end up using one serializer concurrently
      val serializer = typeInfo.createSerializer(null)

      val out = new ByteArrayOutputStream()
      val outView = new DataOutputViewStreamWrapper(out)
      serializer.serialize(value, outView)
      val bytes = out.toByteArray

      val in = new ByteArrayInputStream(bytes)
      val inView = new DataInputViewStreamWrapper(in)
      val value2 = serializer.deserialize(inView)

      // The assertion fails intermittently
      assert(value2.a == value2.b)
    }
```

Suggested fix

In flink-core, by contrast, the createSerializer() implementation of PojoTypeInfo creates a new TypeSerializer instance on each call.

If the serializer creation in the example above is changed to:

```scala
val serializer = typeInfo.createSerializer(null).duplicate()
```

the data inconsistency no longer occurs.
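A library-side version of the same fix would be for createSerializer() to hand out a duplicate of the serializer it holds, in line with PojoTypeInfo returning a new serializer per call. The sketch below models this with hypothetical `ToySerializer` and `ToyTypeInfo` types; it does not reproduce the actual ProductTypeInformation code or any pull request for this issue.

```scala
// Hypothetical minimal serializer contract (not Flink's TypeSerializer).
trait ToySerializer {
  // Must return an independent copy if the instance has mutable state;
  // a stateless instance may return itself.
  def duplicate(): ToySerializer
}

final class StatelessSerializer extends ToySerializer {
  def duplicate(): ToySerializer = this // nothing mutable, so sharing is safe
}

final class StatefulSerializer extends ToySerializer {
  private val scratch = new Array[AnyRef](2) // stands in for state reused during deserialization
  def duplicate(): ToySerializer = new StatefulSerializer
}

// Models a type info that owns a single serializer instance.
final class ToyTypeInfo(template: ToySerializer) {
  // Returns the shared instance, as the issue reports for the library's type infos.
  def createSharedSerializer(): ToySerializer = template
  // Returns a duplicate, so concurrent callers never share a stateful instance.
  def createSerializer(): ToySerializer = template.duplicate()
}
```

Since Flink's documentation of `duplicate()` allows a stateless serializer to return itself, duplicating on every createSerializer() call should cost nothing for stateless serializers; only stateful ones get a new instance.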

I'll submit a pull request to fix the issue. Feel free to ask if you have any questions about the issue or the fix.

sonowz, May 01 '24 02:05