druid
druid copied to clipboard
Kafka ingestion lag spikes up whenever tasks are rolling
Affected Version
0.21
Description
We have a druid cluster in which we are ingesting about 2.5M events/second. We have 90 ingestion tasks for 1 of our data sources with task duration set to 1 hr. Whenever the tasks roll every hour, Kafka ingestion lag spikes up anywhere from 3M to even 15M druid.ingest.kafka.lag. On further analysis, we noted that while tasks are rolling, some of the active ingestion tasks are stuck in pause state for a long time (sometimes up to 1.5-2 minutes) during which those tasks aren't ingesting any data resulting in ingestion lag spike.
Logs from MM tasks with a huge gap between pause and resume
{"@timestamp":"2021-06-21T17:34:51.628Z", "log.level":"DEBUG", "message":"Received pause command, pausing ingestion until resumed.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
{"@timestamp":"2021-06-21T17:36:27.089Z", "log.level":"DEBUG", "message":"Received pause command, pausing ingestion until resumed.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
{"@timestamp":"2021-06-21T17:36:27.097Z", "log.level":"DEBUG", "message":"Received resume command, resuming ingestion.", "service.name":"druid/middleManager","event.dataset":"druid/middleManager.log","process.thread.name":"task-runner-0-priority-0","log.logger":"org.apache.druid.indexing.seekablestream.SeekableStreamIndexTaskRunner"}
In the above loglines, we can see that that task was in pause state from 17:34:51 to 17:36:27.
On further analysis, we figured out that the MM taskRunner goes to a pause state when it is requesting a checkpoint notice here.
From the time the taskRunner submits the checkpoint notice, it actually takes around 1.5 minutes for the coordinator to actually process this checkpoint notice. We can see it in the coordinator logs below for a specific task.
Jun 21, 2021 @ 17:34:51.624 Performing action for task[<task_id>]: CheckPointDataSourceMetadataAction{supervisorId='<supervisor_id>', taskGroupId='14', checkpointMetadata=KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='<kafka-topic>', partitionSequenceNumberMap={104=238760642689, 14=337995870870}, exclusivePartitions=[]}}} coordinator-0
Jun 21, 2021 @ 17:34:51.624 Checkpointing [KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='<kafka-topic>', partitionSequenceNumberMap={104=238760642689, 14=337995870870}, exclusivePartitions=[]}}] for taskGroup [14] coordinator-0
Jun 21, 2021 @ 17:36:27.086 Pause task[<task_id>] coordinator-0
Jun 21, 2021 @ 17:36:27.087 HTTP POST: http://<MMHost:MMport>/druid/worker/v1/chat/<task_id>/pause coordinator-0
Jun 21, 2021 @ 17:36:27.089 SetEndOffsets task[<task_id>] endOffsets[{104=238763631003, 14=337998805846}] finalize[false] coordinator-0
Jun 21, 2021 @ 17:36:27.089 Task [<task_id>] paused successfully coordinator-0
Jun 21, 2021 @ 17:36:27.091 HTTP POST: http://<MMHost:MMport>/druid/worker/v1/chat/<task_id>/offsets/end?finish=false coordinator-0
Jun 21, 2021 @ 17:36:27.097 Handled checkpoint notice, new checkpoint is [{104=238763631003, 14=337998805846}] for taskGroup [14] coordinator-0
Note that this long pause of ingestion task happens only when tasks are rolling. Not during other times.
Our guess here is that, while tasks are rolling, the notices queue has a lot of notices in them and each notice takes a long time to be processed thus causing significant delay in the checkpoint notice as well to be processed once its added to the queue.
Currently, we do not have logs in place to figure out how many notices are there in this queue at any point and how long does each notice takes to get executed.
Spent some more time analyzing further and we saw that some of the following functions here are taking multiple seconds when tasks are rolling as opposed to few milliseconds when tasks aren't.
discoverTasks();
updateTaskStatus();
checkTaskDuration();
checkPendingCompletionTasks();
checkCurrentTaskState();
Some of the above functions require checking task status and that seems to be taking a long time.
Hi @harinirajendran, thank you for your report. Was it only one task being paused or all tasks for the same datasource? If it's the first case, then maybe your guess is correct that there were lots of notices in the queue because the task who initiated the checkpoint process pauses itself after it sends the checkpoint request to the supervisor. If it's the second case, it means the supervisor was already in processing of the checkpoint notice but hadn't finished it yet. It was probably waiting for all tasks to respond to the pause request or waiting for tasks to respond to the setEndOffset request (see SeekableStreamSupervisor.checkpointTaskGroup(). One of our customers had a similar issue at Imply before and the cause was frequent HTTP communication failures which were retried with backoff.
Hi @harinirajendran, thank you for your report. Was it only one task being paused or all tasks for the same datasource? If it's the first case, then maybe your guess is correct that there were lots of notices in the queue because the task who initiated the checkpoint process pauses itself after it sends the checkpoint request to the supervisor. If it's the second case, it means the supervisor was already in processing of the checkpoint notice but hadn't finished it yet. It was probably waiting for all tasks to respond to the pause request or waiting for tasks to respond to the setEndOffset request (see
SeekableStreamSupervisor.checkpointTaskGroup(). One of our customers had a similar issue at Imply before and the cause was frequent HTTP communication failures which were retried with backoff.
@jihoonson : This long pause happens whenever a task requests a checkpoint notice while task roll is happening for the same data source. So it happens with all tasks requesting checkpoint notice while task roll is going on. In our case, the tasks roll happens over a span of 5-6 minutes. Also, this ingestion lag spike consistently only happens around task rolls which makes me think these aren't because of HTTP communication issues.
Also, the timeline is as follows time t1: task x pauses itself and initiates a checkpoint request to the coordinator time t1: coordinator receives the checkpoint notice and adds it to the notices queue time t1+2 minutes: coordinator sends a pause to the task time t1+2 minutes: coordinator gets response to pause and then calls setEndOffset(which in turn resumes the task) So the coordinator processes the checkpoint notice and sends a pause only after about 2 minutes.
Following is the log sequence for the same. Notice the gap between 17:34:51.624 and 17:36:27.086 timestamp
Jun 21, 2021 @ 17:34:51.624 Performing action for task[<task_id>]: CheckPointDataSourceMetadataAction{supervisorId='<supervisor_id>', taskGroupId='14', checkpointMetadata=KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='<kafka-topic>', partitionSequenceNumberMap={104=238760642689, 14=337995870870}, exclusivePartitions=[]}}} coordinator-0
Jun 21, 2021 @ 17:34:51.624 Checkpointing [KafkaDataSourceMetadata{SeekableStreamStartSequenceNumbers=SeekableStreamStartSequenceNumbers{stream='<kafka-topic>', partitionSequenceNumberMap={104=238760642689, 14=337995870870}, exclusivePartitions=[]}}] for taskGroup [14] coordinator-0
Jun 21, 2021 @ 17:36:27.086 Pause task[<task_id>] coordinator-0
Jun 21, 2021 @ 17:36:27.087 HTTP POST: http://<MMHost:MMport>/druid/worker/v1/chat/<task_id>/pause coordinator-0
Jun 21, 2021 @ 17:36:27.089 SetEndOffsets task[<task_id>] endOffsets[{104=238763631003, 14=337998805846}] finalize[false] coordinator-0
Jun 21, 2021 @ 17:36:27.089 Task [<task_id>] paused successfully coordinator-0
Jun 21, 2021 @ 17:36:27.091 HTTP POST: http://<MMHost:MMport>/druid/worker/v1/chat/<task_id>/offsets/end?finish=false coordinator-0
Jun 21, 2021 @ 17:36:27.097 Handled checkpoint notice, new checkpoint is [{104=238763631003, 14=337998805846}] for taskGroup [14] coordinator-0
@harinirajendran interesting. Are any logs omitted between the second and the third lines (checkpointing and pause)? Or Is there no log between those two? If there is no log between them, it seems a bit strange to me because I think the overlord usually prints something unless it is stuck especially when you have debug logging enabled. If the overlord was processing other notices for 2 min before it sent pause requests, it would likely print something in the logs.
@harinirajendran interesting. Are any logs omitted between the second and the third lines (checkpointing and pause)? Or Is there no log between those two? If there is no log between them, it seems a bit strange to me because I think the overlord usually prints something unless it is stuck especially when you have debug logging enabled. If the overlord was processing other notices for 2 min before it sent pause requests, it would likely print something in the logs.
@jihoonson oh yeah. there are lots of log lines between the 2 timestamps. The coordinator continues to process other notices and spews out lots of logs. Here, I explicitly filtered out the log lines for this specific task which was paused for about 2 minutes.
I see. Thank you for confirming it. Your analysis seems correct to me. Now I'm curious what notices the supervisor was processing :slightly_smiling_face:
I see. Thank you for confirming it. Your analysis seems correct to me. Now I'm curious what notices the supervisor was processing 🙂
I am working on a couple small PRs to add more visibility into this. hopefully that helps us get to the bottom of this 🤞 https://github.com/apache/druid/pull/11415 https://github.com/apache/druid/pull/11417
We have a similar problem in our cluster. There's a topic with 12 partitions, and its peak incoming message rate is up to 120K/s. Once it reaches the peak message rate, Druid ingestion tasks seems stuck temporarily. I have not investigated the problem if it's related to this issue, but leave a comment here so that I could find this issue once I need it.
@harinirajendran I recommend you experiment running the overlord with these three PRs pulled to your code: #12096 #12097 #12099, and let us know how it goes.
I reviewed your analysis, and some of the code, and took a profile on our cluster here.
You are correct that during task rollover the overlord gets busy processing RunNotice notices. I can identify two codepaths where RunNotice hits the TaskQueue (in purple):
On our system, with above fixes, TaskQueue is only a fraction of the time

