KAFKA-13152: Replace "buffered.records.per.partition" & "cache.max.bytes.buffering" with "{statestore.cache}/{input.buffer}.max.bytes"
This PR continues the work from #13283, which was mostly complete but never got reviewed. It is part of KIP-770, which is only partially implemented so far.
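For reference, here is a minimal sketch of setting the two byte-based configs from the title with plain `java.util.Properties`. The key `statestore.cache.max.bytes` replaces `cache.max.bytes.buffering`. `input.buffer.max.bytes` is the name proposed in KIP-770 and is an assumption here, since it may not be available in a released version. The values are arbitrary examples and the class name is hypothetical.

```java
import java.util.Properties;

// Hypothetical helper; the values are arbitrary examples, not recommended defaults.
class NewBufferConfigSketch {
    static final String STATESTORE_CACHE_MAX_BYTES = "statestore.cache.max.bytes";
    // Name taken from the PR title / KIP-770; assumed, may not exist in released versions.
    static final String INPUT_BUFFER_MAX_BYTES = "input.buffer.max.bytes";

    static Properties buildConfig() {
        final Properties props = new Properties();
        // Byte budget for state store caching (replaces cache.max.bytes.buffering)
        props.put(STATESTORE_CACHE_MAX_BYTES, Long.toString(10L * 1024 * 1024));
        // Byte budget for buffered input records (replaces buffered.records.per.partition)
        props.put(INPUT_BUFFER_MAX_BYTES, Long.toString(64L * 1024 * 1024));
        return props;
    }

    public static void main(String[] args) {
        buildConfig().forEach((key, value) -> System.out.println(key + "=" + value));
    }
}
```

Note that `buffered.records.per.partition` counted records per partition, so there is no direct numeric translation from it to a byte limit.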
I carried over most of the work from the previous PR, made some additional changes, and fixed the existing tests that were failing. The changes on top of the previous work are:
1. Refactored TopologyConfig
Checkstyle was failing because of high cyclomatic complexity in this file, so I refactored some of the code to make the check pass.
2. Added type check for StreamTask in TaskManager
A new task type, ReadOnlyTask (https://github.com/apache/kafka/pull/13283#issuecomment-1813728802), was added while the previous PR was open. The previous code did not account for it and assumed every Task was a StreamTask, so I added a type check before casting.
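A minimal sketch of this kind of type check, using hypothetical, simplified stand-ins for `Task`, `StreamTask`, and `ReadOnlyTask` (the real classes have much richer interfaces). The `bufferedRecords()` accessor is an assumption for illustration; the point is checking the runtime type before casting instead of assuming every `Task` is a `StreamTask`.

```java
import java.util.Arrays;
import java.util.List;

class TypeCheckSketch {
    // Hypothetical, simplified stand-ins for the real task types.
    interface Task {
        String id();
    }

    static final class StreamTask implements Task {
        private final String id;
        private final int bufferedRecords;

        StreamTask(String id, int bufferedRecords) {
            this.id = id;
            this.bufferedRecords = bufferedRecords;
        }

        @Override public String id() { return id; }

        int bufferedRecords() { return bufferedRecords; }
    }

    static final class ReadOnlyTask implements Task {
        private final String id;

        ReadOnlyTask(String id) { this.id = id; }

        @Override public String id() { return id; }
    }

    // Checks the runtime type before casting; a blind (StreamTask) cast would
    // throw ClassCastException as soon as it reached a ReadOnlyTask.
    static int totalBufferedRecords(List<Task> tasks) {
        int total = 0;
        for (Task task : tasks) {
            if (task instanceof StreamTask) {
                total += ((StreamTask) task).bufferedRecords();
            }
        }
        return total;
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(
            new StreamTask("0_0", 5), new ReadOnlyTask("0_1"), new StreamTask("0_2", 3));
        System.out.println(totalBufferedRecords(tasks)); // prints 8
    }
}
```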
3. Added check for cache resizing
I'd like some confirmation or guidance on what the expected behaviour should be here.
Before:
```java
resizeThreadCacheAndBufferMemory(threads.size());
```
After:
```java
// For cache resizing, check if thread removal succeeded or timed out
final boolean threadWasRemoved = !threads.contains(streamThread);
if (threadWasRemoved) {
    // Thread was successfully removed so we can use current thread count
    resizeThreadCacheAndBufferMemory(threads.size());
} else {
    // Thread removal timed out so we need to resize as if the thread was removed
    resizeThreadCacheAndBufferMemory(threads.size() - 1);
}
```
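To make the intended arithmetic concrete, here is a minimal sketch that assumes the total cache budget is split evenly across stream threads. The class and method names are hypothetical, and the even split is a simplification, not the actual resize logic in `KafkaStreams`.

```java
// Hypothetical sketch; assumes an even split of the total budget across stream threads.
class CacheResizeSketch {
    // Number of threads to size the cache for after a removal attempt.
    static int effectiveThreadCount(int currentThreadCount, boolean threadWasRemoved) {
        // If removal timed out, the thread is still in the list, so subtract it.
        return threadWasRemoved ? currentThreadCount : currentThreadCount - 1;
    }

    // Bytes per thread; guards against division by zero when no threads remain.
    static long perThreadBytes(long totalBytes, int threadCount) {
        return threadCount == 0 ? totalBytes : totalBytes / threadCount;
    }

    public static void main(String[] args) {
        final long total = 12L * 1024 * 1024;
        // Removal succeeded: 2 threads remain in the list.
        System.out.println(perThreadBytes(total, effectiveThreadCount(2, true)));
        // Removal timed out: 3 threads are still listed, but we size for 2.
        System.out.println(perThreadBytes(total, effectiveThreadCount(3, false)));
    }
}
```

Both calls print 6291456: whether the removal succeeded or timed out, the cache ends up sized for the two threads that will keep running.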
4. Added check for CorruptedRecord
```java
// Also we need to resume if the queue only contains corrupted records
// because they represent a processable queue which is actually empty
if (recordInfo.queue().isEmpty() ||
        (maxBufferedSize != -1 && recordInfo.queue().size() == maxBufferedSize) ||
        recordInfo.queue().headRecordIsCorrupted()) { // added this check
    // ... existing code that resumes the partition
}
```
I added this check mainly because of a failing test, StreamTaskTest.shouldResumePartitionWhenSkippingOverRecordsWithInvalidTs(). In that test, a partition contained records with invalid timestamps, which turned the head record into a CorruptedRecord.
As I understand it, we should resume the partition even if its head record is a CorruptedRecord, because the queue is logically empty: the partition appears full but contains no meaningful records to process.
Please confirm whether my understanding is correct.
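The resume condition can be modeled in isolation as a minimal sketch. The queue and record types are hypothetical stand-ins for the real record queue, and a negative timestamp is what marks a record as corrupted here, loosely mirroring the invalid-timestamp case in the failing test. Treating `maxBufferedSize == -1` as "no per-partition record limit" is my reading of the snippet, not something confirmed.

```java
import java.util.ArrayDeque;
import java.util.Deque;

class ResumeConditionSketch {
    // Hypothetical record model: a negative timestamp marks it as corrupted here.
    static final class BufferedRecord {
        final long timestamp;

        BufferedRecord(long timestamp) { this.timestamp = timestamp; }

        boolean isCorrupted() { return timestamp < 0; }
    }

    // Hypothetical stand-in for the real per-partition record queue.
    static final class RecordQueueModel {
        private final Deque<BufferedRecord> records = new ArrayDeque<>();

        void add(BufferedRecord record) { records.addLast(record); }

        int size() { return records.size(); }

        boolean isEmpty() { return records.isEmpty(); }

        boolean headRecordIsCorrupted() {
            final BufferedRecord head = records.peekFirst();
            return head != null && head.isCorrupted();
        }
    }

    // Same three-part condition as the snippet; -1 is treated as "no record limit".
    static boolean shouldResume(RecordQueueModel queue, int maxBufferedSize) {
        return queue.isEmpty()
            || (maxBufferedSize != -1 && queue.size() == maxBufferedSize)
            || queue.headRecordIsCorrupted();
    }

    public static void main(String[] args) {
        final RecordQueueModel corruptedOnly = new RecordQueueModel();
        corruptedOnly.add(new BufferedRecord(-1L));
        System.out.println(shouldResume(corruptedOnly, 5)); // prints true

        final RecordQueueModel healthy = new RecordQueueModel();
        healthy.add(new BufferedRecord(10L));
        healthy.add(new BufferedRecord(20L));
        System.out.println(shouldResume(healthy, 5)); // prints false
    }
}
```

With a queue holding only a record whose timestamp is -1, `shouldResume` returns true even though the queue is non-empty and below the limit, which is the behaviour argued for above.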
5. Refactored as per the review comment at https://github.com/apache/kafka/pull/13283#issuecomment-1815619664:
> so how about we move that method to the Task interface and just return 0 for the implementation of it in ReadOnlyTask and StandbyTask?
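A minimal sketch of what the reviewer's suggestion could look like. The comment does not name the method, so `bufferedSize()` is a hypothetical stand-in, and the task classes are simplified models. Declaring the method on `Task` lets callers iterate over all tasks without the `instanceof` check and cast described in item 2.

```java
import java.util.Arrays;
import java.util.List;

class TaskInterfaceSketch {
    interface Task {
        // Hypothetical method name; declared on the interface so callers need no cast.
        long bufferedSize();
    }

    static final class StreamTask implements Task {
        private final long bufferedSize;

        StreamTask(long bufferedSize) { this.bufferedSize = bufferedSize; }

        @Override public long bufferedSize() { return bufferedSize; }
    }

    // As the reviewer suggested, these implementations simply return 0.
    static final class StandbyTask implements Task {
        @Override public long bufferedSize() { return 0L; }
    }

    static final class ReadOnlyTask implements Task {
        @Override public long bufferedSize() { return 0L; }
    }

    // Works uniformly over every task type.
    static long totalBufferedSize(List<Task> tasks) {
        long total = 0L;
        for (Task task : tasks) {
            total += task.bufferedSize();
        }
        return total;
    }

    public static void main(String[] args) {
        List<Task> tasks = Arrays.asList(new StreamTask(7L), new StandbyTask(), new ReadOnlyTask());
        System.out.println(totalBufferedSize(tasks)); // prints 7
    }
}
```

Compared with the type check, this keeps the knowledge of which tasks buffer data inside each implementation, so adding another task type later does not require touching the callers.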
A label of 'needs-attention' was automatically added to this PR in order to raise the
attention of the committers. Once this issue has been triaged, the triage label
should be removed to prevent this automation from happening again.
@mjsax reminder for review :)
This PR is being marked as stale since it has not had any activity in 90 days. If you would like to keep this PR alive, please leave a comment asking for a review. If the PR has merge conflicts, update it with the latest from the base branch.
If you are having difficulty finding a reviewer, please reach out on the [mailing list](https://kafka.apache.org/contact).
If this PR is no longer valid or desired, please feel free to close it. If no activity occurs in the next 30 days, it will be automatically closed.
@frankvicky, thanks for reviewing the PR. I've addressed your comments! Also, sorry for all the extra tags; I messed up the merge 😅