KAFKA-18952: Fix flaky test in MonitorableSinkIntegrationTest
The test testMonitorableSinkConnectorAndTask is flaky due to a race
condition between the task thread and the test thread.
The awaitRecords() method uses a CountDownLatch that counts down in
TestableSinkTask.put() for each record inside the loop, while
MonitorableSinkTask.count is updated only after super.put() returns. When
the latch reaches zero, awaitRecords() returns immediately, but the task
thread may not yet have executed count += records.size() for the final batch.
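The ordering problem can be reproduced in a small single-threaded model. This is only a sketch: BaseTask and CountingTask are hypothetical stand-ins for TestableSinkTask and MonitorableSinkTask, and the onLatchReleased() hook and countAtRelease field do not exist in Kafka. They are added here to record what a waiting thread could observe at the moment the latch reaches zero.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class LatchRaceSketch {

    /** Hypothetical stand-in for TestableSinkTask: one countDown() per record. */
    static class BaseTask {
        final CountDownLatch latch;

        BaseTask(int expectedRecords) {
            this.latch = new CountDownLatch(expectedRecords);
        }

        void put(List<String> records) {
            for (String ignored : records) {
                latch.countDown();
                if (latch.getCount() == 0) {
                    // From this point a thread blocked in latch.await() may resume.
                    onLatchReleased();
                }
            }
        }

        /** Illustration-only hook, not part of any Kafka class. */
        void onLatchReleased() { }
    }

    /** Hypothetical stand-in for MonitorableSinkTask: count is updated after super.put(). */
    static class CountingTask extends BaseTask {
        int count = 0;
        int countAtRelease = -1;

        CountingTask(int expectedRecords) {
            super(expectedRecords);
        }

        @Override
        void put(List<String> records) {
            super.put(records);      // latch may reach zero in here
            count += records.size(); // the update happens only afterwards
        }

        @Override
        void onLatchReleased() {
            // Snapshot of count at the instant the latch hit zero.
            countAtRelease = count;
        }
    }

    /** Delivers `total` records in batches of at most `batchSize`. */
    static CountingTask run(int total, int batchSize) {
        CountingTask task = new CountingTask(total);
        for (int start = 0; start < total; start += batchSize) {
            int size = Math.min(batchSize, total - start);
            List<String> batch = new ArrayList<>();
            for (int i = 0; i < size; i++) {
                batch.add("record-" + (start + i));
            }
            task.put(batch);
        }
        return task;
    }

    public static void main(String[] args) {
        CountingTask task = run(10, 4);
        System.out.println("count when latch hit zero: " + task.countAtRelease);
        System.out.println("count after put() returned: " + task.count);
    }
}
```

With 10 records in batches of 4, 4 and 2, the latch reaches zero while count still excludes the final batch. A test thread that resumes at that instant and reads the counter sees the stale value.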
I added logging to verify the race condition.
In MonitorableSinkTask.put():
@Override
public void put(Collection<SinkRecord> records) {
    super.put(records);
    count += records.size();
    // add log
    System.out.println("MonitorableSinkTask: count updated to: " + count);
}
In MonitorableSinkIntegrationTest (before fix):
// add log
log.info("Task metric value: {}", metrics.get(taskMetric).metricValue());
assertEquals((double) NUM_RECORDS_PRODUCED, metrics.get(taskMetric).metricValue());
Successful run: (log output not preserved)
Failed run: (log output not preserved)
The logs clearly show the race condition:
- In the failed run, awaitRecords() returns (latch countdown completed) before the last count += records.size() executes.
- The test thread reads the metric value 999.0 while the task thread has not yet updated count to 1000.
- The "count updated to: 1000" log appears after the test reads the metric, confirming the race.
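One way to close a window like this is to update the counter before the latch is counted down. The sketch below is hypothetical and not necessarily what this change does. As in any such model, BaseTask, CountFirstTask, onLatchReleased() and countAtRelease are illustration-only names rather than Kafka APIs.

```java
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.CountDownLatch;

class LatchOrderingSketch {

    /** Hypothetical stand-in for TestableSinkTask: one countDown() per record. */
    static class BaseTask {
        final CountDownLatch latch;

        BaseTask(int expectedRecords) {
            this.latch = new CountDownLatch(expectedRecords);
        }

        void put(List<String> records) {
            for (String ignored : records) {
                latch.countDown();
                if (latch.getCount() == 0) {
                    onLatchReleased();
                }
            }
        }

        /** Illustration-only hook, not part of any Kafka class. */
        void onLatchReleased() { }
    }

    /** Variant that updates count before delegating to super.put(). */
    static class CountFirstTask extends BaseTask {
        int count = 0;
        int countAtRelease = -1;

        CountFirstTask(int expectedRecords) {
            super(expectedRecords);
        }

        @Override
        void put(List<String> records) {
            count += records.size(); // update first
            super.put(records);      // the latch is counted down afterwards
        }

        @Override
        void onLatchReleased() {
            // Snapshot of count at the instant the latch hit zero.
            countAtRelease = count;
        }
    }

    /** Delivers three records in two batches. */
    static CountFirstTask run() {
        CountFirstTask task = new CountFirstTask(3);
        task.put(Arrays.asList("a", "b"));
        task.put(Arrays.asList("c"));
        return task;
    }

    public static void main(String[] args) {
        CountFirstTask task = run();
        System.out.println("count when latch hit zero: " + task.countAtRelease);
    }
}
```

With this ordering the counter already holds the full total when the latch reaches zero. In a real two-thread setting, CountDownLatch guarantees that actions performed before countDown() happen-before a successful return from await(), so a waiting thread would also see the updated value.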