KAFKA-17747: Trigger rebalance on rack topology changes
In the previous implementation, we kept `partitionRacks` in `TopicMetadata`. However, it took too much memory, so we removed it in https://github.com/apache/kafka/pull/17233.
We still need rack topology information to trigger a rebalance, so we now store only a digest in `TopicMetadata` to detect changes. This PR also updates `SubscribedTopicDescriberImpl#racksForPartition`, so the `PartitionAssignor` can get the correct racks.
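As a rough illustration of this idea (not the actual coordinator code), here is a minimal sketch of digest-based change detection; the class and method names are hypothetical:

```java
import java.util.Map;

// Hypothetical illustration: instead of persisting every partition's racks, the
// coordinator keeps one digest per subscribed topic and treats the subscription
// metadata as changed (triggering a rebalance) when any digest differs from the
// one computed from the current metadata image.
final class RackTopologyChangeCheck {

    static boolean subscriptionMetadataChanged(Map<String, Long> storedDigests,
                                               Map<String, Long> currentDigests) {
        // Covers topics being added or removed as well as partition count and
        // rack changes, since any of those alters the per-topic digest map.
        return !storedDigests.equals(currentDigests);
    }
}
```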
Committer Checklist (excluded from commit message)
- [ ] Verify design and implementation
- [ ] Verify test coverage and CI build status
- [ ] Verify documentation (including upgrade notes)
Hi @dajac, I implemented option 2 from https://issues.apache.org/jira/browse/KAFKA-17747. Could you help me check whether the current implementation is on the right track? If yes, I will make the following changes:
- Remove `TopicMetadata(Uuid, String, int)` and `SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata>)`. Only keep `TopicMetadata(Uuid, String, int, int)` and `SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata>, Map<Uuid, Map<Integer, Set<String>>>)`.
- Add a new field `PartitionRacksHashCode` to `ConsumerGroupPartitionMetadataValue.json` and `ShareGroupPartitionMetadataValue.json`.
- Add related test cases.
Thank you 😄
Hi @FrankYang0529. Thanks for working on this. I have a few high level comments/questions to start with.
- I am not convinced that using `hashCode()` is the right approach. Are we sure that we won't have collisions with it? Another thing is that the `hashCode` implementation may differ between versions of the JVM. An alternative would be to use MurmurHash, for instance. What do you think?
- With this approach, I was hoping that we could actually get rid of the `Map<String, TopicMetadata>` data structure and have a hash which replaces all of it. The idea would be to rely on the `MetadataImage` instead and to have a hash to detect when the metadata that we rely on has changed in the image. Have you considered something like this?
Regarding option 1) in https://issues.apache.org/jira/browse/KAFKA-17747, have you discarded it?
> An alternative would be to use MurmurHash, for instance. What do you think?
Agreed, `hashCode()` is not good enough for this. We should use something like MD5. Also, the hash function should generate the same result for the same data in a different order, because ordering doesn't affect the target assignment result.
> I was hoping that we could actually get rid of the `Map<String, TopicMetadata>` data structure and have a hash which replaces all of it.
Do you mean that we just want to have a hash value in `ConsumerGroupPartitionMetadataValue`, not a list of `TopicMetadata`?
> Regarding option 1) in https://issues.apache.org/jira/browse/KAFKA-17747, have you discarded it?
Since you mention the controller in option 1, I'm not quite sure whether we want to store the epoch in `TopicImage` or in `TopicMetadata`.
For a new epoch in `TopicImage`, we may need to store rack information in it as well, or the epoch can't represent all changes (e.g. a rack change). However, `TopicImage` is not only used by the group coordinator; it's also used in KRaft. It may not be a good idea to couple them.
For a new epoch in `TopicMetadata`, we still need to calculate the subscription metadata when the `MetadataImage` is updated. To save storage resources, we don't want to store duplicated information. In the end, this approach may not be more efficient than option 2.
> Agreed, `hashCode()` is not good enough for this. We should use something like MD5. Also, the hash function should generate the same result for the same data in a different order, because ordering doesn't affect the target assignment result.
MurmurHash would be better in my opinion and I think that we already use it in the code base. Regarding the order, we may have to sort the racks in order to be consistent.
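For illustration, here is a minimal sketch of a per-topic digest that sorts partitions and racks before hashing, so the result does not depend on iteration order. It assumes Guava's Murmur3 implementation for brevity (the PR itself later relies on Kafka's own `Murmur3` class); the class and method names here are hypothetical:

```java
import com.google.common.hash.HashFunction;
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;

// Hypothetical helper: a deterministic digest over a topic's rack topology.
// Sorting partitions and racks makes the result independent of iteration order.
final class TopicRackDigest {

    private static final HashFunction MURMUR3 = Hashing.murmur3_128();

    static long hash(String topicName, int numPartitions, Map<Integer, Set<String>> partitionRacks) {
        Hasher hasher = MURMUR3.newHasher()
            .putString(topicName, StandardCharsets.UTF_8)
            .putInt(numPartitions);
        // TreeMap fixes the partition order; sorting each rack set fixes the replica order.
        new TreeMap<>(partitionRacks).forEach((partition, racks) -> {
            hasher.putInt(partition);
            racks.stream().sorted()
                 .forEach(rack -> hasher.putString(rack, StandardCharsets.UTF_8));
        });
        return hasher.hash().asLong();
    }
}
```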
> Do you mean that we just want to have a hash value in `ConsumerGroupPartitionMetadataValue`, not a list of `TopicMetadata`?
Exactly. I think that we could even think about removing that record and putting the hash into `ConsumerGroupMetadataValue` or somewhere else. Would that be possible? One advantage of removing the data structure entirely is that we could take more things into account in the future without changing the format of the record.
> Since you mention the controller in option 1, I'm not quite sure whether we want to store the epoch in `TopicImage` or in `TopicMetadata`. For a new epoch in `TopicImage`, we may need to store rack information in it as well, or the epoch can't represent all changes (e.g. a rack change). However, `TopicImage` is not only used by the group coordinator; it's also used in KRaft. It may not be a good idea to couple them. For a new epoch in `TopicMetadata`, we still need to calculate the subscription metadata when the `MetadataImage` is updated. To save storage resources, we don't want to store duplicated information. In the end, this approach may not be more efficient than option 2.
If you update the epoch based on replica changes, it would also catch the rack changes because you can only change the rack when you restart the broker. However, I agree that the coupling is not ideal. This is actually what I pointed out in the Jira.
Hi @dajac, thanks for your guidance. I will do my best to implement option 2. I will also think about merging `ConsumerGroupPartitionMetadataValue` into `ConsumerGroupMetadataValue`. 👍
Hi @dajac, the PR is ready. I made the following changes:
- Move `Murmur3` from `org.apache.kafka.streams.state.internals` to `org.apache.kafka.common.hash`.
- In `TopicMetadata`, use `TreeMap` and `TreeSet` to store `partitionRacks`.
- Add `hashSubscriptionMetadata` to `Utils`. In it, use a `TreeMap` to make sure the `Map<String, TopicMetadata>` is sorted (a rough sketch follows this list).
- Remove `ConsumerGroupPartitionMetadataValue` and `ShareGroupPartitionMetadataValue`, so we don't store subscription metadata in `__consumer_offsets`. ~~Remove related `GroupMetadataManager#replay` functions as well.~~
- Add the hash of the subscription metadata to `ConsumerGroupMetadataValue`.
- Add `TimelineLong subscribedTopicMetadataHash` to `ModernGroup`.
- Remove `TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata` from `ModernGroup`.
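A rough sketch of what `hashSubscriptionMetadata` could look like under these changes, again using Guava's Murmur3 for brevity; the exact types and names are simplifications, not the real `Utils` method:

```java
import com.google.common.hash.Hasher;
import com.google.common.hash.Hashing;

import java.nio.charset.StandardCharsets;
import java.util.Map;
import java.util.TreeMap;

// Hypothetical sketch of hashSubscriptionMetadata: topics are folded into a single
// long in sorted topic-name order, so the same subscription metadata always yields
// the same value, which can then be stored in ConsumerGroupMetadataValue and
// compared via the group's subscribedTopicMetadataHash.
final class SubscriptionMetadataHash {

    // topicHashes: topic name -> per-topic digest (e.g. the TopicRackDigest above);
    // partitionCounts: topic name -> number of partitions.
    static long hash(Map<String, Long> topicHashes, Map<String, Integer> partitionCounts) {
        Hasher hasher = Hashing.murmur3_128().newHasher();
        // The TreeMap guarantees a stable iteration order over topic names.
        new TreeMap<>(topicHashes).forEach((topic, topicHash) -> {
            hasher.putString(topic, StandardCharsets.UTF_8);
            hasher.putInt(partitionCounts.getOrDefault(topic, 0));
            hasher.putLong(topicHash);
        });
        return hasher.hash().asLong();
    }
}
```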
The CI results look good. Could you review it again when you have time? Thank you.
@FrankYang0529 @chia7712 We may need a KIP for this one. Let's first agree on the ideal solution and we can do a small KIP if needed.
Hi @dajac, I know the feature freeze date has already passed. However, is there any chance we can get this into 4.0, so the group coordinator can detect rack topology changes? The PR is ready and the KIP vote is in progress. Thanks.
KIP-1101: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
Vote thread: https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p
Hi @dajac, if there is no further discussion about KIP-1101, I would like to start implementing it. Since this PR is too large, I will split it into the following PRs:
- Add a new field to `ConsumerGroupMetadataValue`, `ShareGroupMetadataValue`, and `StreamsGroupMetadataValue`.
- Add Guava to the dependencies and add the topic hash function.
- Introduce a topic hash cache in `GroupMetadataManager` (a rough sketch follows this list).
- Update `TargetAssignmentBuilder` and `SubscribedTopicDescriberImpl` to use the subscribed topic id set and the metadata image.
- Replace topic metadata with the topic hash in `GroupMetadataManager`.
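To make the third item above more concrete, here is a rough sketch of a topic hash cache; the class name and structure are assumptions, not the final `GroupMetadataManager` implementation:

```java
import org.apache.kafka.common.Uuid;

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical sketch of a per-topic hash cache: hashes are computed lazily from the
// metadata image and dropped when a new image reports the topic changed, so repeated
// subscription-metadata checks do not rehash unchanged topics.
final class TopicHashCache {

    private final Map<Uuid, Long> cache = new ConcurrentHashMap<>();

    // hashFromImage would compute the topic hash from the current MetadataImage.
    long getOrCompute(Uuid topicId, Function<Uuid, Long> hashFromImage) {
        return cache.computeIfAbsent(topicId, hashFromImage);
    }

    // Called when a metadata delta shows that the topic was created, deleted,
    // or had its partitions or replicas changed.
    void invalidate(Uuid topicId) {
        cache.remove(topicId);
    }
}
```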
@FrankYang0529 When you update the PR, I wonder whether we could split it into smaller chunks. That would ease the reviews. Would it be possible?
> When you update the PR, I wonder whether we could split it into smaller chunks. That would ease the reviews. Would it be possible?
Yes, I will close this PR and create a list of new PRs. Thanks.