flink-cdc
[FLINK-35281][hotfix][cdc-common] FlinkEnvironmentUtils#addJar add each jar only once
Currently, org.apache.flink.cdc.composer.flink.FlinkEnvironmentUtils#addJar is invoked once for each source and sink:
    public static void addJar(StreamExecutionEnvironment env, URL jarUrl) {
        try {
            Class<StreamExecutionEnvironment> envClass = StreamExecutionEnvironment.class;
            Field field = envClass.getDeclaredField("configuration");
            field.setAccessible(true);
            Configuration configuration = ((Configuration) field.get(env));
            List<String> jars =
                    configuration.getOptional(PipelineOptions.JARS).orElse(new ArrayList<>());
            // The jar path is appended unconditionally, even if it is already in the list.
            jars.add(jarUrl.toString());
            configuration.set(PipelineOptions.JARS, jars);
        } catch (Exception e) {
            throw new RuntimeException("Failed to add JAR to Flink execution environment", e);
        }
    }
If multiple sources or sinks share the same jar, the jar path will be added repeatedly. One example is a MySQL-to-MySQL pipeline, which may come later.
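A minimal sketch of the "add each jar only once" idea follows. It uses a plain list in place of the Flink configuration, so it runs without Flink on the classpath. The class name JarDedupSketch and the helper addJarOnce are hypothetical and are not part of flink-cdc.

```java
import java.net.URI;
import java.net.URL;
import java.util.ArrayList;
import java.util.List;

// Hypothetical illustration; not the actual FlinkEnvironmentUtils code.
class JarDedupSketch {

    /** Returns a copy of existingJars with jarUrl appended only if it is not already present. */
    public static List<String> addJarOnce(List<String> existingJars, URL jarUrl) {
        // Copy first so the caller's list is never mutated.
        List<String> jars = new ArrayList<>(existingJars);
        String jarPath = jarUrl.toString();
        // Skip the add when the same path was already registered by another source or sink.
        if (!jars.contains(jarPath)) {
            jars.add(jarPath);
        }
        return jars;
    }

    public static void main(String[] args) throws Exception {
        // Assumed example path; a source and a sink that ship in the same connector jar.
        URL connectorJar = URI.create("file:///tmp/mysql-connector.jar").toURL();

        List<String> jars = new ArrayList<>();
        jars = addJarOnce(jars, connectorJar); // registered for the source
        jars = addJarOnce(jars, connectorJar); // registered again for the sink

        System.out.println(jars.size());
    }
}
```

In addJar itself, the equivalent change would presumably be the same contains check placed before jars.add(jarUrl.toString()). That placement is an assumption about how the fix could look, not a description of the merged code.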
CC @yuxiqian, @PatrickRen