
KAFKA-15859: Make RemoteListOffsets call an async operation

kamalcph opened this issue 1 year ago • 6 comments

This is part 2 of KIP-1075.

Clients use the ListOffsets API to find the offset for a given timestamp. When remote storage is enabled for a topic, the broker has to fetch remote indexes, such as the offset index and the time index, to serve the query. A single ListOffsets request can also contain queries for multiple topic-partitions.
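To make the timestamp semantics concrete, here is a minimal Java sketch of the per-partition lookup: it returns the earliest offset whose timestamp is greater than or equal to the target, as a ListOffsets timestamp query does. It assumes a dense, sorted, in-memory list of (timestamp, offset) pairs; `TimestampLookup`, `IndexEntry`, and `offsetForTimestamp` are hypothetical names. Kafka's real time index is sparse and, for tiered topics, has to be fetched from remote storage first, which is the slow step this PR is concerned with.

```java
import java.util.List;
import java.util.Optional;

/**
 * Sketch only (not Kafka's implementation): resolve a timestamp query against
 * a sorted list of (timestamp, offset) entries.
 */
public class TimestampLookup {

    /** Hypothetical index entry; assumes timestamps are non-decreasing. */
    record IndexEntry(long timestamp, long offset) {}

    /** Binary search for the first entry whose timestamp is >= target. */
    static Optional<Long> offsetForTimestamp(List<IndexEntry> entries, long target) {
        int lo = 0;
        int hi = entries.size(); // search window is [lo, hi)
        while (lo < hi) {
            int mid = (lo + hi) >>> 1;
            if (entries.get(mid).timestamp() < target) {
                lo = mid + 1; // answer lies strictly to the right
            } else {
                hi = mid;     // mid is a candidate; keep looking left
            }
        }
        // Past the end means no record at or after the target timestamp.
        return lo < entries.size() ? Optional.of(entries.get(lo).offset()) : Optional.empty();
    }

    public static void main(String[] args) {
        List<IndexEntry> index = List.of(
            new IndexEntry(1_000L, 0L),
            new IndexEntry(2_000L, 10L),
            new IndexEntry(3_000L, 20L));
        System.out.println(offsetForTimestamp(index, 1_500L)); // Optional[10]
        System.out.println(offsetForTimestamp(index, 4_000L)); // Optional.empty
    }
}
```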

The time taken to read the indexes from remote storage is non-deterministic, and the query is handled by the request-handler threads. If there are multiple LIST_OFFSETS queries and most of the request-handler threads are busy reading data from remote storage, other high-priority requests such as FETCH and PRODUCE may starve and sit in the queue. This can increase produce and consume latency.
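The starvation problem can be reproduced with a toy model. This is not Kafka code: a fixed `java.util.concurrent` thread pool stands in for the request-handler threads, `Thread.sleep` stands in for a remote-storage index read, and `HandlerStarvation`/`producerWaitMillis` are hypothetical names. Once every handler thread is busy with a LIST_OFFSETS remote read, a PRODUCE request queued behind them waits for roughly the full remote-read time.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;

/**
 * Toy model (not Kafka code): every request runs inline on a fixed pool of
 * "request-handler" threads, so slow remote reads block cheap requests.
 */
public class HandlerStarvation {

    /** Returns how long (ms) a PRODUCE task waited before a handler thread picked it up. */
    static long producerWaitMillis(int handlerThreads, long remoteReadMs) {
        ExecutorService handlers = Executors.newFixedThreadPool(handlerThreads);
        CountDownLatch allBusy = new CountDownLatch(handlerThreads);
        try {
            // Each simulated LIST_OFFSETS holds a handler thread for the whole remote read.
            for (int i = 0; i < handlerThreads; i++) {
                handlers.submit(() -> {
                    allBusy.countDown();
                    sleepQuietly(remoteReadMs); // stands in for reading remote indexes
                });
            }
            allBusy.await(); // every handler thread is now inside a "remote read"
            final long queuedAt = System.nanoTime();
            // A cheap PRODUCE arrives; it only measures how long it sat in the queue.
            Future<Long> produce = handlers.submit(
                () -> TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - queuedAt));
            return produce.get();
        } catch (Exception e) {
            throw new IllegalStateException(e);
        } finally {
            handlers.shutdown();
        }
    }

    private static void sleepQuietly(long ms) {
        try {
            Thread.sleep(ms);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println("PRODUCE waited ~" + producerWaitMillis(2, 300) + " ms");
    }
}
```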

This patch introduces a delayed operation for the remote list-offsets call. If the timestamp needs to be searched in remote storage, the request-handler threads hand the request off to the remote-log-reader threads, which handle it asynchronously.
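A minimal sketch of the hand-off, assuming `CompletableFuture` as the async primitive: the handler thread schedules one lookup per partition on a separate reader pool and returns a future immediately, and the response is assembled only after every partition has an answer. The names (`AsyncRemoteListOffsets`, `handleListOffsets`, the `remoteLookup` function) are hypothetical. The PR describes a delayed operation, which this sketch does not reproduce; it only illustrates why the handler thread is no longer blocked.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.ToLongFunction;

/**
 * Sketch (hypothetical names, not the PR's classes): a handler thread hands
 * remote timestamp lookups to a "remote-log-reader" pool and returns at once.
 */
public class AsyncRemoteListOffsets {

    /** Returns immediately; each partition's lookup runs on the reader pool. */
    static CompletableFuture<Map<String, Long>> handleListOffsets(
            Map<String, Long> timestampByPartition,
            ToLongFunction<Long> remoteLookup, // hypothetical: timestamp -> offset via remote indexes
            ExecutorService remoteLogReaders) {
        Map<String, CompletableFuture<Long>> pending = new HashMap<>();
        timestampByPartition.forEach((partition, ts) -> pending.put(partition,
            CompletableFuture.supplyAsync(() -> remoteLookup.applyAsLong(ts), remoteLogReaders)));

        // Build the response only when all per-partition lookups have completed.
        return CompletableFuture.allOf(pending.values().toArray(new CompletableFuture<?>[0]))
            .thenApply(ignored -> {
                Map<String, Long> response = new HashMap<>();
                pending.forEach((partition, f) -> response.put(partition, f.join()));
                return response;
            });
    }

    public static void main(String[] args) {
        ExecutorService readers = Executors.newFixedThreadPool(2);
        ToLongFunction<Long> slowLookup = ts -> {
            try {
                Thread.sleep(200); // simulated remote index read
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
            }
            return ts / 100; // toy timestamp-to-offset mapping
        };
        CompletableFuture<Map<String, Long>> response =
            handleListOffsets(Map.of("topic-0", 1_000L, "topic-1", 2_000L), slowLookup, readers);
        System.out.println("handler free; response done yet? " + response.isDone());
        System.out.println(response.join());
        readers.shutdown();
    }
}
```

In `main`, `isDone()` is normally still false when printed, because the calling thread did not wait for the 200 ms reads.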

The patch is covered by unit and integration tests.

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

kamalcph • Jul 16 '24

@chia7712 @showuon @satishd

Call for review. PTAL.

kamalcph • Jul 16 '24

Test failures are unrelated.

kamalcph • Jul 17 '24

@kamalcph, I'd like to confirm that this PR can be reviewed before KIP-1075 gets approved. Is that right?

showuon • Aug 02 '24


Yes, this PR can be reviewed. It makes no public API changes. For the timeout of the delayed remote list-offsets operation, I reused the server request timeout, since tiered storage is not yet production ready. If that is not acceptable, we may have to wait for KIP-1075 to be approved.
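As a hedged sketch of how such a timeout could bound a pending lookup, assume the deadline is simply the server request timeout passed in as `serverRequestTimeoutMs`; `CompletableFuture.orTimeout` then answers a partition whose remote lookup has not finished with an error instead of an offset. The `PartitionResult` record and the error strings `REQUEST_TIMED_OUT`/`UNKNOWN_SERVER_ERROR` are assumptions for illustration; the thread does not say which error the PR returns on expiry.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

/** Sketch (hypothetical names): bound a remote list-offsets lookup by a timeout. */
public class RemoteListOffsetsTimeout {

    /** Hypothetical per-partition result: an offset, or an error name if the lookup failed. */
    record PartitionResult(long offset, String error) {}

    static CompletableFuture<PartitionResult> withTimeout(
            CompletableFuture<Long> lookup, long serverRequestTimeoutMs) {
        return lookup
            .thenApply(offset -> new PartitionResult(offset, null))
            // Completes this stage exceptionally with TimeoutException if still pending.
            .orTimeout(serverRequestTimeoutMs, TimeUnit.MILLISECONDS)
            .exceptionally(ex -> {
                // Failures propagated from `lookup` arrive wrapped in CompletionException.
                Throwable cause = (ex instanceof CompletionException && ex.getCause() != null)
                    ? ex.getCause() : ex;
                String error = cause instanceof TimeoutException
                    ? "REQUEST_TIMED_OUT" : "UNKNOWN_SERVER_ERROR";
                return new PartitionResult(-1L, error);
            });
    }

    public static void main(String[] args) {
        CompletableFuture<Long> neverCompletes = new CompletableFuture<>();
        System.out.println(withTimeout(neverCompletes, 100).join());
        // PartitionResult[offset=-1, error=REQUEST_TIMED_OUT]
        System.out.println(withTimeout(CompletableFuture.completedFuture(42L), 100).join());
        // PartitionResult[offset=42, error=null]
    }
}
```

Note that `orTimeout` only completes the future; it does not interrupt a remote read that may still be running on a reader thread.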

kamalcph • Aug 02 '24

Well, I'll review KIP-1075 first then.

showuon • Aug 02 '24

@showuon @clolov

Is it possible to give this PR an early review to keep it in good shape? Thanks!

kamalcph • Aug 03 '24

@kamalcph please fix conflicts, thanks :)

chia7712 • Sep 03 '24

@showuon @chia7712 @satishd @clolov

The diff is ready for review. PTAL. Thanks!

kamalcph • Sep 03 '24

@showuon Addressed your review comments. PTAL. Thanks!

kamalcph • Sep 10 '24

Gentle ping for review.

kamalcph • Sep 13 '24

@kamalcph Could you please fix the conflicts?

chia7712 • Sep 13 '24

The JDK 11 test failures are unrelated to this PR; the tests timed out.

Found 2 flaky test failures:
FLAKY ⚠️  MetricsDuringTopicCreationDeletionTest > "testMetricsDuringTopicCreateDelete(String).quorum=zk"
FLAKY ⚠️  OffsetsApiIntegrationTest > testAlterSinkConnectorOffsetsDifferentKafkaClusterTargeted()
Read env GRADLE_EXIT_CODE: 124
Read env THREAD_DUMP_URL: https://github.com/apache/kafka/actions/runs/10854248301/artifacts/1932678337
Gradle command timed out. These are partial results!
25602 tests cases run in 6h14m4s. 23379 PASSED ✅, 0 FAILED ❌, 2 FLAKY ⚠️ , 18 SKIPPED 🙈, and 0 errors.
Failing this step because the tests timed out. Thread dumps were taken and archived here: https://github.com/apache/kafka/actions/runs/10854248301/artifacts/1932678337
Error: Process completed with exit code 1.

kamalcph • Sep 15 '24

@kamalcph, thanks for checking the failed tests. They pass on my local machine. I will merge this PR.

./gradlew cleanTest :tools:test --tests MetadataQuorumCommandTest.testDescribeQuorumReplicationSuccessful :core:test --tests ListOffsetsIntegrationTest.testThreeNonCompressedRecordsInSeparateBatch --tests ConsumerBounceTest.testConsumptionWithBrokerFailures

chia7712 • Sep 15 '24

@kamalcph, any updates? Or should we just trigger QA again?

chia7712 • Sep 15 '24

I rebased the branch against trunk to retrigger the tests.

kamalcph • Sep 15 '24


Got it

chia7712 • Sep 15 '24

Thank you all for the reviews!

kamalcph • Sep 16 '24

@kamalcph, the test introduced in this PR is flaky on trunk https://ge.apache.org/scans/tests?search.names=CI%20workflow&search.rootProjectNames=kafka&search.tags=github%2Ctrunk&search.tasks=test&search.timeZoneId=America%2FNew_York&search.values=CI&tests.container=kafka.log.remote.RemoteLogOffsetReaderTest&tests.sortField=FLAKY

Can you take a look? I have filed KAFKA-17559 in the meantime. Thanks!

mumrah • Sep 16 '24

@mumrah, sorry about the flaky test. I will take a look!

chia7712 • Sep 16 '24

Opened #17214 to fix the flaky test. PTAL.

kamalcph • Sep 17 '24

Thanks @junrao for the review comments! I will follow up on them.

kamalcph • Oct 11 '24