SeekableStreamSupervisor.RunNotice::handle -> SeekableStreamSupervisor::runInternal -> SeekableStreamSupervisor::createNewTasks -> SeekableStreamSupervisor::createTasksForGroup -> TaskQueue::addSeekableStreamSupervisor.RunNotice::handle -> SeekableStreamSupervisor::runInternal -> SeekableStreamSupervisor::checkPendingCompletionTasks -> SeekableStreamSupervisor::killTasksInGroup -> SeekableStreamSupervisor::killTask -> TaskQueue::shutdown
Both of these paths hit the lock in TaskQueue; the fixes I present above have improved scalability of TaskQueue on our system.
It might also help if you can share which metadata task storage engine you are using (SQL vs heap).
I see. Thank you for confirming it. Your analysis seems correct to me. Now I'm curious what notices the supervisor was processing 🙂
@jihoonson @jasonk000 @gianm : I have some more updates wrt this issue. The supervisor actually is spending a lot of time in processing runNotices which is causing the checkpointNotice to wait in notices queue for a long time causing tasks to be stuck which results in ingestion lag.
In our case, we have seen run notices take ~7s as shown in the graph below.
As a result of this, the notices queue gets backed up when the number of tasks are huge as each runNotice takes a long time to process.

On further analysis, we realized that the bulk of 7s in run_notice processing is being spent in the getAsyncStatus() call in discoverTasks function. When the task boots up, it roughly takes ~5s to start the JVM and start the HTTP server. So, as a result this Futures take about ~6s to get the status of tasks that are just bootstrapping with retries resulting in runNotice taking such a long time.
So, it's the tasks bootstrap time and hence its inability to respond to /status call from the supervisor that is causing run_notice to take ~6s causing notices queue to be backed up causing starvation of checkpoint_ notice causing ingestion lag. Does it make sense?
Have you seen something similar on your end? How long do Kafka real-time tasks take to bootstrap on your deployments? (Also, we use Middle Managers as of today instead of Indexers).
Having said this, @jasonk000 : I don't think the PRs you listed earlier (#12096 #12097 #12099) would solve the issue we are encountering, right?
I'm not sure whether the proposed PRs would fix your issue. I'd have to see stack traces during the 1.5-2mins pause time you refer, is that how you determined they were waiting on the future?
I'm not sure whether the proposed PRs would fix your issue. I'd have to see stack traces during the 1.5-2mins pause time you refer, is that how you determined they were waiting on the future?
I determined it is the task bootstrap time that's causing run_notice to take ~8s by reading through the run_notice handle code and also correlating the timestamps in the logs.
For example, here you can see that the kafkasupervisor_task_status but were stuck in a retry loop as the tasks were bootstrapping and hence wasn't accepting connections for 5-6s.
To prove this theory, I switched one of our lab environments to use indexers instead of MMs for real-time ingestion tasks and as I doubted, the run_notice handle time fell down from 8s to under 2s as there are no jvm restarts involved for every task bootstrap with indexers.
It seems like you're on the right path. If you can run jstack $pid during the period of concern, and capture what's happening to the thread, or even introduce some extra logging lines, this might narrow it down more quickly.
@harinirajendran I went looking for this, and I agree with your analysis. Stack traces showed that majority of wall clock time in the KafkaSupervisor thread was waiting on SQL queries executing as part of the RunNotice. I backported the changes in https://github.com/apache/druid/pull/12018 to our environment and they worked perfectly. A class histogram showed ~500 CheckpointNotice tasks sitting idle and ~2500 RunNotice tasks.
There are two ways you can confirm this is happening at any time you have slow checkpoint. Replace $pid and $supervisorname as appropriate.
- Look for a count of class instances that are
SeekableStreamSupervisor$RunNotice
jcmd $pid GC.class_histogram | grep SeekableStreamSupervisor
- Look for supervisor thread performing RunNotice calls
jstack $pid | grep -A60 KafkaSupervisor-$supervisorname\"
Thank you to @gianm, your solution was simple and worked perfectly.
majority of wall clock time in the KafkaSupervisor thread was waiting on SQL queries executing as part of the RunNotice
In our case, it wasn't the SQL queries that were causing RunNotice to take more time. But rather, the tasks bootstrap time(~5s). @gianm's fix certainly helped bring this down. But with time as task rollovers started getting spread out over a few minutes, there weren't many runNotices next to each other to coalesce, resulting in the same problem again. To bring the time for task bootstrap down, we switched from MMs to Indexers as it doesn't involve jvm restart for every task and that actually helped.
@jasonk000: Do you use MMs or indexers for your ingestion tasks? How long does it take for the ingestion tasks to bootstrap in your environment?
Our ingestion tasks run on MM nodes - I had a look, and it seems to take about 8-10 seconds to go from JVM start to reading Kafka.
Our ingestion tasks run on MM nodes - I had a look, and it seems to take about 8-10 seconds to go from JVM start to reading Kafka.
Great! In my case, I looked at the point when the httpServer was started by the task so that it can respond to /status calls from overlord.
The log line I searched was something like Started ServerConnector@6e475994{HTTP/1.1, (http/1.1)}{0.0.0.0:8102} which happens roughly at the 4th second after the tasks start. And runNotice at the overlord gets stuck here for that duration until task can respond to the /status call. This is my observation. Does it make sense?
We solved this problem by switching from Middle Managers to Indexers. The hourly spikes don't happen anymore. But, the underlying problem of run_notices taking 8-10 seconds because of task bootstrap should be fixed.
The log line I searched was something like
Started ServerConnector@6e475994{HTTP/1.1, (http/1.1)}{0.0.0.0:8102}which happens roughly at the 4th second after the tasks start. And runNotice at the overlord gets stuck here for that duration until task can respond to the/statuscall. This is my observation. Does it make sense?
@harinirajendran It does make sense. Out of curiosity -- what did you have chatThreads set to? I wonder if your number of tasks is much higher than your number of chat threads, and that caused the delays to compound.
The log line I searched was something like
Started ServerConnector@6e475994{HTTP/1.1, (http/1.1)}{0.0.0.0:8102}which happens roughly at the 4th second after the tasks start. And runNotice at the overlord gets stuck here for that duration until task can respond to the/statuscall. This is my observation. Does it make sense?@harinirajendran It does make sense. Out of curiosity -- what did you have chatThreads set to? I wonder if your number of tasks is much higher than your number of chat threads, and that caused the delays to compound.
@gianm : We just use the default value for chatThreads. The document says default == min(10, taskCount * replicas) . So, it should be 10 in our case.
Some recent work in this area of SeekableStreamSupervisor:
- Eliminate some unnecessary metadata calls: https://github.com/apache/druid/pull/13328
- Don't ever queue more than one RunNotice at a time: https://github.com/apache/druid/pull/13334
- Contact tasks asynchronously, eliminate
chatThreads, reduce load onworkerThreads: https://github.com/apache/druid/pull/13354
We are targeting Druid 25 for all of these. If anyone has a chance to try these patches out in advance of the release, please let us know!
@harinirajendran , can you confirm if this is still an issue with Druid 25?
FYI this is still an issue on Druid 27.0.0. I had set a task duration of 10 min (to reduce querying load on MMs peons) and you can see avg lag spikes matches the task roll period.
This is currently not a big deal because we have low traffic but when testing but higher volumes the spikes become a real burden because the lag spikes cannot be reduced within the 10 min period so the lag keeps increasing.
And like the OP, the Kafka tasks reports the highest avg pending time.
Let me pop up this issue to discuss. I think the PRs linked in this issue does not solve the problem from the root. And the last reply suggested that even after these PRs have been applied, the problem is still there.
If we are going to transit to K8S-based ingestion, I'm worried about the lag problem will be magnified because K8S scheduling introduces extra latency such as POD resource allocation(especially LocalPV), image pulling from remote repository. Based on my observations, the latency can be seconds. This means for K8S-based ingestion, tasks start slower than they do Middle Managers.
I'm wondering if you have any plans or any ideas to solve this problem, including the latency introduced by the K8S scheduling? @gianm @maytasm
I think there are actually two root causes of ingestion lag:
- Bottlenecks in the Overlord around operations like leader failover, task discovery, segment allocations. If the Overlord cannot execute these operations quickly then ingestion tasks will at some point get stalled. (For example: if a task gets data for a new time period, it cannot proceed until a segment is allocated.)
- When the Overlord rolls tasks, it first tells the old ones to stop ingesting, then it launches new ones. This means that there is always a stall of roughly task-launch-time whenever a roll happens.
A lot of work has been done on area (1), recently led by @kfaraz. In an earlier comment I listed some enhancements that were made in Druid 25. Since then, there has been more work, a few highlights including:
- Batch segment allocation enabled by default: https://github.com/apache/druid/pull/13942
- Reduced metadata store usage during segment allocation: https://github.com/apache/druid/pull/17496
- Segment metadata cache on the Overlord (ships in Druid 33 but not yet on by default): https://github.com/apache/druid/pull/17653
- Improve concurrency in TaskQueue (hasn't shipped yet): https://github.com/apache/druid/pull/17828
There are other PRs too, these are just the first few that come to mind.
As far as I know, there hasn't been work done on (2). In most of our production environments this is a source of small but noticeable lag. When we use the k8s launcher, it's generally 10–15s of lag each rollover period. The main reason that tasks are rolled like this is that the new tasks have their start offsets included in the task spec, so they can't be created until the old tasks stop reading. This could be fixed by doing something like:
- Remove start offsets from the task spec
- Add a
setStartOffsetsAPI call that the Overlord should call when it knows the start offsets - The Overlord should launch the new tasks early. To make this possible it needs to have a little warning of when the new tasks will be needed (maybe 1 minute). For
taskDurationbased rollover this is easy. FormaxRowsPerSegmentormaxTotalRowsbased rollover, the supervisor doesn't have visibility into the numbers, so it will need to retrieve them from tasks periodically in order to get proper warning.
One more thing to add to (2) from the above, the PR https://github.com/apache/druid/pull/14533 may also help with (2). This may help if the supervisor is configured with a lot of tasks (and/or stopTaskCount is large) or many supervisors roll over at the same time.
One more thing to add to (2) from the above, the PR #14533 may also help with (2). This may help if the supervisor is configured with a lot of tasks (and/or stopTaskCount is large) or many supervisors roll over at the same time.
@maytasm This looks promising. Is the property mentioned, avaialble in later versions of druid, in particular v30.0.0 ?
(we experience this issue during peak hours when more load occurs, and the lag spikes steeply, and hourly)
I would also try to follow @gianm 's comment 👍