[ZEPPELIN-6284] Added Flink 1.20 interpreter.

Open Neuw84 opened this issue 4 months ago • 18 comments

What is this PR for?

A few sentences describing the overall goals of the pull request's commits. First time? Check out the contributing guide - https://zeppelin.apache.org/contribution/contributions.html

Add Flink 1.20 support to the Apache Flink interpreter. Flink 1.20 is the latest 1.x release and the LTS one, so it is important to support it. For example, it lets us use the latest Iceberg versions.

What type of PR is it?

Feature

Todos

What is the Jira issue?

  • Open an issue on Jira https://issues.apache.org/jira/browse/ZEPPELIN/
  • Put link here, and add [ZEPPELIN-Jira number] in PR title, eg. [ZEPPELIN-533]

https://issues.apache.org/jira/browse/ZEPPELIN-6284

How should this be tested?

  • Strongly recommended: add automated unit tests for any new or changed behavior
  • Outline any manual steps to test the PR here.

Screenshots (if appropriate)

Questions:

  • Do the license files need to be updated? No
  • Are there breaking changes for older versions? No
  • Does this need documentation? No

Neuw84 avatar Aug 20 '25 10:08 Neuw84

Based on the previous Flink 1.18 work in these pull requests:

https://github.com/apache/zeppelin/pull/4864
https://github.com/apache/zeppelin/pull/4739

Neuw84 avatar Aug 20 '25 10:08 Neuw84

@Neuw84 Thank you for your contribution. By the way, the Flink tests are failing now. Could you please check and fix them?

jongyoul avatar Aug 21 '25 02:08 jongyoul

@jongyoul I have fixed Copilot comments and added the corresponding test file.

If everything looks good, let me know whether I need to clean up the commit log.

Neuw84 avatar Aug 21 '25 10:08 Neuw84

Hmm, looking at the logs I see this weird error. It seems the cluster cannot be launched, but I am a little bit lost here.

 WARN [2025-08-21 11:04:10,021] ({flink-metrics-pekko.actor.supervisor-dispatcher-5} SupervisorActor.java[rpcActorFailed]:144) - RpcActor pekko://flink-metrics/user/rpc/MetricQueryService has failed. Shutting it down now.
java.lang.NullPointerException
	at org.slf4j.impl.Reload4jMDCAdapter.setContextMap(Reload4jMDCAdapter.java:81)
	at org.slf4j.MDC.setContextMap(MDC.java:264)
	at org.apache.flink.util.MdcUtils.lambda$withContext$0(MdcUtils.java:48)
	at org.apache.flink.runtime.rpc.pekko.PekkoRpcActor.handleControlMessage(PekkoRpcActor.java:208)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:33)
	at org.apache.pekko.japi.pf.UnitCaseStatement.apply(CaseStatements.scala:29)
	at scala.PartialFunction.applyOrElse(PartialFunction.scala:127)
	at scala.PartialFunction.applyOrElse$(PartialFunction.scala:126)
	at org.apache.pekko.japi.pf.UnitCaseStatement.applyOrElse(CaseStatements.scala:29)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:175)
	at scala.PartialFunction$OrElse.applyOrElse(PartialFunction.scala:176)
	at org.apache.pekko.actor.Actor.aroundReceive(Actor.scala:547)
	at org.apache.pekko.actor.Actor.aroundReceive$(Actor.scala:545)
	at org.apache.pekko.actor.AbstractActor.aroundReceive(AbstractActor.scala:229)
	at org.apache.pekko.actor.ActorCell.receiveMessage(ActorCell.scala:590)
	at org.apache.pekko.actor.ActorCell.invoke(ActorCell.scala:557)
	at org.apache.pekko.dispatch.Mailbox.processMailbox(Mailbox.scala:280)
	at org.apache.pekko.dispatch.Mailbox.run(Mailbox.scala:241)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)
 WARN [2025-08-21 11:04:10,031] ({main} HadoopUtils.java[getHadoopConfiguration]:139) - Could not find Hadoop configuration via any of the supported methods (Flink configuration, environment variables).

Any help is appreciated, as the exception does not provide much information.

The code in this PR does work, at least for the Scala and SQL interfaces (checked with some notebooks), on Java 11.

Neuw84 avatar Aug 21 '25 13:08 Neuw84

It seems the error is a NullPointerException (NPE) coming from the Log4j logging layer.

Flink 1.20.2 seems to address the issue.

https://flink.apache.org/2025/07/10/apache-flink-1.20.2-release-announcement/?utm_source=chatgpt.com
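
The stack trace above ends in `Reload4jMDCAdapter.setContextMap`, reached from Flink's `MdcUtils`. One plausible reading, which is an assumption and is not confirmed anywhere in this thread, is that a null context map was handed to an adapter that does not guard against null. The sketch below only illustrates that general failure mode; every class and method in it is a hypothetical stand-in, not slf4j or Flink code.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for a logging context store; not slf4j or Flink code.
public class MdcNullGuardSketch {
    static final Map<String, String> CONTEXT = new HashMap<>();

    // Unguarded variant: HashMap.putAll(null) throws NullPointerException.
    static void setContextMapUnguarded(Map<String, String> newContext) {
        CONTEXT.clear();
        CONTEXT.putAll(newContext);
    }

    // Guarded variant: a null map is treated as "no context".
    static void setContextMapGuarded(Map<String, String> newContext) {
        CONTEXT.clear();
        if (newContext != null) {
            CONTEXT.putAll(newContext);
        }
    }

    // Assumption for illustration: returns null when nothing has been set.
    static Map<String, String> copyOfContextOrNull() {
        return CONTEXT.isEmpty() ? null : new HashMap<>(CONTEXT);
    }

    public static void main(String[] args) {
        // Save the (empty, hence null) previous context, then install a new one.
        Map<String, String> previous = copyOfContextOrNull();
        setContextMapGuarded(Map.of("job", "demo"));

        // Restoring the null snapshot fails without the guard...
        try {
            setContextMapUnguarded(previous);
        } catch (NullPointerException e) {
            System.out.println("unguarded restore failed with NPE");
        }

        // ...and succeeds with it.
        setContextMapGuarded(previous);
        System.out.println("guarded restore ok, context size = " + CONTEXT.size());
    }
}
```

If this reading is right, upgrading to a release that handles the null case would make the warning disappear, which matches what was observed with 1.20.2.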

Neuw84 avatar Aug 21 '25 13:08 Neuw84

The Log4j issue is solved by the upgrade to 1.20.2. Now I am getting a fancy Scala error that looks like some version mismatch:

error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe

To reproduce it:

mvn -pl flink/flink-scala-2.12 -Pflink-120 clean install -DskipTests -am
mvn -pl flink/flink-scala-2.12 -Pflink-120 test

I have tried to fix it, but without luck so far.

Using a docker-compose setup to connect Zeppelin to a Flink 1.20.2 cluster, this code works as expected (Scala and SQL interfaces).

Neuw84 avatar Aug 21 '25 14:08 Neuw84

@zjffdu If you have time to review this, could you please help fix the error?

jongyoul avatar Aug 21 '25 15:08 jongyoul

After some research I wasn't able to identify the offending library. I spent some time trying to fix the Scala versions via Maven, without luck.

As said in my previous comment, the code itself works well. The problem is probably related to the scala-reflect/scala-compiler versions used in the tests, but I have no clue beyond that.

Neuw84 avatar Aug 21 '25 18:08 Neuw84

After a lot of digging into the Scala libraries, I think the error is not solvable this way. I don't see any Scala library collisions, and I tried forcing many versions.

It seems they changed how the environments need to be created, and that should be updated in ScalaInterpreter.

We should now have one code path for 1.17 and below and another for 1.20.
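
A minimal sketch of what such a version split could look like, assuming the interpreter knows the Flink version as a string. The class, enum, and method names are hypothetical and are not taken from the Zeppelin code base; versions between 1.17 and 1.20 are rejected here only because the comment above does not say how they should be handled.

```java
// Hypothetical sketch: choose a code path from the Flink version string.
public class FlinkCodePathSketch {

    enum CodePath { LEGACY_UP_TO_1_17, FLINK_1_20 }

    static CodePath select(String flinkVersion) {
        String[] parts = flinkVersion.split("\\.");
        if (parts.length < 2) {
            throw new IllegalArgumentException("Cannot parse Flink version: " + flinkVersion);
        }
        // Integer.parseInt throws NumberFormatException (an IllegalArgumentException)
        // for non-numeric parts such as "20-SNAPSHOT".
        int major = Integer.parseInt(parts[0]);
        int minor = Integer.parseInt(parts[1]);

        if (major == 1 && minor <= 17) {
            return CodePath.LEGACY_UP_TO_1_17;
        }
        if (major == 1 && minor == 20) {
            return CodePath.FLINK_1_20;
        }
        throw new IllegalArgumentException("Unsupported Flink version: " + flinkVersion);
    }

    public static void main(String[] args) {
        System.out.println(select("1.17.2")); // LEGACY_UP_TO_1_17
        System.out.println(select("1.20.2")); // FLINK_1_20
    }
}
```

Keeping the decision in one place means the environment-creation logic for each branch can live in its own class, and an unexpected version fails fast instead of silently taking the wrong path.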

Neuw84 avatar Aug 22 '25 12:08 Neuw84

now I am getting a fancy scala error with some version mismatch.

error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe

This is why I abandoned the previous PR. Recompiling Flink with a higher Scala version does not work either. The key point is that FLINK-32560 added a @deprecated(since = "1.18.0") annotation to the trait ImplicitExpressionConversions; the issue goes away if we revert FLINK-32560.

pan3793 avatar Aug 25 '25 10:08 pan3793

FLINK-36327 replaces the @deprecated(since = "1.18.0") with Java's @Deprecated, so you may want to try Flink 2.0.
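
For readers less familiar with the Java side: `java.lang.Deprecated` is an ordinary runtime-retained JVM annotation, and since Java 9 it carries a `since` element, so it can be read with plain Java reflection. The sketch below shows only that; the `LegacyConversions` interface is a hypothetical stand-in, and the example does not verify whether the change actually removes the Scala signature error.

```java
// Hypothetical sketch: reading Java's @Deprecated via plain reflection.
public class DeprecatedAnnotationSketch {

    // Stand-in type for illustration; not Flink's ImplicitExpressionConversions.
    @Deprecated(since = "1.18.0")
    interface LegacyConversions {}

    // Returns the "since" value, or null if the type is not marked @Deprecated.
    static String deprecatedSince(Class<?> type) {
        Deprecated annotation = type.getAnnotation(Deprecated.class);
        return annotation == null ? null : annotation.since();
    }

    public static void main(String[] args) {
        System.out.println(deprecatedSince(LegacyConversions.class)); // 1.18.0
        System.out.println(deprecatedSince(String.class));            // null
    }
}
```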

pan3793 avatar Aug 25 '25 10:08 pan3793

That would be great!

A-Maniovich avatar Aug 27 '25 08:08 A-Maniovich

I don't know if you have noticed, but the error isn't actually fatal. If you run the cell a second time, it surprisingly works without any problems.

error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe

A-Maniovich avatar Aug 27 '25 08:08 A-Maniovich

I don't know if you have noticed, but the error isn't actually fatal. If you run the cell a second time, it surprisingly works without any problems.

error reading Scala signature of org.apache.flink.table.api.ImplicitExpressionConversions: unsafe symbol x$2 (child of value <local api>) in runtime reflection universe

I also noticed that, but I have no idea why, due to my limited knowledge of Scala internals :(
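
The observation that a second run succeeds can be modelled as a retry-once wrapper. The sketch below is only an illustration of that behaviour with a simulated failure; it is not part of this PR, the names are hypothetical, and retrying would hide the underlying Scala signature problem without fixing it.

```java
import java.util.function.Supplier;

// Hypothetical sketch: run an action and retry exactly once if it fails.
public class RetryOnceSketch {

    static <T> T runWithOneRetry(Supplier<T> action) {
        try {
            return action.get();
        } catch (RuntimeException firstFailure) {
            // Second and last attempt; a failure here propagates to the caller.
            return action.get();
        }
    }

    public static void main(String[] args) {
        int[] attempts = {0};
        String result = runWithOneRetry(() -> {
            // Simulate the behaviour described above: fail on the first run only.
            if (attempts[0]++ == 0) {
                throw new IllegalStateException("error reading Scala signature (simulated)");
            }
            return "ok";
        });
        System.out.println(result + " after " + attempts[0] + " attempts");
    }
}
```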

pan3793 avatar Aug 27 '25 09:08 pan3793

I think there are two directions now:

  1. try Flink 2.0 to see if FLINK-36327 helps
  2. eliminate usage of Flink's Scala API, since Flink has deprecated it for a long time.

pan3793 avatar Aug 27 '25 09:08 pan3793

The problem is that Flink 2.0 removes all Scala APIs. That would mean rewriting the Flink interpreter and working out what the ideal options are. The SQL interface is fine, but then what: a Python + Java REPL? It would need a huge re-architecture.

I will try to spend more time on this (Flink 1.20), but I can't promise anything, and any help would be appreciated. As it is now, it works, and I would like it to be merged so that we at least have LTS support before migrating to Flink 2.0.

Neuw84 avatar Aug 27 '25 11:08 Neuw84

@Neuw84 If it’s okay with you, I’d like to help too.

hyunw9 avatar Nov 11 '25 23:11 hyunw9

Go for it. The code is working well; I have been using it heavily for a re:Invent workshop I am building for next month. It is a matter of solving those Scala collisions in the tests.

Neuw84 avatar Nov 12 '25 05:11 Neuw84