KafkaIndexingTask pending forever after restart in remote mode
Affected Version
All
Description
Cluster size
2 Master nodes, 2 Query nodes, 4 Data nodes. To reproduce the problem, however, it is best to use only 1 overlord and 1 middlemanager.
Configurations in use
druid.indexer.runner.type = remote
default configurations
Reason
Code 1:
org.apache.druid.indexing.overlord.TaskQueue#manage
while (active) {
  for (final Task task : ImmutableList.copyOf(tasks)) {
    if (!taskFutures.containsKey(task.getId())) {
      // doSomething...
      taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
    }
  }
  managementMayBeNecessary.awaitNanos(60s);
}
If a task is in both tasks and taskFutures, it is considered to be monitored by callbacks: when its status changes, the callback tells MySQL to sync the status from ZK.
But what if its status changes and the callbacks never fire?
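To make that question concrete, here is a minimal sketch of the bookkeeping in Code 1. It is not Druid code: `ManageLoopSketch`, `runTask`, and `manageOnce` are hypothetical names, and `CompletableFuture` stands in for the future type the overlord actually uses. It only illustrates that a task with an entry in taskFutures is skipped on every later pass, so its stored status changes only if that one future completes.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

// Simplified stand-in for the TaskQueue bookkeeping; hypothetical, not Druid code.
class ManageLoopSketch {
    // taskId -> status as recorded in the metadata store (MySQL in the report)
    final Map<String, String> tasks = new LinkedHashMap<>();
    // taskId -> future that the runner is expected to complete
    final Map<String, CompletableFuture<String>> taskFutures = new LinkedHashMap<>();
    int runnerCalls = 0;

    // Hypothetical runner call: returns a future that completes when the task finishes.
    CompletableFuture<String> runTask(String taskId) {
        runnerCalls++;
        return new CompletableFuture<>();
    }

    // One pass of the management loop: only tasks without a future are handed to the runner.
    void manageOnce() {
        for (String taskId : tasks.keySet().toArray(new String[0])) {
            if (!taskFutures.containsKey(taskId)) {
                CompletableFuture<String> future = runTask(taskId);
                // The callback is the only path that updates the stored status.
                future.thenAccept(status -> tasks.put(taskId, status));
                taskFutures.put(taskId, future);
            }
        }
    }

    public static void main(String[] args) {
        ManageLoopSketch queue = new ManageLoopSketch();
        queue.tasks.put("t1", "RUNNING");
        queue.manageOnce();
        queue.manageOnce(); // second pass skips t1 because it already has a future
        System.out.println(queue.runnerCalls + " " + queue.tasks.get("t1"));
    }
}
```

If nothing ever completes the future stored for `t1`, no later pass of the loop will touch the task again.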
Code 2:
org.apache.druid.indexing.overlord.RemoteTaskRunner#addWorker
zkWorker.addListener(
    new PathChildrenCacheListener()
    {
      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
      {
        ...
        case CHILD_UPDATED:
          ...
          if ((tmp = runningTasks.get(taskId)) != null) {
            taskRunnerWorkItem = tmp;
          } else {
            final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
                taskId,
                announcement.getTaskType(),
                zkWorker.getWorker(),
                TaskLocation.unknown(),
                announcement.getTaskDataSource()
            );
            final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
                taskId,
                newTaskRunnerWorkItem
            );
            if (existingItem == null) {
              log.warn(
                  "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
                  zkWorker.getWorker().getHost(),
                  taskId
              );
              taskRunnerWorkItem = newTaskRunnerWorkItem;
            } else {
              taskRunnerWorkItem = existingItem;
            }
          }
          ...
          if (announcement.getTaskStatus().isComplete()) {
            taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
            runPendingTasks();
          }
      }
    }
)
If the task announced in ZK does not exist in runningTasks, the listener creates a new taskRunnerWorkItem without attaching the callbacks to it. If the task status then changes to failed, the completion is delivered to this new work item rather than to the one the callbacks are watching. As a result, the ZK status is never synced to MySQL, yet the task stays in both tasks and taskFutures.
It will be pending forever.
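The listener branch above can be reduced to the following sketch. It is a simplification under stated assumptions, not the RemoteTaskRunner implementation: the `WorkItem` class, the `pendingTasks` map, and the `run` and `onWorkerAnnouncement` methods are hypothetical stand-ins, and `CompletableFuture` replaces the real future type. The point it illustrates is that the future handed out earlier and the work item created inside the listener are two different objects, so completing one leaves the other untouched.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;

// Simplified stand-in for the runner bookkeeping; hypothetical, not Druid code.
class OrphanedWorkItemSketch {
    static class WorkItem {
        final String taskId;
        final CompletableFuture<String> result = new CompletableFuture<>();
        WorkItem(String taskId) { this.taskId = taskId; }
    }

    // Assumption for illustration: a submitted task first waits in a pending map.
    final ConcurrentMap<String, WorkItem> pendingTasks = new ConcurrentHashMap<>();
    final ConcurrentMap<String, WorkItem> runningTasks = new ConcurrentHashMap<>();

    // Called by the task queue; the returned future is the one that gets callbacks attached.
    CompletableFuture<String> run(String taskId) {
        WorkItem item = new WorkItem(taskId);
        pendingTasks.put(taskId, item);
        return item.result;
    }

    // The CHILD_UPDATED branch, reduced to the lookup-or-create step and the completion step.
    void onWorkerAnnouncement(String taskId, String status, boolean complete) {
        WorkItem item = runningTasks.get(taskId);
        if (item == null) {
            // Unknown task: a fresh work item with a fresh, unobserved future.
            WorkItem created = new WorkItem(taskId);
            WorkItem existing = runningTasks.putIfAbsent(taskId, created);
            item = (existing == null) ? created : existing;
        }
        if (complete) {
            // Completes the future of whichever item was found or created above.
            item.result.complete(status);
        }
    }

    public static void main(String[] args) {
        OrphanedWorkItemSketch runner = new OrphanedWorkItemSketch();
        CompletableFuture<String> watched = runner.run("t1");
        runner.onWorkerAnnouncement("t1", "FAILED", true);
        System.out.println("watched future done: " + watched.isDone());
    }
}
```

In this sketch the future returned by `run` is never completed, which corresponds to a task that stays in tasks and taskFutures with no callback ever firing.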
Steps to reproduce the problem
- Start the overlord, coordinator, middlemanager, historical, and broker.
- While a task is running on the middlemanager, stop the middlemanager, then stop the overlord.
- A few minutes later, start the overlord.
- Run tail -f on the overlord's log. When "Beginning management" appears, Code 1 has started; 30s or 90s after that line appears, start the middlemanager.
- After the middlemanager has started, there will be a task stuck in pending status.
I have already reproduced the problem. After I dumped the overlord, the task really did exist in both tasks and taskFutures. The pending task is cleared after a kill and a supervisor reset.
This still happens when druid.indexer.runner.type is set to remote; apologies that stalebot closed this issue. I also ran into another issue when performing the same steps, with a similar outcome (though slightly worse, since it breaks the task listing API as well).
2022-08-09T23:08:21,703 WARN [qtp2066066225-85] org.eclipse.jetty.server.HttpChannel - /druid/indexer/v1/tasks
java.lang.IllegalStateException: Duplicate key TaskRunnerWorkItem{taskId='index_kafka_wikipedia_stream_603bb609024a3d4_gnhkkopj', result=com.google.common.util.concurrent.SettableFuture@575af1d6, createdTime=2022-08-09T23:07:11.510Z, queueInsertionTime=2022-08-09T23:07:11.510Z, location=TaskLocation{host='null', port=-1, tlsPort=-1}}
at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) ~[?:1.8.0_332]
at java.util.HashMap.merge(HashMap.java:1255) ~[?:1.8.0_332]
at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) ~[?:1.8.0_332]
at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_332]
at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_332]
at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[?:1.8.0_332]
at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_332]
at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_332]
at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_332]
at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_332]
at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_332]
at org.apache.druid.indexing.overlord.http.OverlordResource.getTaskRunnerWorkItems(OverlordResource.java:880) ~[classes/:?]
at org.apache.druid.indexing.overlord.http.OverlordResource.getTaskStatusPlusList(OverlordResource.java:739) ~[classes/:?]
Restarting the overlord seems to fix either issue. I have been unable to reproduce it using httpRemote for druid.indexer.runner.type, so that is a possible workaround.
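For reference, the "Duplicate key" exception in the stack trace above is what the two-argument `Collectors.toMap` throws when two stream elements map to the same key. The sketch below reproduces only that JDK behavior. It assumes, without having confirmed it against the Druid source, that the same task ID was present in more than one of the runner's collections when the listing was built; `DuplicateKeySketch`, `strictIndex`, and `lenientIndex` are hypothetical names.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Hypothetical illustration of the JDK behavior behind the stack trace; not Druid code.
class DuplicateKeySketch {
    // Two-argument toMap: throws IllegalStateException when a key repeats.
    static Map<String, String> strictIndex(List<String> pending, List<String> running) {
        return Stream.concat(pending.stream(), running.stream())
                     .collect(Collectors.toMap(Function.identity(), id -> "item:" + id));
    }

    // Three-argument toMap: the merge function decides which entry survives (here, the first).
    static Map<String, String> lenientIndex(List<String> pending, List<String> running) {
        return Stream.concat(pending.stream(), running.stream())
                     .collect(Collectors.toMap(Function.identity(), id -> "item:" + id,
                                               (first, second) -> first));
    }

    public static void main(String[] args) {
        List<String> pending = Arrays.asList("t1");
        List<String> running = Arrays.asList("t1");
        try {
            strictIndex(pending, running);
        } catch (IllegalStateException e) {
            System.out.println("strict: " + e.getMessage());
        }
        System.out.println("lenient size: " + lenientIndex(pending, running).size());
    }
}
```

The lenient variant is shown only to contrast the two collectors; it would hide the duplicate rather than address the underlying state that produced it.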