
Handle recursive types

Open arnaud-daroussin opened this issue 3 months ago • 1 comment

Currently, our generic CaseClassTypeInfo and CaseClassSerializer don't handle recursive case classes. This issue is created from PR discussions in https://github.com/flink-extended/flink-scala-api/pull/280.

I was able to hack something together in LowPrioImplicits thanks to the TI cache. The idea is to store an incomplete, temporary TI in the cache, without its field serializers initialized, before the fields are derived. When derivation recurses back to the same type, it finds the temporary TI in the cache and does not try to initialize it again, which breaks the recursion. The field serializers are then initialized only once, afterwards.
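The cache trick described above can be sketched in isolation. This is a minimal illustration, not the code in LowPrioImplicits: SketchTypeInfo, SketchDerivation and the string-keyed schema are hypothetical stand-ins, and the real cache and TI classes will differ.

```scala
import scala.collection.mutable

// Hypothetical stand-in for a type information; NOT the Flink or flink-scala-api class.
final class SketchTypeInfo(val name: String) {
  // Fields are assigned after construction, so a cached but still incomplete
  // instance can be handed out while its fields are being derived.
  var fields: Vector[SketchTypeInfo] = Vector.empty
  var initialized: Boolean = false
}

object SketchDerivation {
  private val cache = mutable.Map.empty[String, SketchTypeInfo]
  // Counts how many times field derivation actually ran for each type name.
  val initCount: mutable.Map[String, Int] =
    mutable.Map.empty[String, Int].withDefaultValue(0)

  // `schema` maps a type name to the type names of its fields (assumption for the sketch).
  def derive(name: String, schema: Map[String, List[String]]): SketchTypeInfo =
    cache.get(name) match {
      case Some(existing) =>
        existing // possibly incomplete: returning it is what breaks the cycle
      case None =>
        val info = new SketchTypeInfo(name)
        cache.update(name, info) // register BEFORE recursing into the fields
        initCount(name) += 1
        info.fields = schema.getOrElse(name, Nil).map(derive(_, schema)).toVector
        info.initialized = true
        info
    }
}

object SketchDerivationDemo {
  // case class Node(value: Int, next: Option[Node]), written as a schema
  val schema: Map[String, List[String]] =
    Map("Node" -> List("Int", "Option[Node]"), "Option[Node]" -> List("Node"))
  val node: SketchTypeInfo = SketchDerivation.derive("Node", schema)
}
```

In this sketch the inner reference to Node resolves to the very same instance as the outer one, so the resulting structure is cyclic rather than infinitely nested.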

If we want to handle recursion in CaseClassSerializer, we have to do something similar, with a global cache for all recursively processed fields. This affects at least getLength, isImmutableType, isImmutableSerializer, createInstance, equals, hashCode and, maybe the trickiest one, snapshotConfiguration. We also have to find a coherent short-circuit for each of them. For example, what is the length of a recursive serializer? In theory its length is infinite; in practice it can be -1.
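As an illustration of one such short-circuit, here is a minimal sketch for getLength only. SketchSerializer is a hypothetical model, not CaseClassSerializer; it relies on the Flink convention that a serializer returns -1 for variable-length data, and the re-entrancy flag is deliberately naive (not thread-safe).

```scala
// Hypothetical serializer model; NOT the real CaseClassSerializer.
// `fixedLength` is Some(n) for a fixed-size leaf, None for a composite.
final class SketchSerializer(val name: String, fixedLength: Option[Int]) {
  var fields: List[SketchSerializer] = Nil

  // True while getLength is running on this instance (assumption: single thread).
  private var computing = false

  def getLength: Int =
    if (computing) -1 // re-entered through a cycle: report "variable length"
    else
      fixedLength match {
        case Some(n) => n
        case None =>
          computing = true
          try {
            val lengths = fields.map(_.getLength)
            // Any variable-length field makes the whole record variable-length.
            if (lengths.exists(_ < 0)) -1 else lengths.sum
          } finally computing = false
      }
}

object SketchSerializerDemo {
  val int = new SketchSerializer("Int", Some(4))

  // case class Point(x: Int, y: Int): fixed length
  val point = new SketchSerializer("Point", None)
  point.fields = List(int, int)

  // case class Node(value: Int, next: Node): recursive, so variable length
  val node = new SketchSerializer("Node", None)
  node.fields = List(int, node)
}
```

The same guard shape would have to be adapted per method: a boolean such as isImmutableType needs a different neutral value than a length does, and snapshotConfiguration probably needs more than a flag.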

Similar problems also occur in CaseClassTypeInfo and its parent TupleTypeInfoBase: getFlatFields, getTypeAt, getTotalFields, equals, hashCode and toString.
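For the traversal-style methods, one possible approach is to track the instances currently being visited by identity, since equals and hashCode are themselves recursive on a cyclic structure. The sketch below does this for toString only; SketchInfo and the "<recursive>" marker are hypothetical, not part of CaseClassTypeInfo.

```scala
import java.util.{Collections, IdentityHashMap}

// Hypothetical type-info model; NOT the real CaseClassTypeInfo.
final class SketchInfo(val name: String) {
  var fields: List[SketchInfo] = Nil

  override def toString: String =
    render(Collections.newSetFromMap(new IdentityHashMap[SketchInfo, java.lang.Boolean]()))

  // `inProgress` holds the instances on the current traversal path, compared by
  // reference so the check never calls a (possibly recursive) equals or hashCode.
  private def render(inProgress: java.util.Set[SketchInfo]): String =
    if (!inProgress.add(this)) s"$name<recursive>" // already on the path: stop here
    else {
      val text =
        if (fields.isEmpty) name
        else fields.map(_.render(inProgress)).mkString(s"$name(", ", ", ")")
      inProgress.remove(this) // leaving this node: siblings may visit it again
      text
    }
}

object SketchInfoDemo {
  val int = new SketchInfo("Int")

  // case class Node(value: Int, next: Node)
  val node = new SketchInfo("Node")
  node.fields = List(int, node)
}
```

With this setup, `SketchInfoDemo.node.toString` yields `Node(Int, Node<recursive>)` instead of overflowing the stack.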

Maybe memoization or dynamic programming can help to solve these problems: https://stackoverflow.com/a/6185005/6176274

arnaud-daroussin avatar Sep 03 '25 15:09 arnaud-daroussin

With Scala 3, there is a compilation error when trying to derive a recursive type:

flink-scala-api/modules/flink-common-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala:91:87
No given instance of type org.apache.flink.api.common.typeinfo.TypeInformation[
  Option[org.apache.flinkx.api².SerializerTest.Node]] was found.
I found:

    org.apache.flinkx.api².serializers.deriveTypeInformation[
      Option[org.apache.flinkx.api².SerializerTest.Node]](
      {
        final class $anon() extends Object(), Serializable {
          type MirroredMonoType = Option[org.apache.flinkx.api².SerializerTest.Node]
          }
        (new $anon():Object & Serializable)
      }.$asInstanceOf[
        
          scala.deriving.Mirror.Sum{
            type MirroredMonoType² =
              Option[org.apache.flinkx.api².SerializerTest.Node];
              type MirroredType = Option[org.apache.flinkx.api².SerializerTest.Node]
                ;
            type MirroredLabel = ("Option" : String);
              type MirroredElemTypes = (None.type,
                Some[org.apache.flinkx.api².SerializerTest.Node]);
              type MirroredElemLabels = (("None$" : String), ("Some" : String))
          }
        
      ],
      scala.reflect.ClassTag.apply[
        Option[org.apache.flinkx.api².SerializerTest.Node]](classOf[Option]),
      {
        final class $anon²() extends Object(), org.apache.flinkx.api².TypeTag[
          Option[org.apache.flinkx.api².SerializerTest.Node]] {
          override lazy val isModule: Boolean = false
          override lazy val isCachable: Boolean = true
          override lazy val toString: String =
            "scala.Option[org.apache.flinkx.api.SerializerTest.Node]"
        }
        new $anon²():
          
            org.apache.flinkx.api².TypeTag[
              Option[org.apache.flinkx.api².SerializerTest.Node]]
          
      }:
        org.apache.flinkx.api².TypeTag[
          Option[org.apache.flinkx.api².SerializerTest.Node]]
    )

But method deriveTypeInformation in trait LowPrioImplicits does not match type org.apache.flink.api.common.typeinfo.TypeInformation[
  Option[org.apache.flinkx.api².SerializerTest.Node]]

where:    $anon             is a anonymous class in the initializer of value typeclass
          $anon²            is a anonymous class in the initializer of value typeclass
          MirroredMonoType  is a type in the empty package which is an alias of Option[org.apache.flinkx.api².SerializerTest.Node]
          MirroredMonoType² is a type in trait Mirror
          api               is a package in package org.apache.flink
          api²              is a package in package org.apache.flinkx
.
    val exception = intercept[FlinkRuntimeException](implicitly[TypeInformation[Node]])

This may be a question to ask on the Scala Users forum (https://users.scala-lang.org/), where Scala Center folks could help us understand the Scala 3 Mirror API.

arnaud-daroussin avatar Sep 03 '25 15:09 arnaud-daroussin