Entrypoint for submitting a job

Open Yufan-l opened this issue 4 years ago • 6 comments

Question: The operator uses flink run as the entrypoint, while Flink's official Docker image uses java org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint. What is the difference between them?

I'm asking because I have an app that has trouble running with flink run but works fine with StandaloneJobClusterEntryPoint. Can I override the entrypoint?

Yufan-l • Jun 16 '20 11:06

flink run was chosen because it is easy to integrate with the operator. I don't see that java org.apache.flink.container.entrypoint.StandaloneJobClusterEntryPoint provides any additional benefit. I'd like to know why flink run doesn't work for your use case.

BTW, the operator doesn't support overriding the entrypoint, but we can consider adding the feature if it is really needed.

functicons • Jun 17 '20 03:06

Thanks for the reply. The app simply consumes a Kafka topic whose values are serialized in Avro format. The error is an NPE:

org.apache.flink.client.program.ProgramInvocationException: The main method caused an error: null
	at org.apache.flink.client.program.PackagedProgram.callMainMethod(PackagedProgram.java:335)
	at org.apache.flink.client.program.PackagedProgram.invokeInteractiveModeForExecution(PackagedProgram.java:205)
	at org.apache.flink.client.ClientUtils.executeProgram(ClientUtils.java:138)
	at org.apache.flink.client.cli.CliFrontend.executeProgram(CliFrontend.java:662)
	at org.apache.flink.client.cli.CliFrontend.run(CliFrontend.java:210)
	at org.apache.flink.client.cli.CliFrontend.parseParameters(CliFrontend.java:893)
	at org.apache.flink.client.cli.CliFrontend.lambda$main$10(CliFrontend.java:966)
	at java.security.AccessController.doPrivileged(Native Method)
	at javax.security.auth.Subject.doAs(Subject.java:422)
	at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1836)
	at org.apache.flink.runtime.security.HadoopSecurityContext.runSecured(HadoopSecurityContext.java:41)
	at org.apache.flink.client.cli.CliFrontend.main(CliFrontend.java:966)
Caused by: java.lang.NullPointerException
	at org.apache.flink.util.Preconditions.checkNotNull(Preconditions.java:58)
	at org.apache.flink.formats.avro.typeutils.GenericRecordAvroTypeInfo.<init>(GenericRecordAvroTypeInfo.java:45)
	at org.apache.flink.formats.avro.AvroDeserializationSchema.getProducedType(AvroDeserializationSchema.java:169)
	at org.apache.flink.streaming.connectors.kafka.internals.KafkaDeserializationSchemaWrapper.getProducedType(KafkaDeserializationSchemaWrapper.java:55)
	at org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumerBase.getProducedType(FlinkKafkaConsumerBase.java:1042)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1570)
	at org.apache.flink.streaming.api.environment.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.java:1547)
	at org.apache.flink.streaming.api.scala.StreamExecutionEnvironment.addSource(StreamExecutionEnvironment.scala:638)

I have added all dependencies to the classpath, so I'm not sure why flink run produces this error.

Yufan-l • Jun 17 '20 11:06

Did you try creating an uber jar with dependencies for your job?

functicons • Jun 17 '20 15:06

Yes, I'm using an uber jar, and the Docker image is flink:1.10.1-scala_2.11.

Yufan-l • Jun 18 '20 09:06

This has nothing to do with flink run. It seems something is wrong with your jar.

Mrart • Jul 20 '20 07:07

Hi, is there any solution for this problem? I'm currently facing it while migrating to a production system running Flink on YARN. Previously the jobs ran well on a development server in standalone mode.

sroctadian • Apr 27 '21 08:04
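
For anyone hitting the same trace: the NPE is thrown by Preconditions.checkNotNull inside the GenericRecordAvroTypeInfo constructor, which suggests that a null value, presumably the Avro schema, was passed in when the deserialization schema was built. The thread does not say how the job obtains its schema. Assuming it is read from a classpath resource (that is an assumption, and the class name SchemaResourceCheck and the file name user-event.avsc are hypothetical), a minimal sketch of a fail-fast check using only the JDK might look like this:

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;

public class SchemaResourceCheck {

    /**
     * Reads a classpath resource as UTF-8 text. Throws a descriptive
     * IllegalStateException when the resource is missing, instead of
     * letting a null travel further into the job setup.
     */
    static String loadSchemaText(ClassLoader loader, String resourceName) throws IOException {
        try (InputStream in = loader.getResourceAsStream(resourceName)) {
            if (in == null) {
                throw new IllegalStateException(
                        "Schema resource not found on classpath: " + resourceName);
            }
            // Manual copy loop so the sketch also compiles on Java 8.
            ByteArrayOutputStream out = new ByteArrayOutputStream();
            byte[] buffer = new byte[4096];
            int read;
            while ((read = in.read(buffer)) != -1) {
                out.write(buffer, 0, read);
            }
            return new String(out.toByteArray(), StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        // Hypothetical resource name; pass the real one as the first argument.
        String resourceName = args.length > 0 ? args[0] : "user-event.avsc";
        ClassLoader loader = SchemaResourceCheck.class.getClassLoader();
        try {
            String schemaText = loadSchemaText(loader, resourceName);
            System.out.println("Loaded schema text, " + schemaText.length() + " characters");
        } catch (IllegalStateException e) {
            // Report the missing resource by name rather than failing with a bare NPE later.
            System.out.println(e.getMessage());
        }
    }
}
```

If a lookup like this fails under flink run but succeeds with the other entrypoint, the resource is probably not visible to the class loader in use, which would point at how the jar is packaged or loaded rather than at flink run itself.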