KAFKA-17747: Trigger rebalance on rack topology changes

Open FrankYang0529 opened this issue 1 year ago • 6 comments

In the previous implementation, we kept partitionRacks in TopicMetadata. However, it used too much memory, so we removed it in https://github.com/apache/kafka/pull/17233.

We still need rack topology information to trigger a rebalance, so we store only a digest in TopicMetadata to detect changes. This PR also updates SubscribedTopicDescriberImpl#racksForPartition so that PartitionAssignor gets the correct racks.

Committer Checklist (excluded from commit message)

  • [ ] Verify design and implementation
  • [ ] Verify test coverage and CI build status
  • [ ] Verify documentation (including upgrade notes)

FrankYang0529 avatar Oct 10 '24 09:10 FrankYang0529

Hi @dajac, I implemented option 2 from https://issues.apache.org/jira/browse/KAFKA-17747. Could you check whether the current implementation is on the right track? If so, I will make the following changes:

  • Remove TopicMetadata(Uuid, String, int) and SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata>). Only keep TopicMetadata(Uuid, String, int, int) and SubscribedTopicDescriberImpl(Map<Uuid, TopicMetadata>, Map<Uuid, Map<Integer, Set<String>>>).
  • Add a new field PartitionRacksHashCode to ConsumerGroupPartitionMetadataValue.json and ShareGroupPartitionMetadataValue.json.
  • Add related test cases.

Thank you 😄

FrankYang0529 avatar Oct 10 '24 09:10 FrankYang0529

Hi @FrankYang0529. Thanks for working on this. I have a few high level comments/questions to start with.

  1. I am not convinced that using hashCode() is the right approach. Are we sure that we won't have collisions with it? Another concern is that the hashCode implementation may differ between versions of the JVM. An alternative would be to use MurmurHash, for instance. What do you think?
  2. With this approach, I was hoping that we could actually get rid of the Map<String, TopicMetadata> data structure and have a hash which replaces all of it. The idea would be to rely on the MetadataImage instead and to have a hash to detect when the metadata that we rely on has changed in the image. Have you considered something like this?

Regarding option 1) in https://issues.apache.org/jira/browse/KAFKA-17747, have you discarded it?

dajac avatar Oct 11 '24 07:10 dajac

  1. An alternative would be to use MurmurHash, for instance. What do you think?

Agreed, hashCode() is not good enough here. We should use something like MD5. Also, the hash function should produce the same result for the same data in a different order, because ordering doesn't affect the target assignment result.

  2. I was hoping that we could actually get rid of the Map<String, TopicMetadata> data structure and have a hash which replaces all of it.

Do you mean that we just want to have a hash value in ConsumerGroupPartitionMetadataValue, not a list of TopicMetadata?

Regarding option 1) in https://issues.apache.org/jira/browse/KAFKA-17747, have you discarded it?

Since you mention the controller in option 1, I'm not quite sure whether we want to store the epoch in TopicImage or TopicMetadata.

For a new epoch in TopicImage, we may need to store rack information in it as well; otherwise the epoch can't represent all changes (e.g. a rack change). However, TopicImage is used not only in the group coordinator but also in KRaft, so it may not be a good idea to couple them.

For a new epoch in TopicMetadata, we still need to calculate subscription metadata when MetadataImage is updated. To save storage resources, we don't want to store duplicated information. In the end, this approach may not be more efficient than option 2.

FrankYang0529 avatar Oct 14 '24 09:10 FrankYang0529

Agreed, hashCode() is not good enough here. We should use something like MD5. Also, the hash function should produce the same result for the same data in a different order, because ordering doesn't affect the target assignment result.

MurmurHash would be better in my opinion and I think that we already use it in the code base. Regarding the order, we may have to sort the racks in order to be consistent.
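To make the ordering point concrete, here is a minimal sketch of an order-insensitive digest over partition racks. It is not the code in this PR: the class name RackDigest and its digest method are hypothetical, and SHA-256 from the JDK stands in for MurmurHash only so that the example has no external dependencies. Partitions and racks are sorted before hashing, so the same topology always yields the same value regardless of iteration order.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.Map;
import java.util.Set;
import java.util.TreeMap;
import java.util.TreeSet;

// Hypothetical helper, for illustration only.
final class RackDigest {

    // Returns a 64-bit digest of the partition -> racks mapping.
    // SHA-256 is an assumption here; the thread proposes MurmurHash.
    static long digest(Map<Integer, Set<String>> partitionRacks) {
        MessageDigest md;
        try {
            md = MessageDigest.getInstance("SHA-256");
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException(e);
        }
        // TreeMap sorts partitions by id, TreeSet sorts rack names,
        // so the byte stream fed to the digest is canonical.
        for (Map.Entry<Integer, Set<String>> entry : new TreeMap<>(partitionRacks).entrySet()) {
            Set<String> racks = new TreeSet<>(entry.getValue());
            md.update(ByteBuffer.allocate(8)
                .putInt(entry.getKey())
                .putInt(racks.size())
                .array());
            for (String rack : racks) {
                // Length-prefix each rack name to avoid ambiguous concatenation.
                byte[] bytes = rack.getBytes(StandardCharsets.UTF_8);
                md.update(ByteBuffer.allocate(4).putInt(bytes.length).array());
                md.update(bytes);
            }
        }
        // Keep the first 8 bytes of the digest as the stored value.
        return ByteBuffer.wrap(md.digest()).getLong();
    }
}
```

Two maps holding the same racks in a different insertion order produce the same digest, while moving a replica to another rack changes it.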

Do you mean that we just want to have a hash value in ConsumerGroupPartitionMetadataValue, not a list of TopicMetadata?

Exactly. I think that we could even think about removing that record and putting the hash into ConsumerGroupMetadataValue or somewhere else. Would it be possible? One advantage of removing the data structure entirely is that we could take into account more things in the future without changing the format of the record.

Since you mention the controller in option 1, I'm not quite sure whether we want to store the epoch in TopicImage or TopicMetadata. For a new epoch in TopicImage, we may need to store rack information in it as well; otherwise the epoch can't represent all changes (e.g. a rack change). However, TopicImage is used not only in the group coordinator but also in KRaft, so it may not be a good idea to couple them. For a new epoch in TopicMetadata, we still need to calculate subscription metadata when MetadataImage is updated. To save storage resources, we don't want to store duplicated information. In the end, this approach may not be more efficient than option 2.

If you update the epoch based on replica changes, it would also catch the rack changes because you can only change the rack when you restart the broker. However, I agree that the coupling is not ideal. This is actually what I pointed out in the Jira.

dajac avatar Oct 14 '24 13:10 dajac

Hi @dajac, thanks for your guidance. I will do my best to implement option 2. I will also think about combining ConsumerGroupPartitionMetadataValue into ConsumerGroupMetadataValue. 👍

FrankYang0529 avatar Oct 14 '24 13:10 FrankYang0529

Hi @dajac, the PR is ready. I made the following changes:

  • Move Murmur3 from org.apache.kafka.streams.state.internals to org.apache.kafka.common.hash.
  • In TopicMetadata, use TreeMap and TreeSet to store partitionRacks.
  • Add hashSubscriptionMetadata to Utils. In it, use TreeMap to make sure the Map<String, TopicMetadata> is sorted.
  • Remove ConsumerGroupPartitionMetadataValue and ShareGroupPartitionMetadataValue, so we don't store subscription metadata in __consumer_offsets. ~Remove related GroupMetadataManager#replay functions as well.~
  • Add hash of subscription metadata to ConsumerGroupMetadataValue.
  • Add TimelineLong subscribedTopicMetadataHash to ModernGroup.
  • Remove TimelineHashMap<String, TopicMetadata> subscribedTopicMetadata from ModernGroup.
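As a rough illustration of how a single stored hash can replace the per-topic metadata map when deciding whether to rebalance, consider the sketch below. The class GroupHashTracker and its members are hypothetical, and a plain long field stands in for the TimelineLong mentioned above; the real ModernGroup logic is more involved.

```java
// Hypothetical sketch: a group remembers only the last metadata hash it saw.
final class GroupHashTracker {
    private long subscribedTopicMetadataHash;
    private int groupEpoch;

    GroupHashTracker(long initialHash) {
        this.subscribedTopicMetadataHash = initialHash;
        this.groupEpoch = 0;
    }

    // Called when a new metadata image arrives, with the hash recomputed
    // from that image. Returns true if the group epoch was bumped, which
    // in this sketch is what would trigger a new target assignment.
    boolean onMetadataUpdate(long newHash) {
        if (newHash == subscribedTopicMetadataHash) {
            return false; // nothing the assignor depends on has changed
        }
        subscribedTopicMetadataHash = newHash;
        groupEpoch++;
        return true;
    }

    int groupEpoch() {
        return groupEpoch;
    }
}
```

Because only the hash is compared, anything folded into it (partition counts, racks, or later additions) can trigger a rebalance without changing the record format.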

The CI result looks good. Could you review it again when you have time? Thank you.

FrankYang0529 avatar Oct 20 '24 07:10 FrankYang0529

@FrankYang0529 @chia7712 We may need a KIP for this one. Let's first agree on the ideal solution and we can do a small KIP if needed.

dajac avatar Oct 24 '24 16:10 dajac

Hi @dajac, I know the feature freeze date has already passed. However, is there a chance to get this into 4.0, so that the group coordinator can detect rack topology changes? The PR is ready and the KIP vote is in progress. Thanks.

KIP-1101: https://cwiki.apache.org/confluence/display/KAFKA/KIP-1101%3A+Trigger+rebalance+on+rack+topology+changes
Vote thread: https://lists.apache.org/thread/j90htmphjk783p697jjfg1xt8thmy33p

FrankYang0529 avatar Dec 16 '24 11:12 FrankYang0529

Hi @dajac, if there is no further discussion about KIP-1101, I would like to start implementing it. Since this PR is too large, I will split it into the following:

  • Add new field to ConsumerGroupMetadataValue, ShareGroupMetadataValue, and StreamsGroupMetadataValue.
  • Add Guava to the dependencies and add a topic hash function.
  • Introduce a topic hash cache in GroupMetadataManager.
  • Update TargetAssignmentBuilder and SubscribedTopicDescriberImpl to use the subscribed topic ID set and the metadata image.
  • Replace topic metadata with topic hash in GroupMetadataManager.
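The topic hash cache step could look roughly like the sketch below. This is an assumption about the shape of the idea, not the planned GroupMetadataManager code: TopicHashCache, its methods, and the simple 31-based combiner are all hypothetical, and the per-topic hash function is passed in so that the sketch does not depend on a particular hash library. Each topic's hash is computed once and reused by every group subscribed to it; it is dropped when that topic changes in the metadata image.

```java
import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
import java.util.TreeSet;
import java.util.function.ToLongFunction;

// Hypothetical sketch of a per-topic hash cache shared across groups.
final class TopicHashCache {
    private final Map<String, Long> cache = new HashMap<>();
    private final ToLongFunction<String> topicHasher;
    private int computations = 0;

    // topicHasher computes a topic's hash from the current metadata image.
    TopicHashCache(ToLongFunction<String> topicHasher) {
        this.topicHasher = topicHasher;
    }

    // Returns the cached hash, computing it only on a cache miss.
    long topicHash(String topic) {
        return cache.computeIfAbsent(topic, t -> {
            computations++;
            return topicHasher.applyAsLong(t);
        });
    }

    // Called when a topic changes (or is deleted) in a new metadata image.
    void invalidate(String topic) {
        cache.remove(topic);
    }

    // Combines per-topic hashes in sorted topic-name order so the result
    // does not depend on subscription order. The combiner is deliberately
    // simplistic; a real implementation would use a stronger hash.
    long groupHash(Collection<String> subscribedTopics) {
        long hash = 17;
        for (String topic : new TreeSet<>(subscribedTopics)) {
            hash = 31 * hash + topicHash(topic);
        }
        return hash;
    }

    // Number of times the underlying hasher was invoked.
    int computations() {
        return computations;
    }
}
```

With this shape, a rack change on one topic invalidates a single cache entry, and only the groups subscribed to that topic see a different group hash.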

FrankYang0529 avatar Mar 19 '25 12:03 FrankYang0529

@FrankYang0529 When you update the PR, I wonder whether we could split it into smaller chunks. That would ease the reviews. Would it be possible?

dajac avatar Apr 16 '25 17:04 dajac

When you update the PR, I wonder whether we could split it into smaller chunks. That would ease the reviews. Would it be possible?

Yes, I will close this PR and create a list of new PRs. Thanks.

FrankYang0529 avatar Apr 17 '25 07:04 FrankYang0529