incubator-streampark
[Bug] Flink batch mode job gets wrong status on K8s
Search before asking
- [X] I had searched in the issues and found no similar issues.
Java Version
1.8
Scala Version
2.12.x
StreamPark Version
2.1.1
Flink Version
1.16.2
deploy mode
kubernetes-application
What happened
When a Flink BATCH-mode job finished and its deployment was shut down, the StreamPark FlinkJobStatusWatcher logged "Failed to visit remote flink jobs on kubernetes-native-mode cluster" and "The deployment is deleted and enters the task failure process."
Since the batch job completed successfully, the StreamPark job status should be FINISHED instead of FAILED.
Error Exception
FLINK JOB
2023-08-17 08:19:07,826 INFO org.apache.flink.runtime.taskmanager.Task [] - jobxxxxxxxxxx[3]: Committer (1/1)#0 (ad262c7f219e51660922d308b391fabd_306d8342cb5b2ad8b53f1be57f65bee8_0_0) switched from RUNNING to FINISHED.
2023-08-17 08:19:07,826 INFO org.apache.flink.runtime.taskmanager.Task [] - Freeing task resources for jobxxxxxxxxxx[3]: Committer (1/1)#0 (ad262c7f219e51660922d308b391fabd_306d8342cb5b2ad8b53f1be57f65bee8_0_0).
2023-08-17 08:19:07,826 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Un-registering task and sending final execution state FINISHED to JobManager for task jobxxxxxxxxxx[3]: Committer (1/1)#0 ad262c7f219e51660922d308b391fabd_306d8342cb5b2ad8b53f1be57f65bee8_0_0.
2023-08-17 08:19:07,897 INFO org.apache.flink.runtime.taskexecutor.slot.TaskSlotTableImpl [] - Free slot TaskSlot(index:0, state:ACTIVE, resource profile: ResourceProfile{cpuCores=0.5, taskHeapMemory=25.600mb (26843542 bytes), taskOffHeapMemory=0 bytes, managedMemory=230.400mb (241591914 bytes), networkMemory=64.000mb (67108864 bytes)}, allocationId: 385c45ca65336c338c22d293bbc1654c, jobId: abf7b7864513fcd9042aba91c3e6fc15).
2023-08-17 08:19:07,899 INFO org.apache.flink.runtime.taskexecutor.DefaultJobLeaderService [] - Remove job abf7b7864513fcd9042aba91c3e6fc15 from job leader monitoring.
2023-08-17 08:19:07,899 INFO org.apache.flink.runtime.taskexecutor.TaskExecutor [] - Close JobManager connection for job abf7b7864513fcd9042aba91c3e6fc15.
2023-08-17 08:19:08,033 INFO org.apache.flink.kubernetes.taskmanager.KubernetesTaskExecutorRunner [] - RECEIVED SIGNAL 15: SIGTERM. Shutting down as requested.
2023-08-17 08:19:08,041 INFO org.apache.flink.runtime.blob.TransientBlobCache [] - Shutting down BLOB cache
2023-08-17 08:19:08,041 INFO org.apache.flink.runtime.state.TaskExecutorStateChangelogStoragesManager [] - Shutting down TaskExecutorStateChangelogStoragesManager.
2023-08-17 08:19:08,042 INFO org.apache.flink.runtime.state.TaskExecutorLocalStateStoresManager [] - Shutting down TaskExecutorLocalStateStoresManager.
2023-08-17 08:19:08,043 INFO org.apache.flink.runtime.filecache.FileCache [] - removed file cache directory /tmp/flink-dist-cache-23ab1433-3bbd-4414-a8fe-052419dd4206
2023-08-17 08:19:08,044 INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager removed spill file directory /tmp/flink-netty-shuffle-2b627284-abfd-4dbe-96f6-f64eadf084b8
2023-08-17 08:19:08,045 INFO org.apache.flink.runtime.blob.PermanentBlobCache [] - Shutting down BLOB cache
2023-08-17 08:19:08,045 INFO org.apache.flink.runtime.io.disk.FileChannelManagerImpl [] - FileChannelManager removed spill file directory /tmp/flink-io-c3cd4782-e21e-438f-b32f-f618a9feb579
2023-08-17 08:19:08,494 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-08-17 08:19:08,495 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Shutting down remote daemon.
2023-08-17 08:19:08,496 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-08-17 08:19:08,497 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remote daemon shut down; proceeding with flushing remote transports.
2023-08-17 08:19:08,602 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
2023-08-17 08:19:08,603 INFO akka.remote.RemoteActorRefProvider$RemotingTerminator [] - Remoting shut down.
StreamPark
16:19:12.423 [pool-8-thread-1] DEBUG org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher - [StreamPark] [FlinkJobStatusWatcher]: Status monitoring process begins - pool-8-thread-1
2023-08-17 16:19:12 | INFO | ForkJoinPool-1-worker-0 | org.apache.hc.client5.http.impl.classic.HttpRequestRetryExec:113] Recoverable I/O exception (org.apache.hc.core5.http.NoHttpResponseException) caught when processing request to {}->http://172.17.97.239:30699
16:19:12.424 [ForkJoinPool-1-worker-0] WARN org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher - Failed to visit remote flink jobs on kubernetes-native-mode cluster, and the retry access logic is performed.
16:19:12.493 [ForkJoinPool-1-worker-0] ERROR org.apache.streampark.flink.kubernetes.KubernetesRetriever - [StreamPark] Get flinkClient error, the error is: java.lang.RuntimeException: org.apache.flink.client.deployment.ClusterRetrieveException: Could not get the rest endpoint of ods-grading-grading-record-trustworthy-test
16:19:12.493 [ForkJoinPool-1-worker-0] INFO org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher - Query the local cache result:false,trackId TrackId(kubernetes-application,flink-cluster,ods-grading-grading-record-trustworthy-test,100071,abf7b7864513fcd9042aba91c3e6fc15,100000).
16:19:12.627 [ForkJoinPool-1-worker-0] INFO org.apache.streampark.flink.kubernetes.watcher.FlinkJobStatusWatcher - The deployment is deleted and enters the task failure process.
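The logs above show the root of the problem: a batch job that reaches FINISHED tears down its own JobManager deployment, so by the time the watcher polls, the REST endpoint is gone and the watcher falls through to the failure branch. A minimal sketch of the fallback logic a watcher could apply instead (all names here are hypothetical, not StreamPark's actual API): when the deployment is missing, consult the last job state polled from the REST API before declaring failure.

```java
import java.util.Optional;

// Hypothetical terminal/non-terminal states, mirroring Flink's job states.
enum JobState { RUNNING, FINISHED, FAILED, CANCELED }

class JobStatusResolver {
    // Called when the K8s deployment backing the job can no longer be found.
    // A deleted deployment is not proof of failure: a BATCH job that reached
    // FINISHED (or was CANCELED) shuts its cluster down on purpose, so we
    // only fall back to FAILED when no terminal state was ever observed.
    static JobState resolveAfterDeploymentDeleted(Optional<JobState> lastPolledState) {
        return lastPolledState
                .filter(s -> s == JobState.FINISHED || s == JobState.CANCELED)
                .orElse(JobState.FAILED);
    }
}
```

Under this scheme the watcher would cache each state it sees while the REST endpoint is still reachable, so the final FINISHED reported by the JobManager (visible in the Flink log above) survives the endpoint disappearing.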
Screenshots
No response
Are you willing to submit PR?
- [ ] Yes I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
Hi zhaolipan, thank you for your feedback. Regarding the issue of status tracking for Flink on Kubernetes, we will fix it in the refactored Flink Kubernetes V2 module. Please refer to https://github.com/apache/incubator-streampark/issues/2879
If you want to use Flink on K8s, you currently need to deploy StreamPark itself on K8s; otherwise you need to use an Ingress and merge several PRs from my forked project.