Savepoint incompatibility between 1.1.4 and 1.1.5

Open IgnasD opened this issue 1 year ago • 15 comments

After upgrading this library from 1.1.4 to 1.1.5, we noticed that savepoints taken with the older version are incompatible with jobs running the newer version.

After digging in, I've traced the cause to the PR https://github.com/flink-extended/flink-scala-api/pull/98. That PR changed the serialized format of case classes to include the arity in the blob. The deserializer now expects the arity in the first 4 bytes, but in the pre-1.1.5 format those 4 bytes hold field payload, so deserialization produces incorrect results.
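To make the mismatch concrete, here is a minimal sketch of the two layouts. It is not the library's actual serializer code: `Point`, `writeOld`, `writeNew` and `readNew` are hypothetical names, and the layouts are assumed from the description above (field payloads only versus a 4-byte arity followed by field payloads).

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object ArityLayoutDemo {
  // Hypothetical two-field record standing in for a user case class.
  final case class Point(x: Int, y: Int)

  // Assumed pre-1.1.5 layout: field payloads only.
  def writeOld(p: Point): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeInt(p.x)
    out.writeInt(p.y)
    out.flush()
    buffer.toByteArray
  }

  // Assumed 1.1.5 layout: 4-byte arity first, then the field payloads.
  def writeNew(p: Point): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    out.writeInt(2) // arity of Point
    out.writeInt(p.x)
    out.writeInt(p.y)
    out.flush()
    buffer.toByteArray
  }

  // Reader for the new layout: trusts the first 4 bytes as the arity
  // and reads that many Int fields.
  def readNew(data: Array[Byte]): (Int, Vector[Int]) = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val arity = in.readInt()
    val fields = Vector.fill(arity)(in.readInt())
    (arity, fields)
  }
}
```

In this sketch, feeding `writeOld(Point(1, 42))` to `readNew` does not fail: the value of `x` is taken as the arity, and the record comes back with a single field. Depending on the payload, a misread can therefore be silent rather than raise an exception.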

Are breaking changes to be expected within patch level releases of this library?

To be honest, I'm not really sure how to get out of this. Since we're dealing with streamed input, a naive try { newFormat } catch { oldFormat } won't do the trick, at least not at the level where the issue was introduced.

IgnasD avatar Jun 03 '24 13:06 IgnasD

@IgnasD thanks for submitting this issue.

Yes, this change was a mistake on my side, sorry. I simply forgot about the kind of issue you reported. This indeed should not have been a patch-level release, but a minor or major one.

Do you have a solution in mind for dealing with this issue now?

I have one idea so far: introduce a feature flag (no arity info) that would serialize/deserialize case classes as before. An alternative would be to migrate old savepoints externally, i.e. manually: create a Flink migration job (using the State Processor API) that reads savepoints with the older library version and then somehow migrates them to the newer version, perhaps via some intermediate custom code and custom data format.
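A rough sketch of how such a flag could switch the layout, under stated assumptions: `useArity` and `expectedArity` are hypothetical parameters invented for illustration, not the library's real configuration, and the records are reduced to plain Int fields.

```scala
import java.io.{ByteArrayInputStream, ByteArrayOutputStream, DataInputStream, DataOutputStream}

object ArityFlagSketch {
  // `useArity` is a hypothetical flag; `false` reproduces the assumed
  // pre-1.1.5 layout without the arity prefix.
  def write(fields: Seq[Int], useArity: Boolean): Array[Byte] = {
    val buffer = new ByteArrayOutputStream()
    val out = new DataOutputStream(buffer)
    if (useArity) out.writeInt(fields.length)
    fields.foreach(f => out.writeInt(f))
    out.flush()
    buffer.toByteArray
  }

  // `expectedArity` is what the case class definition implies; it is used
  // only when the blob carries no arity prefix.
  def read(data: Array[Byte], expectedArity: Int, useArity: Boolean): Vector[Int] = {
    val in = new DataInputStream(new ByteArrayInputStream(data))
    val arity = if (useArity) in.readInt() else expectedArity
    Vector.fill(arity)(in.readInt())
  }
}
```

With the flag off, a blob written as `write(Seq(1, 42), useArity = false)` is 8 bytes and reads back unchanged via `read(bytes, 2, useArity = false)`, which is the behaviour an old savepoint would need.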

novakov-alexey avatar Jun 03 '24 15:06 novakov-alexey

Feature flag sounds reasonable. And to avoid bloat, the flag could then be removed in the next minor/major version.

But generally, this just postpones the problem. The real long-term solution would probably be to use the State Processor API, though I personally haven't used that API, so I'm not sure about its capabilities. What I don't like about this approach is that it seems to require a lot of manual orchestration.

Though the feature flag could probably help here too. Without the feature flag, two State Processor jobs would be needed: one with library version 1.1.4 to produce some intermediate format, and another with 1.1.5 to convert that intermediate format into the desired state with arity info. With the feature flag, those could probably be merged into one.

IgnasD avatar Jun 04 '24 06:06 IgnasD

Yes, a pure migration solution would require two one-time jobs, which may lead to a lot of work.

I vote for the feature flag solution as well. We now need to implement it to switch the case class serializer logic.

novakov-alexey avatar Jun 04 '24 15:06 novakov-alexey

@IgnasD should I go ahead and implement it, or do you want to try it yourself?

novakov-alexey avatar Jun 08 '24 16:06 novakov-alexey

@novakov-alexey sure, I'm leaving this one for you, if you don't mind.

IgnasD avatar Jun 09 '24 08:06 IgnasD

Released.

novakov-alexey avatar Jun 17 '24 12:06 novakov-alexey

@novakov-alexey Thanks!

IgnasD avatar Jun 17 '24 12:06 IgnasD

Welcome. If it works, then we could close the issue.

novakov-alexey avatar Jun 17 '24 21:06 novakov-alexey

So the feature flag seems to have helped in some cases.

In other cases jobs are still failing after the upgrade, though with a different exception this time:

java.lang.NullPointerException
    at scala.collection.mutable.ArrayOps$ofRef$.length$extension(ArrayOps.scala:204)
    at scala.collection.mutable.ArrayOps$ofRef.length(ArrayOps.scala:204)
    at scala.collection.IndexedSeqOptimized.prefixLengthImpl(IndexedSeqOptimized.scala:41)
    at scala.collection.IndexedSeqOptimized.forall(IndexedSeqOptimized.scala:46)
    at scala.collection.IndexedSeqOptimized.forall$(IndexedSeqOptimized.scala:46)
    at scala.collection.mutable.ArrayOps$ofRef.forall(ArrayOps.scala:198)
    at org.apache.flinkx.api.serializer.CaseClassSerializer.isImmutableType(CaseClassSerializer.scala:43)
    at org.apache.flinkx.api.typeinfo.ProductTypeInformation.createSerializer(ProductTypeInformation.scala:19)
    at org.apache.flink.api.java.typeutils.MapTypeInfo.createSerializer(MapTypeInfo.java:113)
    at org.apache.flink.api.common.state.StateDescriptor.initializeSerializerUnlessSet(StateDescriptor.java:320)
    at org.apache.flink.runtime.state.DefaultOperatorStateBackend.getBroadcastState(DefaultOperatorStateBackend.java:155)
    at org.apache.flink.streaming.api.operators.co.CoBroadcastWithKeyedOperator.open(CoBroadcastWithKeyedOperator.java:104)
    at org.apache.flink.streaming.runtime.tasks.RegularOperatorChain.initializeStateAndOpenOperators(RegularOperatorChain.java:107)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreGates(StreamTask.java:753)
    at org.apache.flink.streaming.runtime.tasks.StreamTaskActionExecutor$1.call(StreamTaskActionExecutor.java:55)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restoreInternal(StreamTask.java:728)
    at org.apache.flink.streaming.runtime.tasks.StreamTask.restore(StreamTask.java:693)
    at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:953)
    at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:922)
    at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:746)
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:562)
    at java.base/java.lang.Thread.run(Unknown Source)

Haven't done any digging yet, not sure if it's related to the arity case or something else entirely.

Flink 1.18.1, Scala 2.12

IgnasD avatar Jun 20 '24 11:06 IgnasD

It is probably related to this change: https://github.com/flink-extended/flink-scala-api/pull/114. This is new code which was not there before.

I guess it happens when state is restored with an array of serializers in which one or all of the field serializers are null, so that a NullPointerException is thrown when .isImmutableType is called. We can fix it by filtering out null values.
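A minimal sketch of such a defensive check, not the actual fix in the library. `FieldSerializer` is a hypothetical stand-in for Flink's serializer type, and `isImmutableType` here is a free-standing helper. The top frames of the trace (`ArrayOps$ofRef$.length$extension`) suggest the array reference itself may be null, so the sketch guards both the array and its entries; treating a null array as "not immutable" is an assumption chosen to be conservative.

```scala
object NullSafeImmutableCheck {
  // Hypothetical stand-in for a field serializer; only the method needed here.
  trait FieldSerializer {
    def isImmutableType: Boolean
  }

  // Returns false for a null array (conservative assumption) and skips
  // null entries instead of dereferencing them.
  def isImmutableType(serializers: Array[FieldSerializer]): Boolean =
    Option(serializers).exists { arr =>
      arr.forall(s => s == null || s.isImmutableType)
    }
}
```

A guard like this only hides the symptom at this one call site: any other code path that dereferences a null serializer would still fail.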

novakov-alexey avatar Jun 20 '24 20:06 novakov-alexey

P.S. We need to set up a snapshot publishing process for such ad-hoc fixes and their tests.

novakov-alexey avatar Jun 20 '24 20:06 novakov-alexey

@IgnasD I have made a change in the master branch to fix the possible NullPointerException, but it is not yet published as a release. Feel free to try the master branch and see whether it works.

novakov-alexey avatar Jun 28 '24 06:06 novakov-alexey

@novakov-alexey I'm just curious: in what scenarios could any of the scalaFieldSerializers validly be null?

As far as I can tell, fieldSerializers and scalaFieldSerializers should be the same, and there are some method calls on fieldSerializers entries without null checks. If that is the case, then the proposed fix would just postpone the problem to a different method call.

Thing is, I cannot reproduce this issue locally yet, and debugging in the environment where I caught it is more or less impossible.

IgnasD avatar Jun 28 '24 09:06 IgnasD

@IgnasD that is a really good question. I have only one guess: Flink restores a savepoint created by a slightly different serializer, so that some variables are not initialized properly. The fix in the master branch is more local defensive code than a proper fix for the real root cause. So far I do not know why scalaFieldSerializers may end up with null values.

novakov-alexey avatar Jun 28 '24 15:06 novakov-alexey

I am going to close this issue if there are no more updates or other related issues.

novakov-alexey avatar Oct 04 '24 08:10 novakov-alexey