
KAFKA-13152: Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"

Open shashankhs11 opened this issue 5 months ago • 7 comments

This PR continues the work from #13283, which was mostly complete but never reviewed. It is part of KIP-770, which is only partially implemented.
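For context, here is a minimal sketch of what the renamed settings could look like from an application's point of view. The two key names are expanded from the PR title; the class name and the byte values are arbitrary illustrations, not defaults, and plain string keys are used so the sketch does not depend on any particular StreamsConfig constant.

```java
import java.util.Properties;

// Hypothetical helper class, only for illustration.
final class Kip770ConfigSketch {

    // Builds a Properties object that uses the new byte-based settings.
    static Properties newStyleConfig() {
        final Properties props = new Properties();
        // Replaces "cache.max.bytes.buffering"; the value is an arbitrary example.
        props.setProperty("statestore.cache.max.bytes", String.valueOf(10L * 1024 * 1024));
        // Replaces "buffered.records.per.partition": a byte limit instead of a
        // record count. The value is an arbitrary example.
        props.setProperty("input.buffer.max.bytes", String.valueOf(64L * 1024 * 1024));
        return props;
    }
}
```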

I copied most of the work from the previous PR, made some changes, and fixed the existing tests that were failing. These are the changes on top of the previous work:

1. Refactored TopologyConfig

Checkstyle was failing because of high cyclomatic complexity in this file. I refactored some of the code so that the check passes.

2. Added type check for StreamTask in TaskManager

A new task type, ReadOnlyTask (https://github.com/apache/kafka/pull/13283#issuecomment-1813728802), was added while the previous PR was in progress. It was not accounted for, and every Task was assumed to be a StreamTask. I added a type check before the cast.
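A minimal sketch of the idea, not the actual TaskManager code. All types here are hypothetical stand-ins for Task, StreamTask and ReadOnlyTask, and `bufferedBytes()` is an invented accessor. It only shows how an `instanceof` check avoids the `ClassCastException` a blind cast would throw for a non-stream task.

```java
import java.util.List;

// Hypothetical stand-ins for the Kafka Streams task types; not the real classes.
interface SketchTask {
    String id();
}

final class SketchStreamTask implements SketchTask {
    private final String id;
    private final long bufferedBytes;

    SketchStreamTask(final String id, final long bufferedBytes) {
        this.id = id;
        this.bufferedBytes = bufferedBytes;
    }

    @Override
    public String id() {
        return id;
    }

    // Invented accessor that only stream tasks expose in this sketch.
    long bufferedBytes() {
        return bufferedBytes;
    }
}

final class SketchReadOnlyTask implements SketchTask {
    private final String id;

    SketchReadOnlyTask(final String id) {
        this.id = id;
    }

    @Override
    public String id() {
        return id;
    }
}

final class TaskCastSketch {
    // Sums buffered bytes over stream tasks only. Other task types are skipped
    // instead of being cast blindly.
    static long totalBufferedBytes(final List<SketchTask> tasks) {
        long total = 0L;
        for (final SketchTask task : tasks) {
            if (task instanceof SketchStreamTask) {
                total += ((SketchStreamTask) task).bufferedBytes();
            }
        }
        return total;
    }
}
```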

3. Added check for cache resizing

I would like to get confirmation or guidance on what the expected behaviour should be here.

Before:

resizeThreadCacheAndBufferMemory(threads.size());

After:

// For cache resizing, check if thread removal succeeded or timed out
final boolean threadWasRemoved = !threads.contains(streamThread);
if (threadWasRemoved) {
    // Thread was successfully removed so we can use current thread count
    resizeThreadCacheAndBufferMemory(threads.size());
} else {
    // Thread removal timed out so we need to resize as if the thread was removed
    resizeThreadCacheAndBufferMemory(threads.size() - 1);
}
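To make the effect of the two branches concrete, here is a small standalone sketch. The class and method names are hypothetical, and the even split of memory across threads is an assumption made only for the arithmetic; the real resizeThreadCacheAndBufferMemory may divide memory differently.

```java
// Hypothetical helper; mirrors the branch above without touching KafkaStreams.
final class ResizeSketch {

    // Thread count to size for: if the removed thread is still registered
    // (removal timed out), size as if it were already gone.
    static int threadCountForResize(final int registeredThreads,
                                    final boolean removedThreadStillRegistered) {
        return removedThreadStillRegistered ? registeredThreads - 1 : registeredThreads;
    }

    // Assumption: the total budget is split evenly across threads, and the
    // whole budget is returned when no threads remain.
    static long perThreadBytes(final long totalBytes, final int threadCount) {
        return threadCount <= 0 ? totalBytes : totalBytes / threadCount;
    }
}
```

With 4 registered threads and a timed-out removal, the sketch sizes for 3 threads, so a 12,000,000-byte budget yields 4,000,000 bytes per thread rather than 3,000,000.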

4. Check for CorruptedRecord

// Also we need to resume if the queue only contains corrupted records
// because they represent a processable queue which is actually empty
if (recordInfo.queue().isEmpty() ||
    (maxBufferedSize != -1 && recordInfo.queue().size() == maxBufferedSize) ||
    recordInfo.queue().headRecordIsCorrupted()) { // added this check

The check was added primarily because of a failing test:

StreamTaskTest.shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs()

A partition contained records with invalid timestamps, which turned them into CorruptedRecord instances. As per my understanding, we should resume the partition even if the head record is a CorruptedRecord, because the queue is logically empty: the partition appears to be full but actually contains no meaningful records to process.

Please confirm if my understanding is correct.
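The condition can be restated as a standalone predicate to make the cases easier to discuss. This is only a sketch: the class, method and parameter names are hypothetical, and the real code reads these values from recordInfo.queue() inside StreamTask.

```java
// Hypothetical restatement of the resume condition as a pure function.
final class ResumeCheckSketch {

    // Returns true when the partition should be resumed:
    //  - the queue is empty, or
    //  - a limit is configured (not -1) and the queue size equals that limit, or
    //  - the head record is corrupted (the newly added case).
    static boolean shouldResume(final int queueSize,
                                final int maxBufferedSize,
                                final boolean headRecordIsCorrupted) {
        return queueSize == 0
            || (maxBufferedSize != -1 && queueSize == maxBufferedSize)
            || headRecordIsCorrupted;
    }
}
```

Without the third clause, a queue of size 5 with a limit of 10 and a corrupted head record would not trigger a resume.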

5. Refactored as per below comment

As per https://github.com/apache/kafka/pull/13283#issuecomment-1815619664:

"so how about we move that method to the Task interface and just return 0 for the implementation of it in ReadOnlyTask and StandbyTask?"
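A rough sketch of the shape this suggestion leads to. Every name here is hypothetical, including the method name, since the comment does not name the method. The point is only that callers no longer need a type check or cast once the method lives on the interface.

```java
import java.util.List;

// Hypothetical interface standing in for Task; the method name is invented.
interface BufferAwareTask {
    long bufferedBytes();
}

final class ActiveTaskSketch implements BufferAwareTask {
    private final long bytes;

    ActiveTaskSketch(final long bytes) {
        this.bytes = bytes;
    }

    @Override
    public long bufferedBytes() {
        return bytes;
    }
}

// Stand-in for StandbyTask: nothing is buffered for processing, so it reports 0.
final class StandbyTaskSketch implements BufferAwareTask {
    @Override
    public long bufferedBytes() {
        return 0L;
    }
}

// Stand-in for ReadOnlyTask: also reports 0.
final class ReadOnlyTaskSketch implements BufferAwareTask {
    @Override
    public long bufferedBytes() {
        return 0L;
    }
}

final class BufferTotalsSketch {
    // Works for any mix of task types without instanceof or casts.
    static long total(final List<BufferAwareTask> tasks) {
        long sum = 0L;
        for (final BufferAwareTask task : tasks) {
            sum += task.bufferedBytes();
        }
        return sum;
    }
}
```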

shashankhs11 avatar Aug 02 '25 00:08 shashankhs11

A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the triage label should be removed to prevent this automation from happening again.

github-actions[bot] avatar Aug 09 '25 03:08 github-actions[bot]

A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the triage label should be removed to prevent this automation from happening again.

github-actions[bot] avatar Sep 04 '25 03:09 github-actions[bot]

@mjsax reminder for review :)

shashankhs11 avatar Sep 06 '25 05:09 shashankhs11

A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the triage label should be removed to prevent this automation from happening again.

github-actions[bot] avatar Sep 08 '25 03:09 github-actions[bot]

This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.

If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).

If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.

github-actions[bot] avatar Dec 07 '25 03:12 github-actions[bot]

A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the triage label should be removed to prevent this automation from happening again.

github-actions[bot] avatar Dec 09 '25 03:12 github-actions[bot]

A label of 'needs-attention' was automatically added to this PR in order to raise the attention of the committers. Once this issue has been triaged, the triage label should be removed to prevent this automation from happening again.

github-actions[bot] avatar Dec 11 '25 03:12 github-actions[bot]

@frankvicky, thanks for reviewing the PR. I addressed your comments! Also, sorry for all the additional tags, I messed up merging 😅

shashankhs11 avatar Dec 18 '25 20:12 shashankhs11