
Fix future control bug for taskClient.pause

Open panhongan opened this issue 3 years ago • 11 comments

Description

  1. In our production system we hit the following case: in SeekableStreamSupervisor#checkTaskDuration(), if the call below times out, the pause requests already submitted through taskClient keep running (see the sketch after this list):

     List<Map<PartitionIdType, SequenceOffsetType>> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);

     Even though those pause tasks may eventually finish normally, the groupId is never moved from activelyReadingTaskGroups to pendingCompletionTaskGroups, because the code after this call is never executed.

  2. Then, in the next round of checkTaskDuration(), the task has already moved to PUBLISHING, and a "Can't pause" exception is thrown.

  3. Furthermore, if an orphaned pausing task keeps running in the thread pool for a long time, the consequences are unpredictable. So when checkTaskDuration() reaches its end, whether normally or abnormally, all pausing tasks should be finished as well.

  4. If one taskGroup fails, the successful taskGroups are affected as well.

  5. Solution

  • In SeekableStreamIndexTaskClient, add code to manage pausing task requests so that they can be both submitted and stopped.
  • In SeekableStreamSupervisor, when checkTaskDuration() reaches its end, taskClient ensures that all pausing task requests are finished.
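To make the race in item 1 concrete, here is a minimal, hypothetical sketch (not Druid code): the timed get() gives up, but the pause calls already submitted to the executor keep running and eventually complete with nobody acting on the result.

import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PauseTimeoutRace
{
  public static void main(String[] args) throws Exception
  {
    ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));

    List<ListenableFuture<String>> futures = new ArrayList<>();
    for (int i = 0; i < 2; i++) {
      final int task = i;
      futures.add(exec.submit(() -> {
        Thread.sleep(5_000);              // slow "pause", e.g. a long persist
        return "offsets-of-task-" + task; // finishes fine, just too late
      }));
    }

    try {
      // Mirrors checkTaskDuration(): stop waiting after a fixed future timeout.
      Futures.successfulAsList(futures).get(1, TimeUnit.SECONDS);
    }
    catch (TimeoutException e) {
      // The supervisor gives up here, but the pause calls above are still running;
      // the group never moves from activelyReadingTaskGroups to
      // pendingCompletionTaskGroups, so the next round finds a PUBLISHING task.
      System.out.println("timed out, but the pause calls are still in flight");
    }
    exec.shutdown();
  }
}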

Fixed the bug ...

  1. SeekableStreamIndexTaskClient [Class organization and design]
  • Add a PauseCallable class, implementing Callable, that wraps the pausing logic and provides stop() so the pausing task can be told to finish (see the sketch below).
  • Move pause() from SeekableStreamIndexTaskClient into PauseCallable.
  2. SeekableStreamSupervisor [Method organization and design]
  • Add a dedicated method moveGroupFromReadingToPending() that is called for each finished pausing task.
  • Add a finally block to checkTaskDuration() so that taskClient can always stop any still-running pause requests.
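A rough sketch of the shape this takes, with hypothetical field and method names rather than the actual PR code: the pause retry loop runs inside call(), and stop() lets the supervisor's finally block cut it short once checkTaskDuration() stops waiting.

import java.util.concurrent.Callable;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

class PauseCallable implements Callable<Boolean>
{
  private final String taskId;
  private final AtomicBoolean stopped = new AtomicBoolean(false);

  PauseCallable(String taskId)
  {
    this.taskId = taskId;
  }

  // Called from the supervisor's finally block when checkTaskDuration() ends.
  void stop()
  {
    stopped.set(true);
  }

  @Override
  public Boolean call() throws Exception
  {
    // Keep retrying until the task reports PAUSED or the supervisor tells us to stop.
    while (!stopped.get()) {
      if (requestPause(taskId)) {
        return true;
      }
      TimeUnit.SECONDS.sleep(2); // placeholder for the real retry-policy delay
    }
    return false;
  }

  private boolean requestPause(String taskId)
  {
    // Placeholder for the HTTP pause request sent to the task.
    return false;
  }
}

The supervisor would keep a reference to each submitted PauseCallable and call stop() on all of them in checkTaskDuration()'s finally block.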

Renamed the class ...

[No]

Added a forbidden-apis entry ...

[No]


Key changed/added classes in this PR
  • org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor
  • org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient

This PR has:

  • [x] been self-reviewed.
    • [x] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
  • [ ] added documentation for new or modified features or behaviors.
  • [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • [ ] added or updated version, license, or notice information in licenses.yaml
  • [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [x] added integration tests.
  • [x] been tested in a test Druid cluster.

panhongan avatar May 25 '21 04:05 panhongan

@clintropolis Hi, can you help me? Why does the build keep failing? The same job failed this time but passed on the previous run. I looked at the failed jobs and see two kinds of failures:

  1. Docker pull rate limit
  2. test failure (though the same job's tests passed last time)

panhongan avatar May 29 '21 14:05 panhongan

@clintropolis can you or your team help review this PR? This issue has happened in our production three times this year. Even if this is not the root cause, because of this bug the tasks end up out of control and things look chaotic.

panhongan avatar Jan 27 '22 09:01 panhongan

Is this fixed by this change (https://github.com/apache/druid/pull/12167), by any chance? We were not handling the pausing tasks very well.

abhishekagarwal87 avatar Jan 27 '22 10:01 abhishekagarwal87

Is this fixed by this change (#12167), by any chance? We were not handling the pausing tasks very well.

@abhishekagarwal87 @clintropolis @kfaraz

The bug your change fixes is real, but it is not the root cause. In our production, when an ingestion task receives a pause request while disk usage is high, the "persist" action can last a long time (about 10 minutes), so the "pausing task future" times out.

In SeekableStreamSupervisor:

this.futureTimeoutInSeconds = Math.max(
    MINIMUM_FUTURE_TIMEOUT_IN_SECONDS,
    tuningConfig.getChatRetries() * (tuningConfig.getHttpTimeout().getStandardSeconds()
                                     + IndexTaskClient.MAX_RETRY_WAIT_SECONDS)
);
(in our production this works out to roughly max(120, 8 * (10 s + 10 s)) = 160 s)


checkTaskDuration():
Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);

And in SeekableStreamIndexTaskClient#pause(), even with that bug fixed, it takes more than 3435 s to break out of the while loop.

while (true) {
  // ... send the pause request and check the task's status (elided) ...
  final Duration delay = retryPolicy.getAndIncrementRetryDelay();
  if (delay == null) {  // it takes roughly 3435 seconds of retries before this becomes null
    throw new ISE(
        "Task [%s] failed to change its status from [%s] to [%s], aborting",
        id,
        status,
        SeekableStreamIndexTaskRunner.Status.PAUSED
    );
  }
  // ... otherwise sleep for the delay and retry (elided) ...
}

So that is the problem: futureTimeout << pausingRetryDuration. Even if we reduce the retry delay or the number of retries, it will not help much.

What I mean is that we need strict control over the ingestion tasks rather than relying on the timeout. That is the goal of my change.
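To make the mismatch concrete, here is a back-of-the-envelope comparison using the numbers quoted in this comment (not values derived from Druid's code):

public class TimeoutMismatch
{
  public static void main(String[] args)
  {
    long futureTimeoutInSeconds = Math.max(120, 8 * (10 + 10)); // = 160 s
    long pauseRetryWindowSeconds = 3435;                        // quoted above
    // The supervisor stops waiting after ~160 s, while the task client may keep
    // retrying the pause for nearly an hour.
    System.out.println(pauseRetryWindowSeconds / futureTimeoutInSeconds); // ~21x
  }
}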

panhongan avatar Jan 28 '22 03:01 panhongan

This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the [email protected] list. Thank you for your contributions.

stale[bot] avatar Apr 16 '22 11:04 stale[bot]

@abhishekagarwal87 @clintropolis @kfaraz Could you reply to this PR?

panhongan avatar Apr 18 '22 06:04 panhongan

This issue is no longer marked as stale.

stale[bot] avatar Apr 18 '22 06:04 stale[bot]

Hi @panhongan

The issue in SeekableStreamTaskInternal seems to be that pause() can be called after the "main loop" within runInternal() has completed successfully.

Would it help to have a check for ingestionState == IngestionState.COMPLETED, in which case we just return a 200 status with the current offsets as the response, irrespective of the task status, i.e. behave as if the request to pause was successful?
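A rough sketch of the check being suggested here, using hypothetical names for the handler and helpers (the real logic would live in the task's pause endpoint):

class PauseHandlerSketch
{
  enum IngestionState { BUILDING_SEGMENTS, COMPLETED }

  private IngestionState ingestionState = IngestionState.COMPLETED;

  String handlePause()
  {
    if (ingestionState == IngestionState.COMPLETED) {
      // Reading has already finished: report success with the current offsets
      // instead of rejecting with "Can't pause, task state is invalid".
      return "200 OK: " + currentOffsets();
    }
    return doActualPause();
  }

  private String currentOffsets()
  {
    return "{partition-0: 42}"; // placeholder for the task's current offsets
  }

  private String doActualPause()
  {
    return "paused"; // placeholder for the existing pause path
  }
}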

AmatyaAvadhanula avatar Jul 15 '22 17:07 AmatyaAvadhanula

@kfaraz Thanks very much for your review and comments; they were good suggestions for me. I have simplified SeekableStreamIndexTaskClient. Please help review it again.

@AmatyaAvadhanula Thanks for your review, too. Yes, for better task status control I changed SeekableStreamIndexTaskRunner#pause(). The bad case is: in the current checkTaskDuration() round the pause request times out, but by the next checkTaskDuration() round the task has become PUBLISHING. I think the supervisor should accept that the task is running fine instead of throwing the exception: Can't pause, task state is invalid (state: [PUBLISHING]).

@kfaraz @AmatyaAvadhanula Could you help review this point?

panhongan avatar Aug 15 '22 14:08 panhongan

@panhongan , thanks for the fixes, there seem to be some genuine CI failures. Could you please take a look?

kfaraz avatar Sep 27 '22 04:09 kfaraz

@panhongan , thanks for the fixes, there seem to be some genuine CI failures. Could you please take a look?

I added some test cases to improve the unit test coverage. It looks OK now. Thanks.

panhongan avatar Oct 01 '22 06:10 panhongan