KAFKA-18952: Fix flaky test in MonitorableSinkIntegrationTest

Open · majialoong opened this issue 1 month ago · 1 comment

The test testMonitorableSinkConnectorAndTask is flaky due to a race condition between the task thread and the test thread.

The awaitRecords() method waits on a CountDownLatch that TestableSinkTask.put() counts down once per record inside its loop, whereas MonitorableSinkTask.count is updated only after super.put() returns. When the latch reaches zero, awaitRecords() returns immediately, so the test thread can read the metric before the final count += records.size() has executed.
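
To make the ordering concrete, here is a minimal, single-threaded sketch of the window described above. BaseTask and CountingTask are simplified stand-ins for TestableSinkTask and MonitorableSinkTask, not the real Kafka classes, and the onLatchReleased hook is a hypothetical addition that marks the earliest point at which a thread blocked in awaitRecords() could resume.

```java
import java.util.List;
import java.util.concurrent.CountDownLatch;

// Simplified stand-ins for the classes in the issue; not the real Kafka code.
class LatchRaceSketch {

    static class BaseTask {
        final CountDownLatch latch;
        // Hypothetical hook: runs at the instant the latch reaches zero,
        // i.e. the earliest moment a waiting test thread could be released.
        Runnable onLatchReleased = () -> { };

        BaseTask(int expectedRecords) {
            latch = new CountDownLatch(expectedRecords);
        }

        void put(List<String> records) {
            for (String ignored : records) {
                latch.countDown(); // one countdown per record, inside the loop
                if (latch.getCount() == 0) {
                    onLatchReleased.run();
                }
            }
        }
    }

    static class CountingTask extends BaseTask {
        int count = 0;

        CountingTask(int expectedRecords) {
            super(expectedRecords);
        }

        @Override
        void put(List<String> records) {
            super.put(records);      // the latch may reach zero in here
            count += records.size(); // count is updated only afterwards
        }
    }

    // Returns {count seen when the latch hit zero, count after put() returned}.
    static int[] run() {
        CountingTask task = new CountingTask(3);
        int[] seen = new int[2];
        task.onLatchReleased = () -> seen[0] = task.count;
        task.put(List.of("a", "b")); // first batch: count becomes 2
        task.put(List.of("c"));      // latch hits zero before count becomes 3
        seen[1] = task.count;
        return seen;
    }

    public static void main(String[] args) {
        int[] seen = run();
        System.out.println("at latch release: " + seen[0] + ", after put(): " + seen[1]);
    }
}
```

With three expected records delivered in batches of two and one, the hook observes a count of 2 while the final count is 3. This is the same shape as the 999 versus 1000 discrepancy, scaled down.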

majialoong · Dec 02 '25 16:12

I added logging to verify the race condition.

In MonitorableSinkTask.put():

@Override
public void put(Collection<SinkRecord> records) {
    super.put(records);
    count += records.size();
    // Temporary debug log: shows when count is actually updated
    System.out.println("MonitorableSinkTask: count updated to: " + count);
}

In MonitorableSinkIntegrationTest (before fix):

// Temporary debug log: the metric value the test thread sees just before the assertion
log.info("Task metric value: {}", metrics.get(taskMetric).metricValue());
assertEquals((double) NUM_RECORDS_PRODUCED, metrics.get(taskMetric).metricValue());

Successful run: image

Failed run: image

The logs clearly show the race condition:

  1. In the failed run, awaitRecords() returns (latch countdown completed) before the last count += records.size() executes.
  2. The test thread reads the metric value 999.0 while the task thread has not yet updated count to 1000.
  3. The "count updated to: 1000" log line appears after the test reads the metric, confirming the race.
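
One common way to close a window like this is to have the test poll until the counter reaches the expected value instead of reading it once right after the latch releases. The sketch below illustrates that idea only and is not necessarily the fix adopted for this issue. waitUntil is a hypothetical helper, and the AtomicInteger stands in for the task's count.

```java
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.BooleanSupplier;

// Illustrative only: a polling wait that tolerates a late counter update.
class WaitForCountSketch {

    // Hypothetical helper: polls until the condition holds or the timeout expires.
    static boolean waitUntil(BooleanSupplier condition, long timeoutMs) {
        long deadline = System.nanoTime() + timeoutMs * 1_000_000L;
        while (deadline - System.nanoTime() > 0) {
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(10);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return false;
            }
        }
        return condition.getAsBoolean(); // one last check at the deadline
    }

    // Simulates a task thread that applies the final update slightly late.
    static boolean demo() {
        AtomicInteger count = new AtomicInteger(999);
        Thread task = new Thread(() -> {
            try {
                Thread.sleep(50);
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            count.incrementAndGet(); // 999 -> 1000, after the "latch" has released
        });
        task.start();
        boolean reached = waitUntil(() -> count.get() == 1000, 5_000);
        try {
            task.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return reached;
    }

    public static void main(String[] args) {
        System.out.println("count reached 1000: " + demo());
    }
}
```

A single read at the start of demo() would see 999, while the polling wait returns true once the delayed update lands. An alternative with the same effect would be to count the latch down only after count has been updated, so the two events cannot be observed out of order.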

majialoong · Dec 02 '25 16:12