
[Bug] The task status cannot be obtained, and the log shows a RejectedExecutionException.

Open Trgree opened this issue 1 year ago • 2 comments

Search before asking

  • [X] I had searched in the issues and found no similar issues.

Java Version

JDK 1.8

Scala Version

2.12.x

StreamPark Version

2.1.0

Flink Version

Flink 1.14.6

deploy mode

yarn-application

What happened

After the task is submitted, its status stays at "starting" even though the task is actually running. After I restart StreamPark, the status is displayed correctly.

  • Number of running apps: 120
  • Number of server cores: 32
  • Two versions of StreamPark are deployed on the server: streampark_2.1.0_scala2.11 and streampark_2.1.0_scala2.12. Only streampark_2.1.0_scala2.12 shows this exception.

Error Exception

2023-05-29 10:20:10 | ERROR | scheduling-1 | org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler:95] Unexpected error occurred in scheduled task
java.util.concurrent.RejectedExecutionException: Task org.apache.streampark.console.core.task.FlinkRESTAPIWatcher$$Lambda$967/748555589@2967b295 rejected from java.util.concurrent.ThreadPoolExecutor@39e42bfc[Running, pool size = 320, active threads = 320, queued tasks = 1024, completed tasks = 97808357]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.doWatch(FlinkRESTAPIWatcher.java:199)
	at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.start(FlinkRESTAPIWatcher.java:185)
	at sun.reflect.GeneratedMethodAccessor891.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2023-05-29 10:20:11 | ERROR | scheduling-1 | org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler:95] Unexpected error occurred in scheduled task
java.util.concurrent.RejectedExecutionException: Task org.apache.streampark.console.core.task.FlinkRESTAPIWatcher$$Lambda$967/748555589@160b86e9 rejected from java.util.concurrent.ThreadPoolExecutor@39e42bfc[Running, pool size = 320, active threads = 320, queued tasks = 1024, completed tasks = 97808357]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.doWatch(FlinkRESTAPIWatcher.java:199)
	at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.start(FlinkRESTAPIWatcher.java:185)
	at sun.reflect.GeneratedMethodAccessor891.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)
2023-05-29 10:20:12 | ERROR | scheduling-1 | org.springframework.scheduling.support.TaskUtils$LoggingErrorHandler:95] Unexpected error occurred in scheduled task
java.util.concurrent.RejectedExecutionException: Task org.apache.streampark.console.core.task.FlinkRESTAPIWatcher$$Lambda$967/748555589@17e64df3 rejected from java.util.concurrent.ThreadPoolExecutor@39e42bfc[Running, pool size = 320, active threads = 320, queued tasks = 1024, completed tasks = 97808357]
	at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2063)
	at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:830)
	at java.util.concurrent.ThreadPoolExecutor.execute(ThreadPoolExecutor.java:1379)
	at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.doWatch(FlinkRESTAPIWatcher.java:199)
	at org.apache.streampark.console.core.task.FlinkRESTAPIWatcher.start(FlinkRESTAPIWatcher.java:185)
	at sun.reflect.GeneratedMethodAccessor891.invoke(Unknown Source)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at org.springframework.scheduling.support.ScheduledMethodRunnable.run(ScheduledMethodRunnable.java:84)
	at org.springframework.scheduling.support.DelegatingErrorHandlingRunnable.run(DelegatingErrorHandlingRunnable.java:54)
	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
	at java.util.concurrent.FutureTask.runAndReset(FutureTask.java:308)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.access$301(ScheduledThreadPoolExecutor.java:180)
	at java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask.run(ScheduledThreadPoolExecutor.java:294)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)


Are you willing to submit PR?

  • [ ] Yes, I am willing to submit a PR!

Code of Conduct

Trgree, May 29 '23 03:05

In this situation, the thread pool is saturated: all 320 threads are busy and the queue already holds 1024 tasks, so the pool's AbortPolicy rejects every new task. Increasing the number of threads or the size of the blocking queue may mitigate this problem but cannot fix it thoroughly; it is quite likely that the backlog will keep growing over time.
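The saturation can be reproduced on a small scale. The sketch below is a hypothetical standalone demo (the class and method names are illustrative, not StreamPark code): a fixed-size ThreadPoolExecutor with a bounded ArrayBlockingQueue and AbortPolicy accepts at most threads + queue capacity slow tasks, and every further submission fails with RejectedExecutionException, which is the state the log reports (320 threads busy, 1024 tasks queued).

```java
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class SaturationDemo {

    /** Submits `tasks` slow jobs to a bounded pool and returns how many were rejected. */
    public static int countRejections(int threads, int queueCapacity, int tasks) {
        CountDownLatch release = new CountDownLatch(1);
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                threads, threads, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity),
                new ThreadPoolExecutor.AbortPolicy()); // the policy seen in the stack trace
        int rejected = 0;
        for (int i = 0; i < tasks; i++) {
            try {
                pool.execute(() -> {
                    try {
                        release.await(); // simulate a status check that has not returned yet
                    } catch (InterruptedException e) {
                        Thread.currentThread().interrupt();
                    }
                });
            } catch (RejectedExecutionException e) {
                rejected++;
            }
        }
        release.countDown(); // let the blocked checks finish
        pool.shutdown();
        try {
            pool.awaitTermination(5, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return rejected;
    }

    public static void main(String[] args) {
        // 2 threads + 2 queue slots hold 4 tasks, so 2 of 6 are rejected.
        System.out.println(countRejections(2, 2, 6));
        // Doubling both limits only doubles the capacity: 4 of 12 are rejected.
        System.out.println(countRejections(4, 4, 12));
    }
}
```

Raising both limits only moves the threshold; if checks are submitted faster than they complete, the backlog eventually reaches the new limit as well.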

Whenever the doWatch method is invoked, FlinkRESTAPIWatcher submits a monitoring task for every tracked app. Since WATCHING_APPS contains all the apps that need to be tracked, each doWatch call resubmits a task for every one of them anyway, so the tasks still queued in the thread pool can be discarded. I have two possible solutions for this issue:

1. Clear all queued tasks in the thread pool whenever doWatch is invoked;
2. Set the rejection policy of the thread pool to DiscardOldestPolicy.

What's your opinion, @wolfboys?
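Both proposals can be sketched against a plain ThreadPoolExecutor. This is a minimal sketch under stated assumptions: WatcherSketch, doWatch(List<Long>), checkApp and queuedTasks are hypothetical names standing in for FlinkRESTAPIWatcher and its per-app REST check, and the real watcher may be structured differently. Solution 1 clears the queue at the start of each tick; solution 2 installs ThreadPoolExecutor.DiscardOldestPolicy so that an overflowing tick evicts the oldest queued check instead of throwing.

```java
import java.util.List;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.RejectedExecutionHandler;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.function.LongConsumer;

/** Hypothetical stand-in for the watcher's polling loop; not StreamPark's actual code. */
public class WatcherSketch {

    private final ThreadPoolExecutor executor;
    private final LongConsumer checkApp; // hypothetical per-app status check

    public WatcherSketch(int threads, int queueCapacity, boolean discardOldest, LongConsumer checkApp) {
        this.checkApp = checkApp;
        // Solution 2: on overflow, evict the oldest queued check instead of throwing.
        RejectedExecutionHandler handler = discardOldest
                ? new ThreadPoolExecutor.DiscardOldestPolicy()
                : new ThreadPoolExecutor.AbortPolicy();
        this.executor = new ThreadPoolExecutor(
                threads, threads, 60L, TimeUnit.SECONDS,
                new ArrayBlockingQueue<>(queueCapacity), handler);
    }

    /** Called on every scheduler tick with the ids of all tracked apps. */
    public void doWatch(List<Long> watchingAppIds) {
        // Solution 1: checks still queued from earlier ticks are stale, because
        // this tick resubmits one check per tracked app anyway.
        executor.getQueue().clear();
        for (long appId : watchingAppIds) {
            executor.execute(() -> checkApp.accept(appId));
        }
    }

    public int queuedTasks() {
        return executor.getQueue().size();
    }

    public void shutdownNow() {
        executor.shutdownNow();
    }
}
```

Two caveats, offered as observations rather than a verdict: the getQueue() Javadoc describes the queue as intended mainly for debugging and monitoring, so clearing it is a pragmatic shortcut; and clearing alone only helps while the queue can hold one check per tracked app, which 1024 slots would cover for the 120 apps in this report. DiscardOldestPolicy handles any remaining overflow without throwing into the scheduler thread.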

zhoulii, Jul 04 '23 08:07

Since most of the tasks in the thread pool are duplicates, maybe we can build the thread pool with a custom blocking queue that naturally deduplicates tasks for the same app.
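One possible reading of the deduplicating-queue idea, as a sketch only: AppCheckTask, DedupByAppQueue and queuedAfterTicks are hypothetical, and using an app id carried by the task as the deduplication key is an assumption. The queue overrides offer, which is what ThreadPoolExecutor.execute uses to enqueue, and returns true without inserting when a check for the same app is already waiting, so the executor neither starts a thread nor rejects the task.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

public class DedupQueueDemo {

    /** Hypothetical task type that remembers which app it checks. */
    public static final class AppCheckTask implements Runnable {
        final long appId;
        private final Runnable body;

        public AppCheckTask(long appId, Runnable body) {
            this.appId = appId;
            this.body = body;
        }

        @Override
        public void run() {
            body.run();
        }
    }

    /** Work queue that drops a task when a task for the same app is already waiting. */
    public static final class DedupByAppQueue extends LinkedBlockingQueue<Runnable> {
        public DedupByAppQueue(int capacity) {
            super(capacity);
        }

        @Override
        public synchronized boolean offer(Runnable r) { // synchronized: serializes check-then-insert
            if (r instanceof AppCheckTask) {
                long id = ((AppCheckTask) r).appId;
                boolean alreadyQueued = stream().anyMatch(
                        q -> q instanceof AppCheckTask && ((AppCheckTask) q).appId == id);
                if (alreadyQueued) {
                    // Report success: the waiting check will refresh this app anyway.
                    return true;
                }
            }
            return super.offer(r);
        }
    }

    /** Runs `ticks` rounds of checks for apps 1..apps while the single worker is busy. */
    public static int queuedAfterTicks(int ticks, int apps) {
        CountDownLatch gate = new CountDownLatch(1);
        Runnable slowCheck = () -> {
            try {
                gate.await(); // simulate a REST call that has not returned yet
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
        };
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                1, 1, 60L, TimeUnit.SECONDS, new DedupByAppQueue(1024));
        for (int tick = 0; tick < ticks; tick++) {
            for (long appId = 1; appId <= apps; appId++) {
                pool.execute(new AppCheckTask(appId, slowCheck)); // execute(), not submit()
            }
        }
        int queued = pool.getQueue().size();
        gate.countDown();
        pool.shutdown();
        return queued;
    }

    public static void main(String[] args) {
        // App 1 runs on the worker; apps 2, 3 and a second check for app 1 wait in the queue.
        System.out.println(queuedAfterTicks(3, 3));
    }
}
```

With this queue the backlog is bounded by the number of tracked apps rather than by how many ticks have passed. Tasks must be passed to execute(): submit() wraps them in a FutureTask, which hides the app id from the queue. The linear scan in offer is simple but costs O(queue length) per submission, which seems acceptable for a few hundred apps; a concurrent set of queued ids would avoid it at the price of keeping that set in sync with every removal path (take, poll, drainTo, remove).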

zhoulii, Jul 04 '23 08:07