dolphinscheduler
[Bug][AbstractDelayEventBus] TaskRetryLifecycleEvent block other events
Search before asking
- [x] I had searched in the issues and found no similar feature requirement.
Description
Blocking for a long time will make the master server unresponsive.
So I'm thinking: how about adding a dedicated queue for delayed events?
Or using PriorityBlockingQueue and manually implementing DelayQueue's logic, like org.apache.dolphinscheduler.server.master.runner.queue.PriorityDelayQueue?
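A minimal sketch of the first option, assuming events can be split at publish time by their delay. `SplitEventQueue` and `DelayedItem` are hypothetical names, not DolphinScheduler classes: zero-delay events go into a plain FIFO queue and only delayed events go into a `DelayQueue`, so a stop event can never sit behind a pending retry.

```java
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical holder for a delayed event, ordered by its absolute deadline.
final class DelayedItem<E> implements Delayed {
    final E payload;
    private final long deadlineNanos;

    DelayedItem(E payload, long delayMillis) {
        this.payload = payload;
        this.deadlineNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(deadlineNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(deadlineNanos, ((DelayedItem<?>) other).deadlineNanos);
    }
}

// Hypothetical event queue with a dedicated queue for delayed events.
final class SplitEventQueue<E> {
    private final Queue<E> immediate = new ConcurrentLinkedQueue<>();
    private final DelayQueue<DelayedItem<E>> delayed = new DelayQueue<>();

    void publish(E event, long delayMillis) {
        if (delayMillis <= 0) {
            immediate.offer(event); // ready now: plain FIFO
        } else {
            delayed.offer(new DelayedItem<>(event, delayMillis));
        }
    }

    // Non-blocking: immediate events first, then a delayed event that is due; null otherwise.
    E poll() {
        E event = immediate.poll();
        if (event != null) {
            return event;
        }
        DelayedItem<E> due = delayed.poll(); // null while the head is not yet due
        return due == null ? null : due.payload;
    }
}
```

The trade-off is that ordering across the two queues is no longer strictly by due time: a delayed event that has just become due may be served after immediate events published later.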
Are you willing to submit a PR?
- [x] Yes I am willing to submit a PR!
Code of Conduct
- [x] I agree to follow this project's Code of Conduct
cc @ruanwenjun
DelayQueue is backed by a PriorityQueue, so element 1 should be returned here, and it will not block other events.
The main problem here is that the event bus worker polls events from the event bus at a fixed interval (default 100 ms). Since one worker needs to fire multiple event buses, it cannot block here. But this is OK, since poll() is very fast.
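To illustrate why the worker stays non-blocking: `DelayQueue.poll()` returns `null` right away when the head has not expired, whereas `take()` would park the thread until it does. The sketch below assumes a worker that owns several queues and makes one pass over them per tick; `SimpleDelayEvent` and `EventBusWorker` are hypothetical, and the 100 ms sleep between passes is left to the caller.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical event type used only for this sketch.
final class SimpleDelayEvent implements Delayed {
    final String name;
    private final long expireTimeNanos;

    SimpleDelayEvent(String name, long delayMillis) {
        this.name = name;
        this.expireTimeNanos = System.nanoTime() + TimeUnit.MILLISECONDS.toNanos(delayMillis);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTimeNanos - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(expireTimeNanos, ((SimpleDelayEvent) other).expireTimeNanos);
    }
}

// Hypothetical worker: one pass over all event buses it owns, never blocking on any of them.
final class EventBusWorker {
    private final List<DelayQueue<SimpleDelayEvent>> eventBuses;

    EventBusWorker(List<DelayQueue<SimpleDelayEvent>> eventBuses) {
        this.eventBuses = eventBuses;
    }

    // Drains every ready event; poll() returns null at once when a bus's head is not due,
    // so a long delay on one bus does not stall the others.
    List<String> fireOnce() {
        List<String> fired = new ArrayList<>();
        for (DelayQueue<SimpleDelayEvent> bus : eventBuses) {
            SimpleDelayEvent event;
            while ((event = bus.poll()) != null) {
                fired.add(event.name);
            }
        }
        return fired;
    }
}
```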
It would be great if we could move PriorityDelayQueue under dolphinscheduler-eventbus, since the main logic is the same in DelayEntry and AbstractDelayEvent.
@ruanwenjun I did a test with a task that retries 5 minutes after failing: I ran the workflow, the task failed and was waiting to retry, and then I stopped the workflow. The workflow only stopped after 5 minutes.
...
[WI-0][TI-0] - 2025-01-22 14:55:19.537 INFO [MasterRpcServer-methodInvoker-5] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: TaskRunningLifecycleEvent{task=<Task-with-retry>, runtimeContext=null}
[WI-3954361][TI-0] - 2025-01-22 14:55:19.641 INFO [ds-workflow-eventbus-worker-11] o.a.d.s.m.e.t.l.h.AbstractTaskLifecycleEventHandler:[47] - Fired task <Task-with-retry> TaskRunningLifecycleEvent{task=<Task-with-retry>, runtimeContext=null} with state RUNNING_EXECUTION
[WI-0][TI-0] - 2025-01-22 14:55:20.400 INFO [MasterRpcServer-methodInvoker-12] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: TaskFailedLifecycleEvent{task=<Task-with-retry>, endTime=Wed Jan 22 14:55:20 GMT+08:00 2025}
[WI-3954361][TI-0] - 2025-01-22 14:55:20.445 INFO [ds-workflow-eventbus-worker-10] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: TaskRetryLifecycleEvent{task=<Task-with-retry>, delayTime=300096/ms}
[WI-3954361][TI-0] - 2025-01-22 14:55:20.447 INFO [ds-workflow-eventbus-worker-10] o.a.d.s.m.e.t.l.h.AbstractTaskLifecycleEventHandler:[47] - Fired task <Task-with-retry> TaskFailedLifecycleEvent{task=<Task-with-retry>, endTime=Wed Jan 22 14:55:20 GMT+08:00 2025} with state RUNNING_EXECUTION
[WI-0][TI-0] - 2025-01-22 14:55:34.205 INFO [MasterRpcServer-methodInvoker-27] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: WorkflowStopLifecycleEvent{workflow=<Workflow-with-retry-task>-20250122145518737}
@@@@#### WorkflowStopLifecycleEvent was blocked here for 5 mins ####@@@@
[WI-3954361][TI-0] - 2025-01-22 15:00:20.577 INFO [ds-workflow-eventbus-worker-20] o.a.d.s.m.e.WorkflowEventBus:[41] - Publish event: TaskStartLifecycleEvent{task=<Task-with-retry>}
[WI-3954361][TI-0] - 2025-01-22 15:00:20.578 INFO [ds-workflow-eventbus-worker-20] o.a.d.s.m.e.t.l.h.AbstractTaskLifecycleEventHandler:[47] - Fired task <Task-with-retry> TaskRetryLifecycleEvent{task=<Task-with-retry>, delayTime=300096/ms} with state FAILURE
[WI-3954361][TI-0] - 2025-01-22 15:00:20.579 INFO [ds-workflow-eventbus-worker-20] o.a.d.s.m.e.w.l.h.AbstractWorkflowLifecycleEventHandler:[47] - Begin fire workflow <Workflow-with-retry-task>-20250122145518737 LifecycleEvent[WorkflowStopLifecycleEvent{workflow=<Workflow-with-retry-task>-20250122145518737}] with state: RUNNING_EXECUTION
[WI-3954361][TI-0] - 2025-01-22 15:00:20.582 INFO [ds-workflow-eventbus-worker-20] o.a.d.s.m.e.w.s.AbstractWorkflowStateAction:[150] - Success set WorkflowExecuteRunnable: <Workflow-with-retry-task>-20250122145518737 state from: RUNNING_EXECUTION to READY_STOP
...
And I just found the root cause here:
https://github.com/apache/dolphinscheduler/blob/352b47bd8576a47f83285ecfffec589de462fac0/dolphinscheduler-eventbus/src/main/java/org/apache/dolphinscheduler/eventbus/AbstractDelayEvent.java#L62-L64
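AbstractDelayEvent uses createTimeInNano to compare against other events, so DelayQueue sorts the events by createTimeInNano. Because the retry event was put into the queue first, it stays at the head and DelayQueue will take the retry event first; until it expires, the events behind it cannot be taken.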
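The ordering problem can be reproduced with a simplified stand-in for `AbstractDelayEvent`. `CreateTimeOrderedEvent` and `CreateTimeOrderDemo` are hypothetical and only mirror the quoted `compareTo`; the delay is assumed to be in milliseconds. The timestamps follow the log above: the retry event (delay 300096 ms) is created first and the stop event (delay 0) about 14 s later.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for AbstractDelayEvent that keeps the original ordering rule.
final class CreateTimeOrderedEvent implements Delayed {
    final String name;
    final long createTimeInNano;
    final long delayTimeInMs;

    CreateTimeOrderedEvent(String name, long createTimeInNano, long delayTimeInMs) {
        this.name = name;
        this.createTimeInNano = createTimeInNano;
        this.delayTimeInMs = delayTimeInMs;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        long expireTimeInNano = createTimeInNano + TimeUnit.MILLISECONDS.toNanos(delayTimeInMs);
        return unit.convert(expireTimeInNano - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // Same rule as the quoted compareTo: the earlier-created event becomes the head,
    // no matter how long its delay is.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(createTimeInNano, ((CreateTimeOrderedEvent) other).createTimeInNano);
    }
}

final class CreateTimeOrderDemo {
    // Retry event published first, stop event published 14 s later; both created in the past
    // so that the stop event is already due when the queue is polled.
    static DelayQueue<CreateTimeOrderedEvent> retryThenStop() {
        long base = System.nanoTime() - TimeUnit.SECONDS.toNanos(20);
        DelayQueue<CreateTimeOrderedEvent> queue = new DelayQueue<>();
        queue.offer(new CreateTimeOrderedEvent("TaskRetryLifecycleEvent", base, 300_096));
        queue.offer(new CreateTimeOrderedEvent(
                "WorkflowStopLifecycleEvent", base + TimeUnit.SECONDS.toNanos(14), 0));
        return queue;
    }
}
```

Here `poll()` returns `null` and `peek()` returns the retry event, so the already-due stop event waits until the retry delay has elapsed.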
If I change the compared value from createTimeInNano to createTimeInNano + delayTime (in the same time unit), the following zero-delay events are no longer blocked.
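A sketch of the proposed change under the same assumptions: order by expire time instead of creation time. The delay is converted to nanoseconds before being added to `createTimeInNano`, assuming (as the field name and the `delayTime=300096/ms` log output suggest) that the creation timestamp is in nanoseconds and the delay in milliseconds. `ExpireTimeOrderedEvent` and `ExpireTimeOrderDemo` are hypothetical names.

```java
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical stand-in for AbstractDelayEvent ordered by expire time.
final class ExpireTimeOrderedEvent implements Delayed {
    final String name;
    final long createTimeInNano;
    final long delayTimeInMs;

    ExpireTimeOrderedEvent(String name, long createTimeInNano, long delayTimeInMs) {
        this.name = name;
        this.createTimeInNano = createTimeInNano;
        this.delayTimeInMs = delayTimeInMs;
    }

    // createTimeInNano + delayTime, with the delay converted to nanoseconds first.
    long expireTimeInNano() {
        return createTimeInNano + TimeUnit.MILLISECONDS.toNanos(delayTimeInMs);
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTimeInNano() - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    // The event that becomes due first is the head, regardless of creation order.
    @Override
    public int compareTo(Delayed other) {
        return Long.compare(expireTimeInNano(), ((ExpireTimeOrderedEvent) other).expireTimeInNano());
    }
}

final class ExpireTimeOrderDemo {
    // Same scenario as the log: retry first (delay 300096 ms), stop 14 s later (delay 0).
    static DelayQueue<ExpireTimeOrderedEvent> retryThenStop() {
        long base = System.nanoTime() - TimeUnit.SECONDS.toNanos(20);
        DelayQueue<ExpireTimeOrderedEvent> queue = new DelayQueue<>();
        queue.offer(new ExpireTimeOrderedEvent("TaskRetryLifecycleEvent", base, 300_096));
        queue.offer(new ExpireTimeOrderedEvent(
                "WorkflowStopLifecycleEvent", base + TimeUnit.SECONDS.toNanos(14), 0));
        return queue;
    }
}
```

With this ordering the first `poll()` hands out the stop event, and the retry event stays queued until its own delay has elapsed.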
> It would be great if we could move PriorityDelayQueue under dolphinscheduler-eventbus, since the main logic is the same in DelayEntry and AbstractDelayEvent.
Yeah, I would prefer to do it in two steps: first fix the existing issue, then refactor the code later. How about that?
LGTM. I created two sub-issues and assigned them to you.
Lines 62 to 64 in 352b47b:
public int compareTo(Delayed other) {
    return Long.compare(this.createTimeInNano, ((AbstractDelayEvent) other).createTimeInNano);
}
This is a bug 👍.
If an event has not expired yet, we should compare by expire time; if it has already expired, we should compare by create time.
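/* PriorityQueue.class */
private void siftUpComparable(int k, E x) {
    Comparable<? super E> key = (Comparable<? super E>) x;
    // Move the new element up from slot k while it is smaller than its parent.
    while (k > 0) {
        int parent = (k - 1) >>> 1;
        Object e = this.queue[parent];
        // Stop once the new element is >= its parent; on a tie it stays below the parent.
        if (key.compareTo((E) e) >= 0) {
            break;
        }
        this.queue[k] = e; // shift the parent down one level
        k = parent;
    }
    this.queue[k] = key;
}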
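It seems this comparison only affects the new element in offer(). If the existing events have already expired, a new event will still be placed after them (its expire time cannot be earlier than that of an event that has already expired), so I think comparing by expire time still works.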
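A small check of that argument, assuming events are ordered purely by expire time. `ExpireOrderedEvent` and `ExpireOrderDemo` are hypothetical names: the queue holds two events that have already expired and one that is due in 60 s, and then a zero-delay event is offered last. Because the new event's expire time is the current time, it cannot sort ahead of events that expired earlier, and the event that is not yet due stays at the back.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.DelayQueue;
import java.util.concurrent.Delayed;
import java.util.concurrent.TimeUnit;

// Hypothetical event ordered only by its absolute expire time.
final class ExpireOrderedEvent implements Delayed {
    final String name;
    final long expireTimeInNano;

    ExpireOrderedEvent(String name, long expireTimeInNano) {
        this.name = name;
        this.expireTimeInNano = expireTimeInNano;
    }

    @Override
    public long getDelay(TimeUnit unit) {
        return unit.convert(expireTimeInNano - System.nanoTime(), TimeUnit.NANOSECONDS);
    }

    @Override
    public int compareTo(Delayed other) {
        return Long.compare(expireTimeInNano, ((ExpireOrderedEvent) other).expireTimeInNano);
    }
}

final class ExpireOrderDemo {
    // Polls until nothing is due and returns the names in the order they came out.
    static List<String> drainReady(DelayQueue<ExpireOrderedEvent> queue) {
        List<String> names = new ArrayList<>();
        ExpireOrderedEvent event;
        while ((event = queue.poll()) != null) {
            names.add(event.name);
        }
        return names;
    }

    static List<String> run() {
        long now = System.nanoTime();
        DelayQueue<ExpireOrderedEvent> queue = new DelayQueue<>();
        queue.offer(new ExpireOrderedEvent("expired-2s-ago", now - TimeUnit.SECONDS.toNanos(2)));
        queue.offer(new ExpireOrderedEvent("due-in-60s", now + TimeUnit.SECONDS.toNanos(60)));
        queue.offer(new ExpireOrderedEvent("expired-1s-ago", now - TimeUnit.SECONDS.toNanos(1)));
        // A zero-delay event published now: its expire time is the current time.
        queue.offer(new ExpireOrderedEvent("new-zero-delay", System.nanoTime()));
        return drainReady(queue);
    }
}
```

`ExpireOrderDemo.run()` drains the two expired events first, then the new zero-delay event, and leaves `due-in-60s` in the queue.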
This issue has been automatically marked as stale because it has not had recent activity for 30 days. It will be closed in next 7 days if no further activity occurs.