kafka-go
kafka-go copied to clipboard
Make fetchOffsets response parse conform to kafka protocol
Here is the definition of offsets fetch request: https://cwiki.apache.org/confluence/display/KAFKA/A+Guide+To+The+Kafka+Protocol#AGuideToTheKafkaProtocol-OffsetFetchRequest
The response is a list of topic partition offset information. We used a map to store topic and the partition offset info and in every loop we create a new map to overwrite the topic. Which means we only allow the topic appear once in the list. But the protocol definition has no such assumption.
Look at the implement of the official Java client:
public OffsetFetchResponse(int throttleTimeMs, Errors error, Map<TopicPartition, PartitionData> responseData) {
super(ApiKeys.OFFSET_FETCH);
Map<String, OffsetFetchResponseTopic> offsetFetchResponseTopicMap = new HashMap<>();
for (Map.Entry<TopicPartition, PartitionData> entry : responseData.entrySet()) {
String topicName = entry.getKey().topic();
OffsetFetchResponseTopic topic = offsetFetchResponseTopicMap.getOrDefault(
topicName, new OffsetFetchResponseTopic().setName(topicName));
PartitionData partitionData = entry.getValue();
topic.partitions().add(new OffsetFetchResponsePartition()
.setPartitionIndex(entry.getKey().partition())
.setErrorCode(partitionData.error.code())
.setCommittedOffset(partitionData.offset)
.setCommittedLeaderEpoch(
partitionData.leaderEpoch.orElse(NO_PARTITION_LEADER_EPOCH))
.setMetadata(partitionData.metadata)
);
offsetFetchResponseTopicMap.put(topicName, topic);
}
// ...
}
It uses getOrDefault to get the partition offset info of a topic. Which means it allows topic appears multiple times in the response list.
Hello @maralla, thanks for the submission!
Do you happen to have a way to reproduce this behavior? It would be amazing if we could figure out a way to add a test that validates the fix!
Hello @maralla, I just wanted to follow up on this pull request, do you have any updates to share with us?