
Cannot submit jobs to Flink

Open ZachFontenot opened this issue 7 months ago • 7 comments

Hey There,

I was just trying this out via the Giter8 template. I followed the instructions to remove the scala_2.12 JAR from the flink/lib directory and to set the class-loader pattern to exclude scala.

I've tried adding both the Scala 2.13 and the Scala 3 library JARs to the flink/lib folder, as well as passing them to the flink run command manually, both together and individually.

Either I get an error saying the Scala libs are not found at all: org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: Scala products are used but Scala API is not on the classpath.

Or I get an error about some missing class, such as scala/collection/immutable or scala/runtime/Nothing.

I have tried this with both Flink 2.0.0 and Flink 1.20.1

sbt run works and prints out the wordCount output
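
For context, the dependency setup assumed in the rest of this thread looks roughly like the following build.sbt fragment. This is a hedged sketch, not the exact template contents: the artifact name is taken from the JAR linked later in the thread, while the Flink module list and all version numbers are illustrative assumptions.

// Sketch only; versions and module choices are assumptions, adjust to your setup.
scalaVersion := "3.3.5"

libraryDependencies ++= Seq(
  // the Scala 3 API wrapper this issue is about
  "org.flinkextended" %% "flink-scala-api-2" % "1.2.8",
  // Flink itself, so that `sbt run` can start a local mini-cluster
  "org.apache.flink" % "flink-clients" % "2.0.0",
  "org.apache.flink" % "flink-streaming-java" % "2.0.0"
)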

ZachFontenot avatar May 16 '25 15:05 ZachFontenot

Alright, the next thing I'm going to try is using the versions EXACTLY as they were in that other issue about a similar problem, to see if I can at least make some progress towards getting this to work. I'm not great with classpath stuff in general, so I don't fully understand what's happening or what needs to happen.

ZachFontenot avatar May 16 '25 15:05 ZachFontenot

Same problem.

ZachFontenot avatar May 16 '25 16:05 ZachFontenot

Have you tried removing the scala package from this Flink configuration property, so that it looks like this?

classloader.parent-first-patterns.default: java.;org.apache.flink.;com.esotericsoftware.kryo;org.apache.hadoop.;javax.annotation.;org.xml;javax.xml;org.apache.xerces;org.w3c;org.rocksdb.;org.slf4j;org.apache.log4j;org.apache.logging;org.apache.commons.logging;ch.qos.logback
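
For reference, the stock default of this property includes a scala. entry, and that token is the whole edit; the line quoted above is already the edited value. A hedged before/after sketch of the change in the Flink configuration file (the exact default and the file name vary by Flink version):

# flink-conf.yaml (or config.yaml on newer Flink) -- sketch, not verbatim defaults
# Before: "scala." is parent-first, so Scala classes may only come from flink/lib
#   classloader.parent-first-patterns.default: java.;scala.;org.apache.flink.;...
# After: drop "scala." so Scala classes resolve child-first from the user classpath
#   classloader.parent-first-patterns.default: java.;org.apache.flink.;...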

novakov-alexey avatar May 16 '25 17:05 novakov-alexey

Yes, that didn't work either. I can run via the Java API with Scala, FWIW, so I may just lean into that. I'm confused why the uber JAR with the library wouldn't work, and likewise why none of the other ways of passing the Scala libs to the classpath do.

ZachFontenot avatar May 16 '25 18:05 ZachFontenot

Could you describe where you deploy the Flink fat JAR? Is it an on-prem standalone Flink cluster?

novakov-alexey avatar May 17 '25 16:05 novakov-alexey

Just the local cluster

ZachFontenot avatar May 17 '25 20:05 ZachFontenot

Let me try with the latest version. I guess by local cluster you mean Flink standalone mode, as described here: https://nightlies.apache.org/flink/flink-docs-master/docs/deployment/resource-providers/standalone/overview/ and not something else.

I guess we need to prepare a docker-compose example so that people don't get stuck at the very beginning.

novakov-alexey avatar May 18 '25 07:05 novakov-alexey

Just checked in standalone session mode. It works fine on Flink 2.0 and should work on Flink 1.x as well. Here are the steps to test (a shell sketch of the same steps follows the list):

  1. Put this library into the cluster lib folder, i.e. download this JAR: https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api-2_3/1.2.8/flink-scala-api-2_3-1.2.8.jar and put it inside the flink/lib folder of the JM and TM installations.
  2. Remove scala*.jar from the flink/lib folder.
  3. Start (or restart) the Flink standalone cluster to apply the lib folder changes made in steps 1 & 2.
  4. Submit the Flink job via the flink run command on the started cluster.
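
On a single machine, these steps translate into roughly the following shell commands. This is a sketch under assumptions: the distribution directory, the file glob, and the jar path are placeholders, so adjust them to your installation.

# Assumed layout: a Flink distribution unpacked in ./flink-2.0.0
cd flink-2.0.0

# Step 2 first, so the wildcard cannot touch the JAR added afterwards:
# remove the Scala-related JAR(s) shipped in lib, if any are present
rm -f lib/*scala*.jar

# Step 1: download the library JAR into the cluster lib folder
wget -P lib https://repo1.maven.org/maven2/org/flinkextended/flink-scala-api-2_3/1.2.8/flink-scala-api-2_3-1.2.8.jar

# Step 3: restart the standalone cluster so the lib changes take effect
./bin/stop-cluster.sh
./bin/start-cluster.sh

# Step 4: submit the job from the assembled fat JAR
./bin/flink run -c org.example.wordCountExample /path/to/your-assembly.jar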

I am using a single machine with the Flink installation to start the new cluster and run the "flink run" command. The code I used:

package org.example

import org.apache.flinkx.api.*
import org.apache.flinkx.api.serializers.*

@main def wordCountExample =
  val env = StreamExecutionEnvironment.getExecutionEnvironment

  // A small bounded stream of text lines to count words over
  val text = env.fromElements(
    "To be, or not to be,--that is the question:--",
    "Whether 'tis nobler in the mind to suffer",
    "The slings and arrows of outrageous fortune",
    "Or to take arms against a sea of troubles,"
  )

  // Split into words, pair each with 1, and sum the counts per word
  text
    .flatMap(_.toLowerCase.split("\\W+"))
    .map((_, 1))
    .keyBy(_._1)
    .sum(1)
    .print()

  env.execute("wordCount")

The sbt assembly command gives me a fat JAR; in my case I am using my GitHub project called flink-sandbox:

flink-sandbox/modules/core/target/scala-3.3.5/core-assembly-0.1.0-SNAPSHOT.jar
./bin/flink run  -c org.example.wordCountExample ../flink-sandbox/modules/core/target/scala-3.3.5/core-assembly-0.1.0-SNAPSHOT.jar 
Job has been submitted with JobID ec45d591de66283cce30d08dec5e265c
Program execution finished
Job with JobID ec45d591de66283cce30d08dec5e265c has finished.
Job Runtime: 487 ms

P.S. Session mode uses somewhat unusual class loading: even if you pack this scala-api into your fat JAR, it is not going to be detected, which is why we have to add it explicitly to the cluster lib folder. Other user-side libraries are fine to keep in your fat JAR.
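
A consequence of that class-loading behavior for the build: once flink-scala-api lives in the cluster's lib folder, it can be marked Provided so the fat JAR does not carry a second copy. A hedged build.sbt sketch, assuming sbt-assembly; the scopes and versions are assumptions, not something the library requires:

// Sketch only: flink-scala-api is served from flink/lib and Flink itself from the cluster,
// so neither needs to be packaged into the fat JAR by sbt-assembly.
libraryDependencies ++= Seq(
  "org.flinkextended" %% "flink-scala-api-2" % "1.2.8" % Provided,
  "org.apache.flink"  %  "flink-clients"     % "2.0.0" % Provided
  // application-level libraries keep the default Compile scope and stay in the fat JAR
)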

novakov-alexey avatar Jul 13 '25 17:07 novakov-alexey