
Fix negative Kafka partition lag caused by inconsistent current/latest offsets

Open wuguowei1994 opened this issue 1 month ago • 15 comments

Motivation

We operate a Druid deployment with more than 500 nodes.

In real-time ingestion scenarios, a monitoring process queries the cluster every minute to retrieve the ingest/kafka/partitionLag metric. If the lag remains unhealthy for more than five minutes, alerts are triggered.

In our production environment, this metric periodically becomes negative, even when the cluster is fully healthy. These false alerts create unnecessary operational load and frequently wake the on-call team during off-hours. At the same time, we cannot suppress negative-lag alerts entirely, since in some situations negative lag can indicate real ingestion problems.

For a large-scale, 24×7 real-time ingestion pipeline, accurate and consistent lag metrics are essential to avoid unnecessary nighttime wake-ups while still ensuring that real issues are detected promptly.


Problem Description

[Screenshot: ingest/kafka/partitionLag dropping to negative values]

In the current implementation, the Druid supervisor maintains two volatile data structures:

  • The latest Kafka end_offset for each partition
  • The latest task-reported current_offset for each partition

The supervisor periodically updates these values (every 30 seconds) by:

  1. Querying all tasks in parallel to update current_offset. This step waits for all HTTP requests to complete, and each request has a timeout of two minutes.
  2. Querying the Kafka cluster to refresh end_offset.

Meanwhile, a separate periodic task (every minute) computes:

lag = end_offset - current_offset

Because the two updates are not atomic, intermediate inconsistent states may occur.

Intermediate State Leading to Negative Lag

If one task becomes heavily loaded or experiences other delays during Step 1, it may take significantly longer to return its offset. In this situation, the supervisor continues waiting for that slow task while the other tasks have already responded.

During this waiting period:

  • Many current_offset values have already been updated to new values.
  • The end_offset values remain stale because Step 2 has not executed yet.

If a monitoring request arrives in this intermediate window, the supervisor computes lag using:

  • Partially updated current_offset
  • Stale end_offset

This produces negative lag values.

This issue repeats as long as at least one task remains slow. Large clusters with many partitions and many Kafka-indexing tasks are more likely to experience this scenario.
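
A minimal sketch of this pattern (class, field, and method names are hypothetical, not the actual KafkaSupervisor code), showing how the metric reader can pair freshly updated current offsets with stale end offsets:

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

class LagRaceSketch
{
  // Two independently updated views of the same partitions, analogous to the
  // two volatile structures held by the supervisor.
  private final ConcurrentHashMap<Integer, Long> currentOffsets = new ConcurrentHashMap<>();
  private volatile Map<Integer, Long> endOffsets = Map.of();

  // Step 1: current offsets are written as each task responds, while the step
  // as a whole blocks on the slowest task (up to the 2-minute HTTP timeout).
  void onTaskResponse(int partition, long offset)
  {
    currentOffsets.put(partition, offset);
  }

  // Step 2: runs only after Step 1 completes, so the end offsets stay stale
  // for as long as Step 1 is still waiting.
  void refreshEndOffsetsFromKafka(Map<Integer, Long> latestFromKafka)
  {
    endOffsets = latestFromKafka;
  }

  // The separate metric task (every minute) may combine fresh current offsets
  // with stale end offsets, which is what produces the negative values.
  long lag(int partition)
  {
    return endOffsets.getOrDefault(partition, 0L) - currentOffsets.getOrDefault(partition, 0L);
  }
}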


Example Scenario

  1. Initial state: end_offset = 10000, current_offset = 0.

  2. After consumption: latest Kafka end_offset = 30000, and all tasks have consumed up to 20000.

  3. During Step 1, 49 tasks respond quickly, and their current_offset is updated to 20000. One task is slow, so Step 1 remains blocked waiting for its response.

  4. The in-memory end_offset stays at the old value 10000.

  5. If a metric query occurs at this point, the supervisor calculates:

    10000 - 20000 = -10000
    
  6. Because the periodic update logic repeats, this situation can persist across multiple cycles.


Proposed Changes

Replace the two volatile structures storing current_offset and end_offset with a single AtomicReference that holds both values in one immutable state object. The supervisor will update this reference atomically, ensuring that lag computation always observes a consistent snapshot.

This eliminates inconsistent intermediate states and prevents negative lag due to partial updates.
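
A minimal sketch of this approach, assuming a hypothetical immutable OffsetSnapshot class and holder (the actual classes in the PR may be shaped differently):

import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

final class OffsetSnapshot
{
  // Both maps are captured together and never mutated after construction.
  final Map<Integer, Long> currentOffsets;
  final Map<Integer, Long> endOffsets;

  OffsetSnapshot(Map<Integer, Long> currentOffsets, Map<Integer, Long> endOffsets)
  {
    this.currentOffsets = Map.copyOf(currentOffsets);
    this.endOffsets = Map.copyOf(endOffsets);
  }
}

class ConsistentLagSketch
{
  private final AtomicReference<OffsetSnapshot> offsetSnapshotRef =
      new AtomicReference<>(new OffsetSnapshot(Map.of(), Map.of()));

  // Both offsets are swapped in with a single atomic set, so readers can never
  // observe fresh current offsets paired with stale end offsets.
  void publish(Map<Integer, Long> currentOffsets, Map<Integer, Long> endOffsets)
  {
    offsetSnapshotRef.set(new OffsetSnapshot(currentOffsets, endOffsets));
  }

  // Lag is always computed from one consistent snapshot.
  long lag(int partition)
  {
    final OffsetSnapshot snapshot = offsetSnapshotRef.get();
    return snapshot.endOffsets.getOrDefault(partition, 0L)
           - snapshot.currentOffsets.getOrDefault(partition, 0L);
  }
}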


Rationale

  • Ensures consistent reads between related fields.
  • No behavioral changes other than removing negative lag caused by inconsistent state.

Operational Impact

  • Improves the accuracy of Kafka lag metrics in large clusters.
  • Reduces false alerts in monitoring systems.

Test Plan

  • This change does not add a new feature; we only need to make sure existing tests still pass.
  • All current tests pass successfully.

This PR has:

  • [x] been self-reviewed.
  • [ ] added documentation for new or modified features or behaviors.
  • [ ] a release note entry in the PR description.
  • [ ] added Javadocs for most classes and all non-trivial methods. Linked related entities via Javadoc links.
  • [ ] added or updated version, license, or notice information in licenses.yaml
  • [ ] added comments explaining the "why" and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • [x] added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage is met.
  • [ ] added integration tests.
  • [x] been tested in a test Druid cluster.

wuguowei1994 avatar Nov 18 '25 06:11 wuguowei1994

Our team has been suffering from these negative-lag alerts recently, and they’ve been repeatedly waking us up at night. It’s become difficult for us to get a good night’s sleep.

wuguowei1994 avatar Nov 19 '25 02:11 wuguowei1994

@wuguowei1994 , thanks for creating a PR in Apache Druid! One of the contributors will take a look at the changes soon.

kfaraz avatar Nov 19 '25 03:11 kfaraz

@wuguowei1994 could you sync the latest master to your branch? I think this patch might fix the test failures.

abhishekrb19 avatar Nov 20 '25 03:11 abhishekrb19

ERROR [main] org.apache.druid.testing.utils.DruidClusterAdminClient - Error while waiting for [http://localhost:30400] to be ready
java.util.concurrent.ExecutionException: org.jboss.netty.channel.ChannelException: Faulty channel in resource pool

@clintropolis Thanks for rerunning the jobs. Before the rerun there were two failed tasks, and now there’s only one left. The error still looks like an environment issue… really frustrating.

This whole code submission experience also shows how inactive the project has become. A lot of people report issues in the Druid Slack, but almost no one responds anymore. It’s no surprise so many companies are migrating from Druid to ClickHouse.

wuguowei1994 avatar Nov 26 '25 01:11 wuguowei1994

> @clintropolis Thanks for rerunning the jobs. Before the rerun there were two failed tasks, and now there’s only one left. The error still looks like an environment issue… really frustrating.

Yea, we are in the middle of a bit of a migration/overhaul of our integration test framework and processes, hopefully this will be more well behaved in the future, since part of the reason for this change is to address flakiness as well as make it much easier to write and debug these tests.

> This whole code submission experience also shows how inactive the project has become. A lot of people report issues in the Druid Slack, but almost no one responds anymore. It’s no surprise so many companies are migrating from Druid to ClickHouse.

Apologies for the perception. While I can assure you that there are quite a lot of active and interesting projects happening and that Druid is still very much being actively developed, that doesn't fix your experience here or the optics in Slack. The unfortunate matter is that there are always a lot more things to do than people to do them, and while we try our best, sometimes things are a bit slow to get a committer's attention. All that said, thanks again for the contribution and for trying to make things better. I do hear you; I'll see if I can nudge some of the other committers to be a bit more responsive and try to do the same myself.

clintropolis avatar Nov 26 '25 04:11 clintropolis

@clintropolis Take a look?

wuguowei1994 avatar Dec 03 '25 08:12 wuguowei1994

@wuguowei1994 , thanks for your patience! We will try to get this PR reviewed soon.

kfaraz avatar Dec 03 '25 09:12 kfaraz

@cecemei I've gone through your comments thoroughly — they're great! Give me a bit of time to improve the code.

wuguowei1994 avatar Dec 04 '25 12:12 wuguowei1994

Thanks for the changes, @wuguowei1994 ! I have left some suggestions.

While the changes in this PR make sense by reporting the lag more consistently (updating the two offsets in lockstep), I was wondering if it wouldn't be simpler to just report zero lag in case the lag turns out to be negative.

A negative record lag does mean that the task has already caught up to the last offsets that we had fetched from the topic and ingested some more records beyond that. And the lag metric is really just meant to indicate if the tasks are keeping up.
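
For illustration, a minimal sketch of that clamping (a hypothetical helper, not existing Druid code):

// Negative values are reported as zero, since they only mean the task has
// already read past the last end offsets fetched from the stream.
static long reportedLag(long endOffset, long currentOffset)
{
  return Math.max(0L, endOffset - currentOffset);
}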

For other purposes, we have the message gap and the time lag metrics.

In fact, the negative lag could even be a feature to identify if some tasks are particularly slow in returning their offsets 😛, and we could probably set up alerts if the negative lag goes below a specific threshold.

@cecemei , I think you also raised a concern regarding the possibility that the lag reported might now be higher than the actual lag, since we always fetch the stream offsets only after we have received updates from all the tasks.

I think the current code is also susceptible to reporting stale lag (higher or lower).

Say if a task were slow to return its latest ingested offsets, we would be delayed in fetching the latest offsets from the stream. So, in that period, we would be reporting stale lag (which could have been higher or lower than the actual lag, a special case of which would be negative lag) and then as soon as we fetched the latest offsets from the stream, the reported lag would fix itself.

Yes, I suspect we might see a slightly higher lag after this change (compared with what would have been reported before). The accuracy of the lag would not be affected by when the latestSequenceFromStream gets updated (which is more random), but only by how long it takes to fetch the offsets (delays inside the system and in the interaction with Kafka). I do think it provides more consistency than before, and the trend of the lag is more important, so it's an improvement.

For future reference, we could maybe calculate the lag on a per-partition basis.

cecemei avatar Dec 04 '25 18:12 cecemei

> In fact, the negative lag could even be a feature to identify if some tasks are particularly slow in returning their offsets 😛, and we could probably set up alerts if the negative lag goes below a specific threshold.

@kfaraz Thanks for the clarification — that makes sense. In our case, though, we’ve noticed that negative lag in our large cluster can sometimes persist for over five minutes.

We’ve talked about this internally, and if it only happens occasionally (for example, under a minute), adjusting the alert thresholds would absolutely work for us. But when it lasts longer, it tends to indicate something worth investigating.

We’ve also seen a few situations where negative lag actually pointed to issues in the upstream Kafka cluster, so that’s part of why we’re a bit cautious here. If we keep the current Druid behavior and treat negative lag as normal consumption, there’s a chance we might overlook real problems.

So overall, having clear and reliable metrics to signal the health of the cluster would be really helpful for us.

wuguowei1994 avatar Dec 05 '25 03:12 wuguowei1994

@kfaraz @cecemei Both of your suggestions for improving the code are excellent, and I’m genuinely happy to keep refining it. Give me a bit of time to rework the design. Thank you both!

wuguowei1994 avatar Dec 05 '25 03:12 wuguowei1994

  1. Extract OffsetSnapshot into a separate class.
  2. Remove the class-level variable latestSequenceFromStream from KafkaSupervisor.
  3. Consolidate all updates and queries to the offsetSnapshotRef variable into dedicated methods; other internal functions will no longer access the variable directly (see the sketch after this list).
  4. Add Javadoc documentation for the OffsetSnapshot class.
  5. Incorporate other suggestions from the team.
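
A rough sketch of items 1 through 3, reusing the OffsetSnapshot class sketched under Proposed Changes above; class and method names are hypothetical and may not match the final code:

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicReference;

class KafkaSupervisorOffsetsSketch
{
  private final AtomicReference<OffsetSnapshot> offsetSnapshotRef =
      new AtomicReference<>(new OffsetSnapshot(Map.of(), Map.of()));

  // Items 2 and 3: the only write path. The class-level latestSequenceFromStream
  // is gone; both maps are published together in one atomic swap.
  void updateOffsetSnapshot(Map<Integer, Long> currentOffsets, Map<Integer, Long> endOffsets)
  {
    offsetSnapshotRef.set(new OffsetSnapshot(currentOffsets, endOffsets));
  }

  // Item 3: the only read path, used by the per-minute metric emission. Lag is
  // derived per partition from a single consistent snapshot.
  Map<Integer, Long> getPartitionLag()
  {
    final OffsetSnapshot snapshot = offsetSnapshotRef.get();
    final Map<Integer, Long> lag = new HashMap<>();
    snapshot.endOffsets.forEach(
        (partition, end) -> lag.put(partition, end - snapshot.currentOffsets.getOrDefault(partition, 0L))
    );
    return lag;
  }
}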

wuguowei1994 avatar Dec 09 '25 10:12 wuguowei1994

@kfaraz @cecemei Hey, could you check out the code? Suggestions welcome! 😄

wuguowei1994 avatar Dec 09 '25 10:12 wuguowei1994

@cecemei Sorry, I’ve been dealing with some personal matters recently, so my reply is a bit late. I’ve gone through all your reviews; they’re excellent, thank you! It was also a great learning opportunity for me.

Take a look again?

wuguowei1994 avatar Dec 15 '25 09:12 wuguowei1994

@cecemei Fixed the unit test issue and merged the latest changes from master. Thank you very much for your support throughout the entire process!

wuguowei1994 avatar Dec 16 '25 05:12 wuguowei1994

@kfaraz @clintropolis Take a look, please?

wuguowei1994 avatar Dec 18 '25 11:12 wuguowei1994