tile38 icon indicating copy to clipboard operation
tile38 copied to clipboard

memory leak in azure eventhub endpoint

Open eduardotang opened this issue 1 year ago • 1 comments

The connection should be reused instead of creating a new connection on every send, which currently causes:

  • memory usage growing from a few MB to GBs for the same workload
  • eventually hitting the Event Hub connection limit, even for a small workload

I have verified the fix with the changes below:

// EvenHubConn holds the state used to send messages to an Azure Event Hub
// endpoint: the endpoint configuration plus a hub client that Send keeps
// and reuses across calls instead of creating a new one per message.
type EvenHubConn struct {
	// mu is locked by Send for the whole duration of each call.
	mu   sync.Mutex
	// ep is the endpoint configuration; Send reads
	// ep.EventHub.ConnectionString from it when it needs a new hub client.
	ep   Endpoint
	// conn is the current hub client. It is nil until Send creates one and
	// is reset to nil by close.
	conn *eventhub.Hub
}

// close shuts down the cached hub client, if there is one, and clears the
// field so that the next Send creates a fresh client. Any error reported by
// the hub's Close is discarded. close does not take conn.mu itself.
func (conn *EvenHubConn) close() {
	// Nothing to tear down when no hub client is currently cached.
	if conn.conn == nil {
		return
	}
	conn.conn.Close(context.Background())
	conn.conn = nil
}

// Send publishes msg to the Event Hub as a single event.
//
// The hub client is created on first use from the endpoint's connection
// string and then kept for later calls. Each call is bounded by a 20 second
// timeout. The event's partition key is "<key>-<id>", built from the "key"
// and "id" fields of the JSON in msg.
//
// If creating the hub client fails, that error is returned. If sending
// fails, the cached client is closed and discarded before the error is
// returned, so the following call starts from a new client.
func (conn *EvenHubConn) Send(msg string) error {
	conn.mu.Lock()
	defer conn.mu.Unlock()

	// Create the hub client only when none is cached; otherwise reuse it.
	if conn.conn == nil {
		client, err := eventhub.NewHubFromConnectionString(conn.ep.EventHub.ConnectionString)
		if err != nil {
			return err
		}
		conn.conn = client
	}

	ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
	defer cancel()

	// Re-parse the JSON payload to derive the partition key from its
	// "key" and "id" fields.
	partitionKey := fmt.Sprintf("%s-%s", gjson.Get(msg, "key").String(), gjson.Get(msg, "id").String())

	event := eventhub.NewEventFromString(msg)
	event.PartitionKey = &partitionKey

	err := conn.conn.Send(ctx, event)
	if err != nil {
		// Drop the client so the next call builds a new one.
		conn.close()
	}
	return err
}

eduardotang avatar Dec 05 '22 03:12 eduardotang

Hey @lokisisland, do you think you can take a look at this? :)

iwpnd avatar Dec 29 '22 11:12 iwpnd