AlterConfig does not revert and reset config to inherited defaults
Version & Environment
Redpanda version: (use rpk version): v24.1.2
What went wrong?
Create Topic with the following configuration.
"compression.type":"gzip"
"flush.ms":"100"
"redpanda.remote.read":"false"
Describe the topic, all three configuration values are reported as DYNAMIC_TOPIC_CONFIG as expected.
Set the configuration to a different value (removing flush.ms) using AlterConfig:
"compression.type":"gzip"
"redpanda.remote.read":"false"
After setting the topic configuration without flush.ms, we still see it as a DYNAMIC_TOPIC_CONFIG and DEFAULT_CONFIG was expected.
What should have happened instead?
To revert and reset config to inherited default, marking it as a DEFAULT_CONFIG after issuing the AlterConfig without the property.
How to reproduce the issue?
- run
rpk container start - You can run the following code to reproduce the error.
package main
import (
"context"
"log"
"time"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
"github.com/twmb/franz-go/pkg/kmsg"
)
func main() {
brokerAddr := "localhost:9092"
topic := "test-alter-configs"
kafkaClient, kafkaAdminClient := CreateClients([]string{brokerAddr})
topicConfigs := map[string]*string{
"cleanup.policy": kmsg.StringPtr("delete"),
"retention.bytes": kmsg.StringPtr("1000"),
"compression.type": kmsg.StringPtr("snappy"),
"flush.ms": kmsg.StringPtr("200"),
}
CreateTopic(kafkaAdminClient, topicConfigs, topic)
DescribeConfigs(kafkaAdminClient, topic)
log.Println("===")
topicConfigs = map[string]*string{
"cleanup.policy": kmsg.StringPtr("delete"),
"compression.type": kmsg.StringPtr("gzip"),
}
SetConfigs(kafkaClient, topicConfigs, topic)
DescribeConfigs(kafkaAdminClient, topic)
_, err := kafkaAdminClient.DeleteTopic(context.Background(), topic)
if err != nil {
log.Fatal(err)
}
}
func CreateTopic(kafkaAdminClient *kadm.Client, configs map[string]*string, topic string) {
topicDetails, err := kafkaAdminClient.ListTopics(context.Background())
if err != nil {
log.Fatal(err)
}
if !topicDetails.Has(topic) {
resp, _ := kafkaAdminClient.CreateTopics(context.Background(), 1, 1, configs, topic)
for _, ctr := range resp {
if ctr.Err != nil {
log.Fatal(ctr.Err)
}
}
log.Printf("topic %+v created.", topic)
} else {
log.Printf("topic %+v already exists.", topic)
}
}
func SetConfigs(kafkaClient *kgo.Client, configs map[string]*string, topic string) {
alterConfigResource := kmsg.NewAlterConfigsRequestResource()
alterConfigResource.ResourceType = kmsg.ConfigResourceTypeTopic
alterConfigResource.ResourceName = topic
for k, v := range configs {
kafkaACReq := kmsg.NewAlterConfigsRequestResourceConfig()
kafkaACReq.Name = k
kafkaACReq.Value = kmsg.StringPtr(*v)
alterConfigResource.Configs = append(alterConfigResource.Configs, kafkaACReq)
}
kafkaReq := kmsg.NewAlterConfigsRequest()
kafkaReq.Resources = []kmsg.AlterConfigsRequestResource{alterConfigResource}
_, err := kafkaReq.RequestWith(context.Background(), kafkaClient)
if err != nil {
log.Fatal(err)
}
}
func DescribeConfigs(kafkaAdminClient *kadm.Client, topic string) {
tcRes, err := kafkaAdminClient.DescribeTopicConfigs(context.Background(), topic)
if err != nil {
log.Fatal(err)
}
for _, tc := range tcRes {
for _, c := range tc.Configs {
log.Println(c.Key, *c.Value, c.Source.String())
}
}
}
func CreateClients(brokers []string) (*kgo.Client, *kadm.Client) {
opts := []kgo.Opt{
kgo.SeedBrokers(brokers...),
// kgo.MaxVersions(kversion.V2_6_0()),
kgo.FetchMaxBytes(5 * 1000 * 1000), // 5MB
kgo.MaxConcurrentFetches(12),
kgo.KeepControlRecords(),
kgo.MetadataMinAge(250 * time.Millisecond),
}
kClient, err := kgo.NewClient(opts...)
if err != nil {
log.Fatal(err)
}
kafkaAdmCl := kadm.NewClient(kClient)
return kClient, kafkaAdmCl
}
You'll see:
2024/05/27 06:28:15 topic test-alter-configs created.
2024/05/27 06:28:15 compression.type snappy DYNAMIC_TOPIC_CONFIG
2024/05/27 06:28:15 cleanup.policy delete DEFAULT_CONFIG
2024/05/27 06:28:15 segment.bytes 134217728 DEFAULT_CONFIG
2024/05/27 06:28:15 retention.ms 604800000 DEFAULT_CONFIG
2024/05/27 06:28:15 retention.bytes 1000 DYNAMIC_TOPIC_CONFIG
2024/05/27 06:28:15 message.timestamp.type CreateTime DEFAULT_CONFIG
2024/05/27 06:28:15 max.message.bytes 1048576 DEFAULT_CONFIG
2024/05/27 06:28:15 write.caching true DEFAULT_CONFIG
2024/05/27 06:28:15 flush.bytes 262144 DEFAULT_CONFIG
2024/05/27 06:28:15 flush.ms 200 DYNAMIC_TOPIC_CONFIG
...
2024/05/27 06:28:15 ===
2024/05/27 06:28:15 compression.type gzip DYNAMIC_TOPIC_CONFIG
2024/05/27 06:28:15 cleanup.policy delete DEFAULT_CONFIG
2024/05/27 06:28:15 segment.bytes 134217728 DEFAULT_CONFIG
2024/05/27 06:28:15 retention.ms 604800000 DEFAULT_CONFIG
2024/05/27 06:28:15 retention.bytes -1 DEFAULT_CONFIG
2024/05/27 06:28:15 message.timestamp.type CreateTime DEFAULT_CONFIG
2024/05/27 06:28:15 max.message.bytes 1048576 DEFAULT_CONFIG
2024/05/27 06:28:15 write.caching true DEFAULT_CONFIG
2024/05/27 06:28:15 flush.bytes 262144 DEFAULT_CONFIG
2024/05/27 06:28:15 flush.ms 200 DYNAMIC_TOPIC_CONFIG
^ flush.ms was not reverted to DEFAULT_CONFIG, while retention.bytes was.
Additional information
Original Discussion: https://redpandadata.slack.com/archives/C01ND4SVB6Z/p1715020159666599
JIRA Link: CORE-3091
This issue hasn't seen activity in 3 months. If you want to keep it open, post a comment or remove the stale label – otherwise this will be closed in two weeks.
Fixed by https://github.com/redpanda-data/redpanda/pull/21291