[ZEPPELIN-6284] Added Flink 1.20 interpreter.
What is this PR for?
Add Flink 1.20 support to the Apache Flink interpreter. As the latest 1.x release and the LTS one, it is important to support it; for example, it lets us use the latest Iceberg versions.
What type of PR is it?
Feature
What is the Jira issue?
https://issues.apache.org/jira/browse/ZEPPELIN-6284
How should this be tested?
- Run the Flink interpreter tests with the new profile: `mvn -pl flink/flink-scala-2.12 -Pflink-120 test`
- Manually verify the Scala and SQL interfaces with notebooks against a Flink 1.20 cluster.
Questions:
- Do the license files need to be updated? No
- Are there breaking changes for older versions? No
- Does this need documentation? No
Based on previous work for Flink 1.18 from these pull requests:
https://github.com/apache/zeppelin/pull/4864
https://github.com/apache/zeppelin/pull/4739
@Neuw84 Thank you for your contribution. By the way, the Flink tests are failing now. Could you please check and fix them?
@jongyoul I have addressed the Copilot comments and added the corresponding test file.
If all is good, let me know whether I need to clean up the commit log.
Hmm, looking at the logs I see this weird error. It seems it is not able to launch the cluster, but I am a little bit lost here.
```
WARN [2025-08-21 11:04:10,021] ({flink-metrics-pekko.actor.supervisor-dispatcher-5} SupervisorActor.java[rpcActorFailed]:144) - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
java.lang.NullPointerException
	at org.slf4j.impl.Reload4jMDCAdapter.setContextMap(Reload4jMDCAdapter.java:81)
	at org.slf4j.MDC.setContextMap(MDC.java:264)
	at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
WARN [2025-08-21 11:04:10,031] ({main} HadoopUtils.java[getHadoopConfiguration]:139) - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).
```
Any help is appreciated, as the exception does not provide much info...
Indeed, the code that is present here is working, at least for the Scala and SQL interfaces (checked with some notebooks), with Java 11.
It seems the error comes from an NPE thrown by the SLF4J/Reload4j MDC adapter.
Flink 1.20.2 seems to address the issue.
https://flink.apache.org/2025/07/10/apache-flink-1.20.2-release-announcement/
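If the Flink version is driven by a Maven property in the `flink-120` profile, the bump is presumably a one-line pom change along these lines (the property name `flink.version` is assumed here from common Flink build conventions and may differ in Zeppelin's actual pom):

```xml
<!-- Hypothetical fragment of the flink-120 profile; property name assumed -->
<profile>
  <id>flink-120</id>
  <properties>
    <flink.version>1.20.2</flink.version>
  </properties>
</profile>
```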
Log4j issue solved with the upgrade to 1.20.2; now I am getting a fancy Scala error with some version mismatch:

`error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe`
To reproduce:

```
mvn -pl flink/flink-scala-2.12 -Pflink-120 clean install -DskipTests -am
mvn -pl flink/flink-scala-2.12 -Pflink-120 test
```
Tried to fix it but without luck yet.
Using a docker-compose setup to connect Zeppelin to a Flink 1.20.2 cluster, this code works as expected (Scala and SQL interfaces).
@zjffdu Could you please help fix the error, if you can spare the time to review it?
After some research I wasn't able to identify the offending library. I spent some time trying to pin the Scala versions via Maven, without luck.
The code works well, as said in my previous comment; it is probably a matter of the scala-reflect/scala-compiler versions used in the tests, but I have no clue.
After digging and digging through the Scala libraries, I think the error is not solvable this way... I don't see any Scala library collisions, and I have tried forcing many versions.
It seems Flink changed how the environments need to be created, and that should be updated in ScalaInterpreter.
We should now have one codepath for 1.17 and below and another for 1.20.
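The split suggested above could be gated on the detected Flink version. A minimal sketch of the dispatch logic, assuming the interpreter can obtain the major/minor version numbers (the class and method names here are illustrative, not Zeppelin's actual API):

```java
// Hypothetical version-gated codepath selection for ScalaInterpreter.
// Only the dispatch logic is shown; the actual environment-creation
// calls for each branch are omitted.
public class EnvironmentCodepath {

  /** Picks the environment-creation path based on the Flink version. */
  public static String pick(int major, int minor) {
    if (major == 1 && minor <= 17) {
      return "legacy";     // pre-1.18 environment creation
    }
    return "flink-120";    // 1.18+ path, covering 1.20
  }

  public static void main(String[] args) {
    System.out.println(pick(1, 17)); // legacy
    System.out.println(pick(1, 20)); // flink-120
  }
}
```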
> now I am getting a fancy Scala error with some version mismatch.
>
> `error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe`
This is why I abandoned the previous PR... recompiling Flink with a higher Scala version does not work either. The key point here is that FLINK-32560 added `@deprecated(since = "1.18.0")` to the trait `ImplicitExpressionConversions`; the issue goes away if we revert FLINK-32560.
FLINK-36327 replaces the `@deprecated(since = "1.18.0")` with Java's `@Deprecated`, so you may want to try Flink 2.0.
That would be great!
I don't know if you have noticed or not, but the error isn't actually fatal. If you run the cell a second time, it surprisingly works without any problems.

`error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe`
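Since the error reportedly goes away on the second run, one conceivable stopgap is to retry the interpretation exactly once before surfacing the failure. This is only a sketch of that idea; the `Supplier` callback stands in for whatever actually runs the cell, and nothing here is Zeppelin's real API:

```java
import java.util.function.Supplier;

// Sketch of a "retry once" workaround for the non-fatal Scala-signature
// error: if the first attempt fails, try again before giving up.
public class RetryOnce {

  public static <T> T retryOnce(Supplier<T> attempt) {
    try {
      return attempt.get();
    } catch (RuntimeException first) {
      // The thread reports the second run succeeds; retry exactly once.
      return attempt.get();
    }
  }
}
```

Whether this is acceptable depends on the failure actually being idempotent and transient, which the behavior described in this thread suggests but does not prove.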
> I don't know if you have noticed or not, but the error isn't actually fatal. If you run the cell a second time, it surprisingly works without any problems.
>
> `error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe`
I also noticed that, but have no idea why, due to my limited knowledge of Scala internals :(
I think there are two directions now:
- try Flink 2.0 to see if FLINK-36327 helps
- eliminate usage of Flink's Scala API, since it has been deprecated by Flink for a long time.
The problem is that Flink 2.0 removes all the Scala APIs, which would mean rewriting the Flink interpreter and seeing what the ideal options are there. The SQL interface is fine, but then... the Python and Java REPLs? It would need a huge re-architecture...
I will try to spend more time on this (Flink 1.20), but I can't promise anything; any help would be appreciated. As it is now, it works, but I would like this to be merged to have at least LTS support before migrating to Flink 2.0...
@Neuw84 If it’s okay with you, I’d like to help too.
Go for it; the code is working well, as I have been using it heavily for a re:Invent workshop I am building for next month. It is a matter of solving those Scala collisions in the tests.