
KafkaIndexingTask pending forever after restart in remote mode

Open zen201415 opened this issue 6 years ago • 4 comments

Affected Version

All

Description

Cluster size

2 Master nodes, 2 Query nodes, 4 Data nodes. In fact, if you want to reproduce the problem, you are advised to use only 1 Overlord and 1 MiddleManager.

Configurations in use

druid.indexer.runner.type = remote

All other configurations are defaults.

Reason

Code 1:

org.apache.druid.indexing.overlord.TaskQueue#manage

while (active) {
  for (final Task task : ImmutableList.copyOf(tasks)) {
    if (!taskFutures.containsKey(task.getId())) {
      // doSomething...
      taskFutures.put(task.getId(), attachCallbacks(task, runnerTaskFuture));
    }
  }

  managementMayBeNecessary.awaitNanos(60s);
}

If a task is in both tasks and taskFutures, it is considered to be monitored through callbacks: when its status changes, the callbacks sync the status from ZooKeeper to MySQL.

But what if its status changes and the callbacks do not fire?

Code 2:

org.apache.druid.indexing.overlord.RemoteTaskRunner#addWorker

zkWorker.addListener(
    new PathChildrenCacheListener()
    {
      public void childEvent(CuratorFramework client, PathChildrenCacheEvent event)
      {
        ...
        case CHILD_UPDATED:
          ...
          if ((tmp = runningTasks.get(taskId)) != null) {
            taskRunnerWorkItem = tmp;
          } else {
            final RemoteTaskRunnerWorkItem newTaskRunnerWorkItem = new RemoteTaskRunnerWorkItem(
                taskId,
                announcement.getTaskType(),
                zkWorker.getWorker(),
                TaskLocation.unknown(),
                announcement.getTaskDataSource()
            );
            final RemoteTaskRunnerWorkItem existingItem = runningTasks.putIfAbsent(
                taskId,
                newTaskRunnerWorkItem
            );
            if (existingItem == null) {
              log.warn(
                  "Worker[%s] announced a status for a task I didn't know about, adding to runningTasks: %s",
                  zkWorker.getWorker().getHost(),
                  taskId
              );
              taskRunnerWorkItem = newTaskRunnerWorkItem;
            } else {
              taskRunnerWorkItem = existingItem;
            }
          }
          ...
          if (announcement.getTaskStatus().isComplete()) {
            taskComplete(taskRunnerWorkItem, zkWorker, announcement.getTaskStatus());
            runPendingTasks();
          }
      }
    }
)

If a task announced in ZooKeeper does not exist in runningTasks, the listener creates a new taskRunnerWorkItem that has no callbacks attached to it. When the task status then changes to failed, completion is handled on this new work item rather than on the one the TaskQueue is watching. So the task's ZooKeeper status is never synced to MySQL, yet the task still stays in tasks and taskFutures.

It will be pending forever.
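The mismatch described above can be reproduced in isolation. The following is a minimal sketch, not Druid code: WorkItem, statusReachesQueue, and the pendingTasks map used here are hypothetical stand-ins, and CompletableFuture replaces the futures Druid actually uses. It assumes the work item watched by the queue is parked somewhere other than runningTasks when the worker announces its status.

```java
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicBoolean;

public class OrphanedCallbackSketch
{
  /** Hypothetical stand-in for RemoteTaskRunnerWorkItem: a task id plus a result future. */
  static final class WorkItem
  {
    final String taskId;
    final CompletableFuture<String> result = new CompletableFuture<>();

    WorkItem(String taskId)
    {
      this.taskId = taskId;
    }
  }

  /**
   * Returns true if the queue-side callback fires when the worker announces FAILED.
   * runnerTracksItem decides whether the original item is in runningTasks (normal case)
   * or only in a separate pending map (assumed state after the restart sequence).
   */
  static boolean statusReachesQueue(boolean runnerTracksItem)
  {
    final Map<String, WorkItem> runningTasks = new ConcurrentHashMap<>();
    final Map<String, WorkItem> pendingTasks = new ConcurrentHashMap<>();
    final Map<String, CompletableFuture<String>> taskFutures = new ConcurrentHashMap<>();
    final AtomicBoolean synced = new AtomicBoolean(false);

    // Queue side (Code 1): hand the task to the runner and attach a callback to its future.
    final WorkItem original = new WorkItem("task-1");
    (runnerTracksItem ? runningTasks : pendingTasks).put(original.taskId, original);
    original.result.thenAccept(status -> synced.set(true));
    taskFutures.put(original.taskId, original.result);

    // Runner side (Code 2): the worker announces a status; unknown ids get a fresh item.
    final WorkItem fresh = new WorkItem("task-1");
    final WorkItem existing = runningTasks.putIfAbsent("task-1", fresh);
    final WorkItem toComplete = existing == null ? fresh : existing;
    toComplete.result.complete("FAILED");

    // The callback only ran if the completed future is the one the queue attached to.
    return synced.get();
  }

  public static void main(String[] args)
  {
    System.out.println("runner tracks item: synced=" + statusReachesQueue(true));
    System.out.println("runner lost item:   synced=" + statusReachesQueue(false));
  }
}
```

In the second case the fresh item's future is completed while the future stored in taskFutures never is, which matches the symptom: the queue keeps waiting on a future that nothing will complete.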

Steps to reproduce the problem

  1. Start the Overlord, Coordinator, MiddleManager, Historical, and Broker.
  2. While a task is running on the MiddleManager, stop the MiddleManager, then stop the Overlord.
  3. A few minutes later, start the Overlord.
  4. Run tail -f on the Overlord log. When "Beginning management" appears, Code 1 has started. Start the MiddleManager 30s or 90s after that line appears.
  5. After the MiddleManager has started, there will be a task stuck in pending status.

I have already reproduced the problem. After dumping the Overlord, I confirmed that the task really exists in both tasks and taskFutures. The pending task is cleared after killing it and resetting the supervisor.

zen201415 avatar Sep 24 '19 06:09 zen201415

This issue has been marked as stale due to 280 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If this issue is still relevant, please simply write any comment. Even if closed, you can still revive the issue at any time or discuss it on the [email protected] list. Thank you for your contributions.

stale[bot] avatar Jul 02 '20 07:07 stale[bot]

This issue has been closed due to lack of activity. If you think that is incorrect, or the issue requires additional review, you can revive the issue at any time.

stale[bot] avatar Aug 01 '20 09:08 stale[bot]

This issue is no longer marked as stale.

stale[bot] avatar Aug 11 '22 21:08 stale[bot]

This still happens when using remote for druid.indexer.runner.type; apologies that stalebot closed this issue. I also ran into another issue when performing the same steps, with a similar outcome (though slightly worse, since it breaks the task listing API as well).

2022-08-09T23:08:21,703 WARN [qtp2066066225-85] org.eclipse.jetty.server.HttpChannel - /druid/indexer/v1/tasks
java.lang.IllegalStateException: Duplicate key TaskRunnerWorkItem{taskId='index_kafka_wikipedia_stream_603bb609024a3d4_gnhkkopj', result=com.google.common.util.concurrent.SettableFuture@575af1d6, createdTime=2022-08-09T23:07:11.510Z, queueInsertionTime=2022-08-09T23:07:11.510Z, location=TaskLocation{host='null', port=-1, tlsPort=-1}}
	at java.util.stream.Collectors.lambda$throwingMerger$0(Collectors.java:133) ~[?:1.8.0_332]
	at java.util.HashMap.merge(HashMap.java:1255) ~[?:1.8.0_332]
	at java.util.stream.Collectors.lambda$toMap$58(Collectors.java:1320) ~[?:1.8.0_332]
	at java.util.stream.ReduceOps$3ReducingSink.accept(ReduceOps.java:169) ~[?:1.8.0_332]
	at java.util.Iterator.forEachRemaining(Iterator.java:116) ~[?:1.8.0_332]
	at java.util.Spliterators$IteratorSpliterator.forEachRemaining(Spliterators.java:1801) ~[?:1.8.0_332]
	at java.util.stream.AbstractPipeline.copyInto(AbstractPipeline.java:482) ~[?:1.8.0_332]
	at java.util.stream.AbstractPipeline.wrapAndCopyInto(AbstractPipeline.java:472) ~[?:1.8.0_332]
	at java.util.stream.ReduceOps$ReduceOp.evaluateSequential(ReduceOps.java:708) ~[?:1.8.0_332]
	at java.util.stream.AbstractPipeline.evaluate(AbstractPipeline.java:234) ~[?:1.8.0_332]
	at java.util.stream.ReferencePipeline.collect(ReferencePipeline.java:566) ~[?:1.8.0_332]
	at org.apache.druid.indexing.overlord.http.OverlordResource.getTaskRunnerWorkItems(OverlordResource.java:880) ~[classes/:?]
	at org.apache.druid.indexing.overlord.http.OverlordResource.getTaskStatusPlusList(OverlordResource.java:739) ~[classes/:?]

Restarting the overlord seems to fix either issue. I have been unable to reproduce it using httpRemote for druid.indexer.runner.type, so that is a possible workaround.
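For readers unfamiliar with the exception in the stack trace: the two-argument Collectors.toMap throws IllegalStateException as soon as two stream elements map to the same key. The sketch below only illustrates that general behavior. How OverlordResource actually builds its map is not shown in this thread, so strictIndex, lenientIndex, and the string values are hypothetical, and the lenient variant is not a proposed fix.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.function.Function;
import java.util.stream.Collectors;

public class DuplicateKeySketch
{
  /** Two-argument toMap: throws IllegalStateException if a task id appears twice. */
  static Map<String, String> strictIndex(List<String> taskIds)
  {
    return taskIds.stream()
                  .collect(Collectors.toMap(Function.identity(), id -> "item:" + id));
  }

  /** Three-argument toMap: the merge function decides which duplicate survives. */
  static Map<String, String> lenientIndex(List<String> taskIds)
  {
    return taskIds.stream()
                  .collect(Collectors.toMap(
                      Function.identity(),
                      id -> "item:" + id,
                      (first, second) -> first
                  ));
  }

  public static void main(String[] args)
  {
    // The same task id listed twice, e.g. once as pending and once as running.
    final List<String> ids = Arrays.asList("task-1", "task-2", "task-1");
    try {
      strictIndex(ids);
    }
    catch (IllegalStateException e) {
      System.out.println("strict: " + e.getMessage());
    }
    System.out.println("lenient size: " + lenientIndex(ids).size());
  }
}
```

This is consistent with the same task id being reported by more than one work item after the restart sequence, though the thread does not confirm which collections contributed the duplicate.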

clintropolis avatar Aug 11 '22 21:08 clintropolis