Handle recursive types
Currently, our generic CaseClassTypeInfo and CaseClassSerializer don't handle recursive case classes. This issue is created from PR discussions in https://github.com/flink-extended/flink-scala-api/pull/280.
I was able to hack something in LowPrioImplicits thanks to the cache of TI: the idea is to store an incomplete temporary TI without field serializers initialized to break the recursivity, and do the field serializers initialization only once, after: it finds temporary TI in the cache so it doesn't try to initialize it another time, effectively breaking the recursivity.
If we want to handle recursivity in CaseClassSerializer we have to do something similar with a global cache for all recursively processed fields (at least getLength, isImmutableType, isImmutableSerializer, createInstance, equals, hashCode and maybe the most tricky one snapshotConfiguration) and find a coherent short-circuit for each (what is the length of a recursive serializer? In theory, its length is infinite, in practice it can be -1).
Similar problems occur also on CaseClassTypeInfo and its parent TupleTypeInfoBase: getFlatFields, getTypeAt, getTotalFields, equals, hashCode and toString.
Maybe memoization or dynamic programming can help to solve these problems: https://stackoverflow.com/a/6185005/6176274
With Scala 3, there is a compilation error when trying to derive a recursive type:
flink-scala-api/modules/flink-common-api/src/test/scala/org/apache/flinkx/api/SerializerTest.scala:91:87
No given instance of type org.apache.flink.api.common.typeinfo.TypeInformation[
Option[org.apache.flinkx.api².SerializerTest.Node]] was found.
I found:
org.apache.flinkx.api².serializers.deriveTypeInformation[
Option[org.apache.flinkx.api².SerializerTest.Node]](
{
final class $anon() extends Object(), Serializable {
type MirroredMonoType = Option[org.apache.flinkx.api².SerializerTest.Node]
}
(new $anon():Object & Serializable)
}.$asInstanceOf[
scala.deriving.Mirror.Sum{
type MirroredMonoType² =
Option[org.apache.flinkx.api².SerializerTest.Node];
type MirroredType = Option[org.apache.flinkx.api².SerializerTest.Node]
;
type MirroredLabel = ("Option" : String);
type MirroredElemTypes = (None.type,
Some[org.apache.flinkx.api².SerializerTest.Node]);
type MirroredElemLabels = (("None$" : String), ("Some" : String))
}
],
scala.reflect.ClassTag.apply[
Option[org.apache.flinkx.api².SerializerTest.Node]](classOf[Option]),
{
final class $anon²() extends Object(), org.apache.flinkx.api².TypeTag[
Option[org.apache.flinkx.api².SerializerTest.Node]] {
override lazy val isModule: Boolean = false
override lazy val isCachable: Boolean = true
override lazy val toString: String =
"scala.Option[org.apache.flinkx.api.SerializerTest.Node]"
}
new $anon²():
org.apache.flinkx.api².TypeTag[
Option[org.apache.flinkx.api².SerializerTest.Node]]
}:
org.apache.flinkx.api².TypeTag[
Option[org.apache.flinkx.api².SerializerTest.Node]]
)
But method deriveTypeInformation in trait LowPrioImplicits does not match type org.apache.flink.api.common.typeinfo.TypeInformation[
Option[org.apache.flinkx.api².SerializerTest.Node]]
where: $anon is a anonymous class in the initializer of value typeclass
$anon² is a anonymous class in the initializer of value typeclass
MirroredMonoType is a type in the empty package which is an alias of Option[org.apache.flinkx.api².SerializerTest.Node]
MirroredMonoType² is a type in trait Mirror
api is a package in package org.apache.flink
api² is a package in package org.apache.flinkx
.
val exception = intercept[FlinkRuntimeException](implicitly[TypeInformation[Node]])
It may be a question to ask at Scala Users Forum https://users.scala-lang.org/, where Scala Center folks could help to understand Scala 3 Mirror API.