incubator-streampark
[Bug] The task status cannot be obtained, and the log shows a RejectedExecutionException exception.
Search before asking
- [X] I had searched in the issues and found no similar issues.
Java Version
JDK 1.8
Scala Version
2.12.x
StreamPark Version
2.1.0
Flink Version
flink 1.14.6
deploy mode
yarn-application
What happened
After a task is submitted, its status in StreamPark stays at starting, although the actual task status is running. Restarting StreamPark fixes it.
Number of running apps: 120. Number of server cores: 32. Two builds of StreamPark are deployed on the server, streampark_2.1.0_scala2.11 and streampark_2.1.0_scala2.12; only streampark_2.1.0_scala2.12 shows this exception.
Error Exception
2023-05-29 10:20:10 | ERROR | scheduling-1 | org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler:95] Unexpected error occurred in scheduled task
java.util.concurrent.RejectedExecutionException: Task org.apache.streampark.console.core.task.FlinkRESTAPIWatcher$$Lambda$967/748555589@2967b295 rejected from java.util.concurrent.ThreadPoolExecutor@39e42bfc[Running, pool size = 320, active threads = 320, queued tasks = 1024, completed tasks = 97808357]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.doWatch(FlinkRESTAPIWatcher.java:199)
at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.start(FlinkRESTAPIWatcher.java:185)
at sun.reflect.GeneratedMethodAccessor891.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2023-05-29 10:20:11 | ERROR | scheduling-1 | org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler:95] Unexpected error occurred in scheduled task
java.util.concurrent.RejectedExecutionException: Task org.apache.streampark.console.core.task.FlinkRESTAPIWatcher$$Lambda$967/748555589@160b86e9 rejected from java.util.concurrent.ThreadPoolExecutor@39e42bfc[Running, pool size = 320, active threads = 320, queued tasks = 1024, completed tasks = 97808357]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.doWatch(FlinkRESTAPIWatcher.java:199)
at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.start(FlinkRESTAPIWatcher.java:185)
at sun.reflect.GeneratedMethodAccessor891.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
2023-05-29 10:20:12 | ERROR | scheduling-1 | org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler:95] Unexpected error occurred in scheduled task
java.util.concurrent.RejectedExecutionException: Task org.apache.streampark.console.core.task.FlinkRESTAPIWatcher$$Lambda$967/748555589@17e64df3 rejected from java.util.concurrent.ThreadPoolExecutor@39e42bfc[Running, pool size = 320, active threads = 320, queued tasks = 1024, completed tasks = 97808357]
at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.doWatch(FlinkRESTAPIWatcher.java:199)
at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.start(FlinkRESTAPIWatcher.java:185)
at sun.reflect.GeneratedMethodAccessor891.invoke(Unknown Source)
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
at java.lang.reflect.Method.invoke(Method.java:498)
at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
Screenshots
Are you willing to submit PR?
- [ ] Yes, I am willing to submit a PR!
Code of Conduct
- [X] I agree to follow this project's Code of Conduct
In this situation the thread pool is full, so new tasks are rejected. Increasing the number of threads or the size of the blocking queue may mitigate the problem but cannot fix it thoroughly; it is highly likely that the backlog will grow over time.
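To illustrate the saturation described above, here is a minimal sketch, not StreamPark code. The class and method names are hypothetical, and the latch only stands in for a slow status request. It shows that a `ThreadPoolExecutor` with the default `AbortPolicy` accepts at most maximum pool size plus queue capacity unfinished tasks before throwing `RejectedExecutionException`.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo class; it does not exist in StreamPark.
class SaturationSketch {

    /** Submits blocking tasks until the pool rejects one; returns how many were accepted. */
    static int acceptedBeforeRejection(int maxThreads, int queueCapacity) {
        CountDownLatch release = new CountDownLatch(1);
        // No handler is passed, so the default AbortPolicy applies, as in the stack trace.
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                maxThreads, maxThreads, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity));
        int accepted = 0;
        try {
            while (true) {
                pool.execute(() -> {
                    try {
                        release.await(); // stands in for a status request that never returns
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
                accepted++;
            }
        } catch (RejectedExecutionException e) {
            // Every thread is busy and the queue is full.
            return accepted;
        } finally {
            release.countDown(); // let the blocked tasks finish
            pool.shutdown();
        }
    }
}
```

`SaturationSketch.acceptedBeforeRejection(2, 3)` returns 5. By the same arithmetic, the pool in the log (pool size 320, 1024 queued tasks) holds 1344 unfinished tasks at the moment of rejection, so a larger pool or queue only moves that ceiling.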
Whenever the doWatch method is invoked, FlinkRESTAPIWatcher adds a monitor job for every tracked app. Since WATCHING_APPS contains all the apps that need to be tracked, the tasks already queued in the thread pool can be discarded. So I have two solutions for this issue:
1. Clear all the queued tasks in the thread pool when the doWatch method is invoked;
2. Set the rejection policy of the thread pool to DiscardOldestPolicy.
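Both options can be sketched with the standard `java.util.concurrent` API. This is only an illustration under assumptions: the class and method names are hypothetical, the pool sizes are parameters rather than StreamPark's real settings, and the actual change in FlinkRESTAPIWatcher may look different.

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical helper class; it does not exist in StreamPark.
class RejectionOptionsSketch {

    /** Option 2: a pool that drops the oldest queued task instead of throwing. */
    static ThreadPoolExecutor newPool(int threads, int queueCapacity) {
        return new ThreadPoolExecutor(
                threads, threads, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.DiscardOldestPolicy());
    }

    /** Option 1: drop stale queued checks before submitting a new round. */
    static void submitRound(ThreadPoolExecutor pool, Iterable<Runnable> checks) {
        pool.getQueue().clear(); // running tasks are unaffected, only waiting ones are dropped
        for (Runnable check : checks) {
            pool.execute(check);
        }
    }

    /** Submits blocking tasks to a 1-thread pool with a queue of 2; returns the final queue size. */
    static int queuedAfter(int submissions) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = newPool(1, 2);
        try {
            for (int i = 0; i < submissions; i++) {
                pool.execute(() -> {
                    try {
                        release.await(); // stands in for a slow status request
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            }
            return pool.getQueue().size();
        } finally {
            release.countDown();
            pool.shutdown();
        }
    }
}
```

With `DiscardOldestPolicy`, `queuedAfter(10)` completes without an exception and the queue never grows past its capacity of 2. Note that dropping queued tasks is only safe with `execute`; a `Future` obtained from `submit` for a dropped task would never complete.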
What's your opinion? @wolfboys
Since most of the tasks in the thread pool are duplicates, maybe we can define a special blocking queue that naturally deduplicates tasks for the same app when building the thread pool.
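A rough sketch of the deduplication idea follows. Instead of a custom blocking queue, it swaps in a simpler technique: a concurrent set of app IDs that already have a check pending, consulted before each submission. The class name, the `long` app ID, and the `submit` signature are all assumptions made for illustration, not existing StreamPark code.

```java
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.RejectedExecutionException;

// Hypothetical wrapper; it does not exist in StreamPark.
class DedupSubmitter {

    // IDs of apps whose status check is queued or currently running.
    private final Set<Long> inFlight = ConcurrentHashMap.newKeySet();
    private final ExecutorService pool;

    DedupSubmitter(ExecutorService pool) {
        this.pool = pool;
    }

    /** Returns true if the check was handed to the pool, false if one is already pending for appId. */
    boolean submit(long appId, Runnable check) {
        if (!inFlight.add(appId)) {
            return false; // a check for this app is still queued or running, so skip the duplicate
        }
        try {
            pool.execute(() -> {
                try {
                    check.run();
                } finally {
                    inFlight.remove(appId); // allow the next round to schedule this app again
                }
            });
            return true;
        } catch (RejectedExecutionException e) {
            inFlight.remove(appId); // the task never entered the pool
            throw e;
        }
    }
}
```

With this wrapper, the number of unfinished tasks is bounded by the number of tracked apps, so with 120 running apps a pool of 320 threads and 1024 queue slots should not fill up from duplicates alone.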