Cannot submit jobs to Flink
Hey There,
I was just trying this out via the Giter8 template. I followed the instructions to remove the scala_2.12 jar from the flink/lib directory and set the class-loader pattern to exclude scala.
I've tried adding the Scala 2.13 and Scala 3 library jars to the flink/lib folder, as well as passing them to the flink run command manually, both together and individually.
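Concretely, this is roughly what I tried; the jar names and versions below are illustrative rather than my exact files (`-C`/`--classpath` is the flink run option for adding extra classpath URLs):

```bash
# Attempt 1: replace the bundled Scala 2.12 jar in flink/lib with newer Scala libs
# (jar names/versions here are illustrative)
rm flink/lib/flink-scala_2.12-*.jar
cp scala-library-2.13.16.jar scala3-library_3-3.3.5.jar flink/lib/

# Attempt 2: pass the Scala libs at submission time instead
./bin/flink run \
  -C file:///tmp/scala-library-2.13.16.jar \
  -C file:///tmp/scala3-library_3-3.3.5.jar \
  -c org.example.wordCountExample my-assembly.jar
```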
Either I get an error about not finding the Scala libs at all:

```
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Scala products are used but Scala API is not on the classpath.
```

or an error about some missing class path like scala/collection/immutable or scala/runtime/Nothing.
I have tried this with both Flink 2.0.0 and Flink 1.20.1.
`sbt run` works and prints out the wordCount output.
Alright, the next thing I'm going to try is using the versions EXACTLY as they were in that other issue about a similar problem, and see if I can at least make some progress towards getting this to work. I don't understand classpath stuff well enough in general to follow what's happening and what needs to happen.
Same problem.
Have you tried removing the scala package from this Flink configuration property?
```
classloader.parent-first-patterns.default: java.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.rocksdb.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback
```
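For reference, the stock default in Flink 1.x includes a `scala.` entry near the front of this list; the value above is that default with `scala.` removed, so the user-code (child-first) class loader can load the Scala library from your JAR. A sketch of the edit in `conf/flink-conf.yaml`:

```yaml
# conf/flink-conf.yaml
# Stock Flink 1.x default begins with: java.;scala.;org.apache.flink.;...
# Edited value, with the scala. entry removed:
classloader.parent-first-patterns.default: java.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.rocksdb.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback
```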
Yes, that didn't work either. I can run via the Java API with Scala FWIW, so I may just lean into that. I'm confused why the uber JAR with the library wouldn't work, likewise all the other ways of passing the Scala libs to the classpath.
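For reference, the Java-API-from-Scala variant that works for me looks roughly like this (a sketch of the approach, not my exact code; it uses only flink-streaming-java types, so no Scala-specific serializers are involved):

```scala
package org.example

import org.apache.flink.api.common.functions.FlatMapFunction
import org.apache.flink.api.java.functions.KeySelector
import org.apache.flink.api.java.tuple.Tuple2
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
import org.apache.flink.util.Collector

@main def javaApiWordCount(): Unit =
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  env
    .fromElements("to be or not to be")
    // Anonymous classes instead of lambdas, so Flink's TypeExtractor can read
    // the generic signatures without any Scala type information.
    .flatMap(new FlatMapFunction[String, Tuple2[String, Integer]] {
      override def flatMap(line: String, out: Collector[Tuple2[String, Integer]]): Unit =
        line.toLowerCase.split("\\W+").foreach(w => out.collect(Tuple2.of(w, Int.box(1))))
    })
    .keyBy(new KeySelector[Tuple2[String, Integer], String] {
      override def getKey(t: Tuple2[String, Integer]): String = t.f0
    })
    .sum(1)
    .print()
  env.execute("javaApiWordCount")
```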
Could you describe where you deploy the Flink fat JAR? Is it an on-prem Flink standalone cluster?
Just the local cluster
Let me try with the latest version. I guess by local cluster you mean Flink standalone mode like this https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/ and not something else.
I guess we need to prepare a docker-compose example to prevent people from getting stuck at the very beginning.
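Something like this could work as a starting point (untested sketch; the image tag and the local JAR path are placeholders):

```yaml
# docker-compose.yaml sketch: session cluster with flink-scala-api in lib.
# Flink 2.0 images should not ship Scala jars anymore, so mounting the
# library is the only lib-folder change needed.
services:
  jobmanager:
    image: flink:2.0.0
    command: jobmanager
    ports:
      - "8081:8081"
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
    volumes:
      - ./flink-scala-api-2_3-1.2.8.jar:/opt/flink/lib/flink-scala-api-2_3-1.2.8.jar
  taskmanager:
    image: flink:2.0.0
    command: taskmanager
    depends_on:
      - jobmanager
    environment:
      - |
        FLINK_PROPERTIES=
        jobmanager.rpc.address: jobmanager
        taskmanager.numberOfTaskSlots: 2
    volumes:
      - ./flink-scala-api-2_3-1.2.8.jar:/opt/flink/lib/flink-scala-api-2_3-1.2.8.jar
```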
Just checked on a standalone cluster in session mode: it works fine on Flink 2.0 and should work on Flink 1.x as well. Here are the steps to test (a shell sketch follows the list):

1. Put this library into the cluster lib folder, i.e. download this JAR: https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api-2_3/1.2.8/flink-scala-api-2_3-1.2.8.jar and put it inside the `flink/lib` folder of the JM and TM installations.
2. Remove `scala*.jar` from the `flink/lib` installation.
3. Start (or restart) the Flink standalone cluster to apply the changes to the lib folder made in steps 1 & 2.
4. Submit the Flink job via the `flink run` command on the started cluster.
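A rough shell version of the steps above (assuming a single-machine install in ./flink; adjust the paths to yours):

```bash
cd flink
# Step 1: download flink-scala-api into the cluster lib folder
wget -P lib https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api-2_3/1.2.8/flink-scala-api-2_3-1.2.8.jar
# Step 2: remove the Scala jars that ship with the distribution
rm -f lib/scala*.jar
# Step 3: restart the standalone cluster to pick up the lib changes
./bin/stop-cluster.sh && ./bin/start-cluster.sh
# Step 4: submit the job
./bin/flink run -c org.example.wordCountExample /path/to/your-assembly.jar
```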
I am using a single machine with the Flink installation to start a new cluster and run the `flink run` command. The code I used:
```scala
package org.example

import org.apache.flinkx.api.*
import org.apache.flinkx.api.serializers.*

@main def wordCountExample =
  val env = StreamExecutionEnvironment.getExecutionEnvironment
  val text = env.fromElements(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,"
  )

  text
    .flatMap(_.toLowerCase.split("\\W+"))
    .map((_, 1))
    .keyBy(_._1)
    .sum(1)
    .print()

  env.execute("wordCount")
```
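The corresponding build dependency would be something like this (a sketch; the coordinates match the JAR linked above, and the scope is a judgment call):

```scala
// build.sbt sketch — coordinates match the flink-scala-api-2_3-1.2.8.jar linked above.
// Provided keeps it out of the fat JAR, since session mode needs it in the
// cluster lib folder anyway (see the P.S. below); use Compile if you prefer to bundle it.
libraryDependencies += "org.flinkextended" %% "flink-scala-api-2" % "1.2.8" % Provided
```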
The `sbt assembly` command gives me a fat JAR; in my case I am using my GitHub project called flink-sandbox:

```
flink-sandbox/modules/core/target/scala-3.3.5/core-assembly-0.1.0-SNAPSHOT.jar
```
```bash
$ ./bin/flink run -c org.example.wordCountExample ../flink-sandbox/modules/core/target/scala-3.3.5/core-assembly-0.1.0-SNAPSHOT.jar
Job has been submitted with JobID ec45d591de66283cce30d08dec5e265c
Program execution finished
Job with JobID ec45d591de66283cce30d08dec5e265c has finished.
Job Runtime: 487 ms
```
P.S. Session mode uses a bit weird class loading: even if you pack this scala-api into your fat JAR, it is not going to be detected, which is why we have to add it explicitly to the cluster lib folder. Other user-side libraries are OK to keep in your fat JAR.