flink-on-k8s-operator
Entrypoint for submitting a job
Question:
The operator uses flink run as the entrypoint, while Flink's official Docker image uses java org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint. What's the difference between them?
I'm asking because I have an app that has trouble running with flink run but works fine with StandaloneJobClusterEntryPoint. Can I override the entrypoint?
flink run was chosen because it is easy to integrate with the operator. I don't see that java org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint provides any additional benefits. I'd like to know why flink run doesn't work for your use case.
BTW, the operator doesn't support overriding the entrypoint, but we can consider adding the feature if it is really needed.
Thanks for the reply. The app simply consumes a Kafka topic whose values are serialized in Avro format. The error is an NPE:
org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null
at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.NullPointerException
at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
at org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo.<init>(GenericRecordAvroTypeInfo.java:45)
at org.apache.flink.formats.avro.AvroDeserializationSchema.getProducedType(AvroDeserializationSchema.java:169)
at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.getProducedType(KafkaDeserializationSchemaWrapper.java:55)
at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.getProducedType(FlinkKafkaConsumerBase.java:1042)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1570)
at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1547)
at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:638)
I have added all dependencies to the classpath, so I'm not sure why flink run produces this error.
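A note on reading the trace: the NullPointerException is raised by Preconditions.checkNotNull inside the GenericRecordAvroTypeInfo constructor, which suggests that the schema passed along by AvroDeserializationSchema.getProducedType was null at the moment addSource ran. The "null" in "The main method caused an error: null" appears to be just the NPE's missing message. The trace does not show why the schema is null. As a minimal sketch for narrowing that down, the job could check the schema itself before building the Kafka source, so the failure carries a descriptive message. The class SchemaGuard, the method requireSchemaText and the resource name user.avsc are hypothetical names used only for illustration; they are not part of Flink or the operator.

```java
import java.util.Objects;

public class SchemaGuard {

    /**
     * Hypothetical helper (not a Flink API): returns the schema text unchanged,
     * or fails fast with a message naming where the schema was expected from.
     */
    public static String requireSchemaText(String schemaText, String origin) {
        return Objects.requireNonNull(
                schemaText,
                "Avro schema is null (expected from " + origin + ")");
    }

    public static void main(String[] args) {
        // Stand-in for however the job obtains its reader schema. The null
        // models the value that reached checkNotNull in the trace above.
        String schemaText = null;
        try {
            // "user.avsc" is an assumed resource name, for illustration only.
            requireSchemaText(schemaText, "classpath resource user.avsc");
        } catch (NullPointerException e) {
            System.out.println("Failed early: " + e.getMessage());
        }
    }
}
```

Calling such a guard at the top of the job's main method, under both flink run and StandaloneJobClusterEntryPoint, would show whether the schema is already missing before it is handed to the Avro deserialization schema.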
Did you try creating an uber jar with dependencies for your job?
Yes, I'm using an uber jar, and the Docker image is flink:1.10.1-scala_2.11.
This has nothing to do with flink run. It seems something is wrong with your jar.
Hi, is there any solution for this problem? I'm currently facing it while migrating to a production system with Flink on YARN. Previously the jobs ran well on a development server in standalone mode.