
Memory leak while producing message?

Open devFallingstar opened this issue 1 year ago • 5 comments

Description

I'm trying to build an API server that sends a message to a Kafka queue whenever it receives an HTTP request. The servers run on Kubernetes. After running for more than a few hours (9+ hours), the pod gets OOMKilled.

[Image: heap dump taken right before the OOMKill]

This picture shows the heap dump taken right before the OOMKill occurred. It looks like _Cfunc_GoString allocations are very high and are never freed.

[Image: diff_base between two heap dumps]

This picture shows the diff_base between two heap dumps: one taken right after the server started and one taken right before the OOMKill.

How to reproduce

func (p *DClient) InitProducer() error {
	producer, err := kafka.NewProducer(&kafka.ConfigMap{
		"bootstrap.servers":   p.BoostrapServers,
		"retries":             2147483647,
		"acks":                "all",
		"batch.size":          65536,
		"linger.ms":           1000,
		"compression.type":    "lz4",
	})
	if err != nil {
		logger.Error("Failed to get producer", zap.Error(err))
		return err
	}
	p.Producer = producer
	return nil
}

func (p *DClient) ProduceMessage(msg []byte, flushDelay int) error {
	deliveryChan := make(chan kafka.Event)
	go func() {
		for e := range deliveryChan {
			switch ev := e.(type) {
			case *kafka.Message:
				m := ev
				if m.TopicPartition.Error != nil {
					logger.Warn("Failed to deliver message", zap.Error(m.TopicPartition.Error))
				} else {
					logger.Debug(fmt.Sprintf("Delivered message to topic %s [%d] at offset %v\n",
						*m.TopicPartition.Topic, m.TopicPartition.Partition, m.TopicPartition.Offset))
				}
			default:
				// Ignore default event
			}

			close(deliveryChan)
		}
	}()

	err := p.Producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &p.Topic, Partition: kafka.PartitionAny},
		Value:          msg,
	}, deliveryChan)
	if err != nil {
		close(deliveryChan)
		logger.Error("Failed to produce message", zap.Error(err))
		return err
	}

	for p.Producer.Flush(flushDelay) > 0 {
		// Wait for message deliveries before shutting down
	}

	return nil
}

// From another file: run the code below when an HTTP request is received
go func() {
	err := p.ProduceMessage(msg, 100)
	if err != nil {
		logger.Warn("Failed to put Kafka message", zap.Error(err))
	}
}()

I thought it would be solved after adding a delivery channel, but it wasn't. Did I use it wrong?

Checklist

Please provide the following information:

  • [x] confluent-kafka-go and librdkafka version: LibraryVersion(v1.8.2)
  • [ ] Apache Kafka broker version: Can't see it from my side.
  • [x] Client configuration: &kafka.ConfigMap{ "bootstrap.servers": p.BoostrapServers, "retries": 2147483647, "acks": "all", "batch.size": 65536, "linger.ms": 1000, "compression.type": "lz4", }
  • [x] Operating system: CentOS
  • [ ] Provide client logs (with "debug": ".." as necessary)
  • [ ] Provide broker log excerpts
  • [x] Critical issue: yes

devFallingstar avatar Aug 31 '23 02:08 devFallingstar

@devFallingstar Have you maybe checked if the same problem occurs with the latest version?

pluschnikow avatar Aug 31 '23 08:08 pluschnikow

@pluschnikow No, but I just found a way to work around the problem.

Since ProduceMessage() is called from multiple goroutines, Flush() doesn't work as I expected. After I removed the Flush() calls (I figured that would be okay because the producer is never closed while the server is running), memory usage seems fine. Will this problem be fixed in the latest version?
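
For reference, a minimal sketch of this workaround, assuming the same DClient fields as in the code above (this is an illustration of the idea, not the reporter's exact change):

func (p *DClient) ProduceMessage(msg []byte) error {
	deliveryChan := make(chan kafka.Event, 1)
	defer close(deliveryChan)

	err := p.Producer.Produce(&kafka.Message{
		TopicPartition: kafka.TopicPartition{Topic: &p.Topic, Partition: kafka.PartitionAny},
		Value:          msg,
	}, deliveryChan)
	if err != nil {
		logger.Error("Failed to produce message", zap.Error(err))
		return err
	}

	// Wait for this message's delivery report instead of calling Flush(),
	// which drains the whole producer and interleaves badly when
	// ProduceMessage is called from many goroutines at once.
	e := <-deliveryChan
	if m, ok := e.(*kafka.Message); ok && m.TopicPartition.Error != nil {
		return m.TopicPartition.Error
	}
	return nil
}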

devFallingstar avatar Aug 31 '23 09:08 devFallingstar

Seeing the same issue: GoString keeps accumulating and is not getting cleaned up. Any pointers on what the problem could be? (Removing Flush does not seem to solve it.) The highlighted entry keeps growing:

Showing nodes accounting for 373.51MB, 98.71% of 378.37MB total
Dropped 30 nodes (cum <= 1.89MB)
Showing top 10 nodes out of 12
      flat  flat%   sum%        cum   cum%
  203.08MB 53.67% 53.67%   203.08MB 53.67%  github.com/confluentinc/confluent-kafka-go/kafka._Cfunc_GoString (inline)
  160.29MB 42.36% 96.03%   160.29MB 42.36%  github.com/confluentinc/confluent-kafka-go/kafka.NewProducer
    9.64MB  2.55% 98.58%   147.03MB 38.86%  logproc/kfreader.PublishToMsgbus.func2
    0.50MB  0.13% 98.71%   203.58MB 53.80%  github.com/confluentinc/confluent-kafka-go/kafka.(*handle).eventPoll
         0     0% 98.71%   203.58MB 53.80%  github.com/confluentinc/confluent-kafka-go/kafka.NewProducer.func5
         0     0% 98.71%   203.58MB 53.80%  github.com/confluentinc/confluent-kafka-go/kafka.poller
         0     0% 98.71%   160.29MB 42.36%  logproc/kafka.CreateProducer
         0     0% 98.71%    22.90MB  6.05%  logproc/kafka.ListTopics
         0     0% 98.71%   149.76MB 39.58%  logproc/kfreader.PublishToMsgbus
         0     0% 98.71%     2.74MB  0.72%  logproc/kfreader.PublishToMsgbus.func1

VivekThrivikraman-est avatar Sep 22 '23 14:09 VivekThrivikraman-est

Upgrading to version 2.2 resolved this.

VivekThrivikraman-est avatar Sep 29 '23 15:09 VivekThrivikraman-est

Hi, even if you're using a custom delivery report channel, you should still poll the Events channel for other events: https://github.com/confluentinc/confluent-kafka-go/blob/d194ad14d58e17147c529ac4715850ca0a62134c/examples/producer_example/producer_example.go#L51
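
A minimal sketch of that pattern, assuming a producer wrapped in the DClient from the code above (illustration only, not code from the thread):

go func() {
	// Drain the producer-level Events channel so events that are not
	// delivery reports (errors, stats, ...) don't accumulate while the
	// per-message delivery reports go to the channel passed to Produce().
	for e := range p.Producer.Events() {
		switch ev := e.(type) {
		case kafka.Error:
			logger.Warn("Kafka producer error", zap.Error(ev))
		default:
			// Ignore other event types
		}
	}
}()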

emasab avatar Apr 25 '24 07:04 emasab