flink
[FLINK-27856][flink-kubernetes] Fix the NPE when the taskmanager pod template has no spec field.
What is the purpose of the change
This pull request fixes the NPE thrown when the TM pod template file has no spec field.
Brief change log
Add validation for the spec field of the TM pod template file.
Verifying this change
Please make sure both new and modified tests in this PR follow the conventions defined in our code quality guide: https://flink.apache.org/contributing/code-style-and-quality-common.html#testing
This change added tests and can be verified as follows:
- Added a test that validates a TM pod template without a spec field: KubernetesUtilsTest.testLoadPodFromNoSpecTemplate
Does this pull request potentially affect one of the following parts:
- Dependencies (does it add or upgrade a dependency): no
- The public API, i.e., is any changed class annotated with @Public(Evolving): no
- The serializers: no
- The runtime per-record code paths (performance sensitive): no
- Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Kubernetes/Yarn, ZooKeeper: yes (fixes the JM crash on Kubernetes when the TM pod template file has no spec field)
Documentation
- Does this pull request introduce a new feature? no
@wangyang0918 Please help me review this PR.
CI report:
- ba3578df461e6bbd54ade0c7e63e48548c04b128 Azure: SUCCESS
Bot commands
The @flinkbot bot supports the following commands:
- @flinkbot run azure: re-run the last Azure build
Could we fix this issue in KubernetesUtils#loadPodFromTemplateFile? Set an empty spec if not specified.
This PR fixes the issue in KubernetesUtils#loadPodFromTemplateFile. When no spec field is specified, set an empty one.
Code:
public static FlinkPod loadPodFromTemplateFile(
        FlinkKubeClient kubeClient, File podTemplateFile, String mainContainerName) {
    final KubernetesPod pod = kubeClient.loadPodFromTemplateFile(podTemplateFile);
    final List<Container> otherContainers = new ArrayList<>();
    Container mainContainer = null;

    if (null != pod.getInternalResource().getSpec()) {
        for (Container container : pod.getInternalResource().getSpec().getContainers()) {
            if (mainContainerName.equals(container.getName())) {
                mainContainer = container;
            } else {
                otherContainers.add(container);
            }
        }
        pod.getInternalResource().getSpec().setContainers(otherContainers);
    } else {
        // Set empty spec for taskmanager pod template
        pod.getInternalResource().setSpec(new PodSpecBuilder().build());
    }

    if (mainContainer == null) {
        LOG.info(
                "Could not find main container {} in pod template, using empty one to initialize.",
                mainContainerName);
        mainContainer = new ContainerBuilder().build();
    }

    return new FlinkPod(pod.getInternalResource(), mainContainer);
}
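For readers without the Flink and fabric8 classes at hand, the null-spec guard above can be illustrated with a dependency-free sketch. Everything in it is a hypothetical stand-in: `PodTemplateSketch`, its nested `Pod`, `PodSpec`, `Container`, and `Result` types, and the `split` method are invented for illustration and are not the Flink or fabric8 API. The container name passed in `main` is likewise only an example value.

```java
import java.util.ArrayList;
import java.util.List;

public class PodTemplateSketch {
    // Hypothetical stand-ins for the Kubernetes model types used in the PR.
    static final class Container {
        final String name;
        Container(String name) { this.name = name; }
    }

    static final class PodSpec {
        List<Container> containers = new ArrayList<>();
    }

    static final class Pod {
        PodSpec spec; // stays null when the template has no spec field
    }

    static final class Result {
        final Pod pod;
        final Container mainContainer;
        Result(Pod pod, Container mainContainer) {
            this.pod = pod;
            this.mainContainer = mainContainer;
        }
    }

    // Mirrors the control flow of the patched method: guard against a missing
    // spec, separate the main container from the others, then fall back to an
    // empty main container if none was found.
    static Result split(Pod pod, String mainContainerName) {
        List<Container> others = new ArrayList<>();
        Container main = null;
        if (pod.spec != null) {
            for (Container c : pod.spec.containers) {
                if (mainContainerName.equals(c.name)) {
                    main = c;
                } else {
                    others.add(c);
                }
            }
            pod.spec.containers = others;
        } else {
            // No spec in the template: install an empty one instead of failing.
            pod.spec = new PodSpec();
        }
        if (main == null) {
            main = new Container(null); // empty placeholder container
        }
        return new Result(pod, main);
    }

    public static void main(String[] args) {
        Pod noSpec = new Pod(); // simulates a template without a spec field
        Result r = split(noSpec, "flink-main-container");
        System.out.println("spec present: " + (r.pod.spec != null));
        System.out.println("other containers: " + r.pod.spec.containers.size());
    }
}
```

Without the `else` branch, any later code that dereferences the pod's spec would hit the same NullPointerException the PR describes.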
My bad. I will take a closer look now.
@flinkbot run azure
Would you like to create a backport PR to release-1.15?
My pleasure! The backport PR has been created to release-1.15: https://github.com/apache/flink/pull/20558