Fix future control bug for taskClient.pause
Description
- In our production system we hit the following case: in `SeekableStreamSupervisor#checkTaskDuration()`, if the call below times out, the pause requests already submitted through `taskClient` keep running (see the sketch after this list):
  `List<Map<PartitionIdType, SequenceOffsetType>> results = Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);`
  Even if such a pause later finishes normally, the groupId is never moved from `activelyReadingTaskGroups` to `pendingCompletionTaskGroups`, because the code after the timed-out call is never executed.
- In the next `checkTaskDuration()` round the task has already moved to `PUBLISHING`, so a "Can't pause" exception is thrown.
- Furthermore, if such an orphaned pause request keeps running in the thread pool for a long time, the consequences are unpredictable. So whenever `checkTaskDuration()` reaches its end, normally or abnormally, all pause requests should be finished.
- If one taskGroup fails, the taskGroups that succeeded are affected as well.
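To make the failure mode concrete, here is a minimal, self-contained sketch using plain Guava and the JDK (not the actual Druid classes; the 1 s / 10 s durations and the printed messages are illustrative):

```java
import com.google.common.util.concurrent.Futures;
import com.google.common.util.concurrent.ListenableFuture;
import com.google.common.util.concurrent.ListeningExecutorService;
import com.google.common.util.concurrent.MoreExecutors;

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class PauseTimeoutSketch
{
  public static void main(String[] args) throws Exception
  {
    ListeningExecutorService exec =
        MoreExecutors.listeningDecorator(Executors.newFixedThreadPool(2));

    // Stand-in for a pause request submitted through the task client: a pause
    // that outlives the supervisor's future timeout (e.g. due to a slow persist).
    Callable<String> slowPause = () -> {
      TimeUnit.SECONDS.sleep(10);
      return "offsets-of-task-1";
    };

    List<ListenableFuture<String>> futures = new ArrayList<>();
    futures.add(exec.submit(slowPause));

    try {
      // Stand-in for the call in checkTaskDuration(); timeout shortened to 1 s.
      List<String> results = Futures.successfulAsList(futures).get(1, TimeUnit.SECONDS);
      // On timeout this part never runs: in the supervisor it is where the
      // group is moved from activelyReadingTaskGroups to pendingCompletionTaskGroups.
      System.out.println("moveGroupFromReadingToPending: " + results);
    }
    catch (TimeoutException e) {
      // The pause callable keeps running in the pool; nothing stops it and
      // nothing records that the group finished reading.
      System.out.println("checkTaskDuration timed out; group not moved");
    }
    finally {
      exec.shutdown(); // shutdown() does not interrupt the still-running pause
    }
  }
}
```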
-
Solution
- In `SeekableStreamIndexTaskClient`, add code to manage the pause requests so that they can be both submitted and stopped.
- In `SeekableStreamSupervisor`, when `checkTaskDuration()` reaches its end, `taskClient` ensures that all outstanding pause requests are finished.
Fixed the bug ...
- `SeekableStreamIndexTaskClient` [Class organization and design]
  - Add a class `PauseCallable`, implementing `Callable`, that wraps the pause logic and provides `stop()` so a running pause request can be forced to finish.
  - Move `pause()` from `SeekableStreamIndexTaskClient` into `PauseCallable`.
- `SeekableStreamSupervisor` [Method organization and design]
  - Add a single method `moveGroupFromReadingToPending()` that is called for each finished pause request.
  - Add a finally block in `checkTaskDuration()` that ensures `taskClient` can control all the running pause requests (a rough sketch of the idea follows this list).
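A rough, standalone sketch of the `PauseCallable` / finally-block idea described above (the types here are simplified stand-ins, not the exact classes added in this PR):

```java
import java.util.List;
import java.util.concurrent.Callable;
import java.util.concurrent.atomic.AtomicBoolean;

public class PauseCallableSketch
{
  /** Wraps one pause request and lets the supervisor force it to finish. */
  static class PauseCallable implements Callable<Boolean>
  {
    private final String taskId;
    private final AtomicBoolean stopped = new AtomicBoolean(false);

    PauseCallable(String taskId)
    {
      this.taskId = taskId;
    }

    /** Called by the supervisor when checkTaskDuration() reaches its end. */
    void stop()
    {
      stopped.set(true);
    }

    @Override
    public Boolean call() throws Exception
    {
      // Keep retrying the pause until it succeeds or the supervisor stops us,
      // instead of looping for the whole retry-policy duration.
      while (!stopped.get()) {
        if (tryPauseOnce(taskId)) {
          return true;
        }
        Thread.sleep(1000);
      }
      return false;
    }

    private boolean tryPauseOnce(String taskId)
    {
      // Placeholder for the HTTP pause call to the task.
      return false;
    }
  }

  /** Sketch of the finally block added to checkTaskDuration(). */
  static void checkTaskDurationSketch(List<PauseCallable> pendingPauses)
  {
    try {
      // submit the pause requests, wait with a timeout, move finished groups ...
    }
    finally {
      // Whether we return normally or time out, no pause request may outlive
      // this method.
      for (PauseCallable pause : pendingPauses) {
        pause.stop();
      }
    }
  }
}
```

The key point is that stopping is cooperative: the supervisor explicitly flips a flag on every outstanding pause request instead of relying on the future timeout to eventually clean things up.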
Renamed the class ...
[No]
Added a forbidden-apis entry ...
[No]
Key changed/added classes in this PR
- `org.apache.druid.indexing.seekablestream.supervisor.SeekableStreamSupervisor`
- `org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskClient`
This PR has:
- [x] been self-reviewed.
- [x] using the concurrency checklist (Remove this item if the PR doesn't have any relation to concurrency.)
- [ ] added documentation for new or modified features or behaviors.
- [x] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
- [ ] added or updated version, license, or notice information in licenses.yaml
- [x] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
- [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
- [x] added integration tests.
- [x] been tested in a test Druid cluster.
@clintropolis Hi, can you help me? Why does the build keep failing? For the same job, this run failed but the previous run passed. I looked at the failed jobs; there are two kinds of failures:
- docker pull limit
- test failure (but the same test passed in the previous run)
@clintropolis Can you or your team help review this PR? This issue has happened in our production three times this year. Even if this is not the root cause, because of this bug the tasks end up out of control and look chaotic.
Is this related to this change (https://github.com/apache/druid/pull/12167), by any chance? We were not handling the pausing tasks really well.
@abhishekagarwal87 @clintropolis @kfaraz
Your fix does address a real bug, but that is not the root cause. In our production, when an ingestion task receives a pause request but the "persist action" takes a long time (about 10 minutes) due to high disk usage, the "pausing task future" times out.
In `SeekableStreamSupervisor`:

    this.futureTimeoutInSeconds = Math.max(
        MINIMUM_FUTURE_TIMEOUT_IN_SECONDS,
        tuningConfig.getChatRetries() * (tuningConfig.getHttpTimeout().getStandardSeconds()
                                         + IndexTaskClient.MAX_RETRY_WAIT_SECONDS)
    );

(in our production this value is about max(120, 8 * (10s + 10s)) = 160s)

In `checkTaskDuration()`:

    Futures.successfulAsList(futures).get(futureTimeoutInSeconds, TimeUnit.SECONDS);
And in `SeekableStreamIndexTaskClient::pause()`, even with that bug fixed, it takes more than 3435s to break out of the while loop:

    while (true) {
      final Duration delay = retryPolicy.getAndIncrementRetryDelay();
      if (delay == null) { // takes 3435 seconds to become null
        throw new ISE(
            "Task [%s] failed to change its status from [%s] to [%s], aborting",
            id,
            status,
            SeekableStreamIndexTaskRunner.Status.PAUSED
        );
      }
      // ... else sleep for the delay and poll the task status again (elided here)
    }
So that is the problem: futureTimeout << pausingRetryDuration. Even if we reduce the delay duration or the retry count, it will not help us much.
I mean that we need strict control over the ingestion tasks, not dependence on the timeout. That is the goal of my change.
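For illustration, here is a small standalone calculation of how quickly a doubling backoff outgrows a 160 s future timeout. The starting delay, per-retry cap, and retry count below are assumptions chosen for the example, not necessarily the exact values in our cluster (where the total came to roughly 3435 s):

```java
// Standalone illustration: cumulative wait of a doubling retry backoff.
// The parameters are assumptions for this example, not Druid's exact defaults.
public class RetryBudgetSketch
{
  public static void main(String[] args)
  {
    long delaySeconds = 5;        // assumed starting delay
    long maxDelaySeconds = 600;   // assumed per-retry cap
    int retries = 10;             // assumed retry count
    long totalSeconds = 0;

    for (int i = 0; i < retries; i++) {
      totalSeconds += delaySeconds;
      delaySeconds = Math.min(delaySeconds * 2, maxDelaySeconds);
    }

    // With these assumptions the pause() loop can keep retrying for ~2435 s,
    // while the supervisor's combined future gives up after ~160 s.
    System.out.println("total retry budget: " + totalSeconds + " s");
  }
}
```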
This pull request has been marked as stale due to 60 days of inactivity. It will be closed in 4 weeks if no further activity occurs. If you think that's incorrect or this pull request should instead be reviewed, please simply write any comment. Even if closed, you can still revive the PR at any time or discuss it on the dev@druid.apache.org list. Thank you for your contributions.
@abhishekagarwal87 @clintropolis @kfaraz Could you reply to this PR?
This issue is no longer marked as stale.
Hi @panhongan
The issue in `SeekableStreamTaskInternal` seems to be that `pause()` is called after the "main loop" within `runInternal()` has succeeded.
Would it help to have a check for `ingestionState == IngestionState.COMPLETED`, in which case we just return a 200 status with the current offsets as the response, irrespective of the task status, i.e. behave as if the pause request was successful?
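Something along these lines, purely as a sketch (the types and names below are simplified stand-ins, not the actual SeekableStreamIndexTaskRunner API):

```java
// Purely illustrative sketch of the suggested check. HttpResult and
// handlePauseRequest() are stand-ins for the real task-runner types.
import java.util.Collections;
import java.util.Map;

public class PauseAfterCompletionSketch
{
  enum IngestionState { BUILDING_SEGMENTS, COMPLETED }

  static class HttpResult
  {
    final int status;
    final Object body;

    HttpResult(int status, Object body)
    {
      this.status = status;
      this.body = body;
    }
  }

  private IngestionState ingestionState = IngestionState.COMPLETED;
  private final Map<Integer, Long> currentOffsets = Collections.singletonMap(0, 42L);

  HttpResult handlePauseRequest()
  {
    if (ingestionState == IngestionState.COMPLETED) {
      // The "main loop" already finished: behave as if the pause succeeded
      // and return the final offsets instead of rejecting the request.
      return new HttpResult(200, currentOffsets);
    }
    // ... otherwise go through the normal pause path.
    return new HttpResult(202, "pausing");
  }
}
```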
@kfaraz Thanks very much for your review and comments. That's a good suggestion.
I have simplified `SeekableStreamIndexTaskClient`. Please help review it again.
@AmatyaAvadhanula Thanks for your review, too.
Yes, for better task-status control, I made a change to `SeekableStreamIndexTaskRunner#pause()`.
The bad case is: in the current `checkTaskDuration()` round the pause times out, but by the next `checkTaskDuration()` round the task has already become `PUBLISHING`. I think the supervisor should accept the fact that the task is running fine, instead of throwing the exception: Can't pause, task state is invalid (state: [PUBLISHING]).
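Purely as an illustration of that point (the enum and method names are stand-ins, not the actual supervisor API):

```java
// Illustrative only: how the supervisor could react when a pause attempt
// finds the task already publishing, instead of treating it as an error.
public class AcceptPublishingSketch
{
  enum Status { READING, PAUSED, PUBLISHING }

  /** Returns true if the group should move to pendingCompletionTaskGroups. */
  static boolean pauseOutcomeMeansDoneReading(Status statusAfterPauseAttempt)
  {
    if (statusAfterPauseAttempt == Status.PUBLISHING) {
      // The task stopped reading on its own between two checkTaskDuration()
      // rounds; accept that rather than failing with
      // "Can't pause, task state is invalid (state: [PUBLISHING])".
      return true;
    }
    return statusAfterPauseAttempt == Status.PAUSED;
  }

  public static void main(String[] args)
  {
    System.out.println(pauseOutcomeMeansDoneReading(Status.PUBLISHING)); // true
  }
}
```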
@kfaraz @AmatyaAvadhanula Could you help review this point?
@panhongan , thanks for the fixes, there seem to be some genuine CI failures. Could you please take a look?
I added some test cases to improve the unit-test coverage. It looks OK now. Thanks.