redpanda icon indicating copy to clipboard operation
redpanda copied to clipboard

AlterConfig does not revert and reset config to inherited defaults

Open r-vasquez opened this issue 1 year ago • 1 comments

Version & Environment

Redpanda version: (use rpk version): v24.1.2

What went wrong?

Create Topic with the following configuration.

"compression.type":"gzip"
"flush.ms":"100"
"redpanda.remote.read":"false"

Describe the topic, all three configuration values are reported as DYNAMIC_TOPIC_CONFIG as expected.

Set the configuration to a different value (removing flush.ms) using AlterConfig:

"compression.type":"gzip"
"redpanda.remote.read":"false"

After setting the topic configuration without flush.ms, we still see it as a DYNAMIC_TOPIC_CONFIG and DEFAULT_CONFIG was expected.

What should have happened instead?

To revert and reset config to inherited default, marking it as a DEFAULT_CONFIG after issuing the AlterConfig without the property.

How to reproduce the issue?

  1. run rpk container start
  2. You can run the following code to reproduce the error.
package main

import (
	"context"
	"log"
	"time"

	"github.com/twmb/franz-go/pkg/kadm"
	"github.com/twmb/franz-go/pkg/kgo"
	"github.com/twmb/franz-go/pkg/kmsg"
)

func main() {
	brokerAddr := "localhost:9092"
	topic := "test-alter-configs"

	kafkaClient, kafkaAdminClient := CreateClients([]string{brokerAddr})

	topicConfigs := map[string]*string{
		"cleanup.policy":   kmsg.StringPtr("delete"),
		"retention.bytes":  kmsg.StringPtr("1000"),
		"compression.type": kmsg.StringPtr("snappy"),
		"flush.ms":         kmsg.StringPtr("200"),
	}
	CreateTopic(kafkaAdminClient, topicConfigs, topic)
	DescribeConfigs(kafkaAdminClient, topic)

	log.Println("===")

	topicConfigs = map[string]*string{
		"cleanup.policy":   kmsg.StringPtr("delete"),
		"compression.type": kmsg.StringPtr("gzip"),
	}

	SetConfigs(kafkaClient, topicConfigs, topic)
	DescribeConfigs(kafkaAdminClient, topic)

	_, err := kafkaAdminClient.DeleteTopic(context.Background(), topic)
	if err != nil {
		log.Fatal(err)
	}
}

func CreateTopic(kafkaAdminClient *kadm.Client, configs map[string]*string, topic string) {
	topicDetails, err := kafkaAdminClient.ListTopics(context.Background())

	if err != nil {
		log.Fatal(err)
	}

	if !topicDetails.Has(topic) {
		resp, _ := kafkaAdminClient.CreateTopics(context.Background(), 1, 1, configs, topic)
		for _, ctr := range resp {
			if ctr.Err != nil {
				log.Fatal(ctr.Err)
			}
		}

		log.Printf("topic %+v created.", topic)
	} else {
		log.Printf("topic %+v already exists.", topic)
	}
}

func SetConfigs(kafkaClient *kgo.Client, configs map[string]*string, topic string) {
	alterConfigResource := kmsg.NewAlterConfigsRequestResource()
	alterConfigResource.ResourceType = kmsg.ConfigResourceTypeTopic
	alterConfigResource.ResourceName = topic

	for k, v := range configs {
		kafkaACReq := kmsg.NewAlterConfigsRequestResourceConfig()
		kafkaACReq.Name = k
		kafkaACReq.Value = kmsg.StringPtr(*v)

		alterConfigResource.Configs = append(alterConfigResource.Configs, kafkaACReq)
	}

	kafkaReq := kmsg.NewAlterConfigsRequest()
	kafkaReq.Resources = []kmsg.AlterConfigsRequestResource{alterConfigResource}

	_, err := kafkaReq.RequestWith(context.Background(), kafkaClient)
	if err != nil {
		log.Fatal(err)
	}
}

func DescribeConfigs(kafkaAdminClient *kadm.Client, topic string) {
	tcRes, err := kafkaAdminClient.DescribeTopicConfigs(context.Background(), topic)
	if err != nil {
		log.Fatal(err)
	}

	for _, tc := range tcRes {
		for _, c := range tc.Configs {
			log.Println(c.Key, *c.Value, c.Source.String())
		}
	}
}

func CreateClients(brokers []string) (*kgo.Client, *kadm.Client) {
	opts := []kgo.Opt{
		kgo.SeedBrokers(brokers...),
		// kgo.MaxVersions(kversion.V2_6_0()),
		kgo.FetchMaxBytes(5 * 1000 * 1000), // 5MB
		kgo.MaxConcurrentFetches(12),
		kgo.KeepControlRecords(),
		kgo.MetadataMinAge(250 * time.Millisecond),
	}

	kClient, err := kgo.NewClient(opts...)
	if err != nil {
		log.Fatal(err)
	}

	kafkaAdmCl := kadm.NewClient(kClient)

	return kClient, kafkaAdmCl
}

You'll see:

2024/05/27 06:28:15 topic test-alter-configs created.
2024/05/27 06:28:15 compression.type snappy DYNAMIC_TOPIC_CONFIG
2024/05/27 06:28:15 cleanup.policy delete DEFAULT_CONFIG
2024/05/27 06:28:15 segment.bytes 134217728 DEFAULT_CONFIG
2024/05/27 06:28:15 retention.ms 604800000 DEFAULT_CONFIG
2024/05/27 06:28:15 retention.bytes 1000 DYNAMIC_TOPIC_CONFIG
2024/05/27 06:28:15 message.timestamp.type CreateTime DEFAULT_CONFIG
2024/05/27 06:28:15 max.message.bytes 1048576 DEFAULT_CONFIG
2024/05/27 06:28:15 write.caching true DEFAULT_CONFIG
2024/05/27 06:28:15 flush.bytes 262144 DEFAULT_CONFIG
2024/05/27 06:28:15 flush.ms 200 DYNAMIC_TOPIC_CONFIG
...
2024/05/27 06:28:15 ===
2024/05/27 06:28:15 compression.type gzip DYNAMIC_TOPIC_CONFIG
2024/05/27 06:28:15 cleanup.policy delete DEFAULT_CONFIG
2024/05/27 06:28:15 segment.bytes 134217728 DEFAULT_CONFIG
2024/05/27 06:28:15 retention.ms 604800000 DEFAULT_CONFIG
2024/05/27 06:28:15 retention.bytes -1 DEFAULT_CONFIG
2024/05/27 06:28:15 message.timestamp.type CreateTime DEFAULT_CONFIG
2024/05/27 06:28:15 max.message.bytes 1048576 DEFAULT_CONFIG
2024/05/27 06:28:15 write.caching true DEFAULT_CONFIG
2024/05/27 06:28:15 flush.bytes 262144 DEFAULT_CONFIG
2024/05/27 06:28:15 flush.ms 200 DYNAMIC_TOPIC_CONFIG

^ flush.ms was not reverted to DEFAULT_CONFIG, while retention.bytes was.

Additional information

Original Discussion: https://redpandadata.slack.com/archives/C01ND4SVB6Z/p1715020159666599

JIRA Link: CORE-3091

r-vasquez avatar May 26 '24 10:05 r-vasquez

This issue hasn't seen activity in 3 months. If you want to keep it open, post a comment or remove the stale label – otherwise this will be closed in two weeks.

github-actions[bot] avatar Aug 26 '24 06:08 github-actions[bot]

Fixed by https://github.com/redpanda-data/redpanda/pull/21291

r-vasquez avatar Sep 09 '24 22:09 r-vasquez