chunjun icon indicating copy to clipboard operation
chunjun copied to clipboard

自定义flink jobId,如果传了jobId 就用指定的jobId,如果没传就使用flink自动生成的jobId

Open shaolei7788 opened this issue 3 years ago • 1 comments

自定义flink jobId,如果传了jobId 就用指定的jobId,如果没传就使用flink自动生成的jobId

shaolei7788 avatar Apr 27 '22 09:04 shaolei7788

首先在Options类里加入jobId属性,然后在JobGraphUtil类的buildJobGraph方法修改调用 createJobGraph的方式

JobGraph jobGraph = null; if (StringUtils.isNullOrWhitespaceOnly(launcherOptions.getJobId())){ // use the jobId generated by flink when jobId is empty jobGraph = PackagedProgramUtils.createJobGraph( program, launcherOptions.loadFlinkConfiguration(), flinkConf.getInteger(DEFAULT_PARALLELISM), false); }else{ // use the customize jobId generated by flink when jobId is not empty jobGraph = PackagedProgramUtils.createJobGraph( program, launcherOptions.loadFlinkConfiguration(), flinkConf.getInteger(DEFAULT_PARALLELISM), new JobID(StringUtils.hexStringToByte(launcherOptions.getJobId())), false); }

shaolei7788 avatar Apr 27 '22 09:04 shaolei7788