tile38
tile38 copied to clipboard
memory leak in azure eventhub endpoint
connection should be reused instead of creating a new connection in every send causing
- usage of memory from a few mb to gb for the same workload
- eventually hitting the connection limit of eventhub even for a small workload
i have verifed with below changes
type EvenHubConn struct {
mu sync.Mutex
ep Endpoint
conn *eventhub.Hub
}
func (conn *EvenHubConn) close() {
if conn.conn != nil {
conn.conn.Close(context.Background())
conn.conn = nil
}
}
// Send sends a message
func (conn *EvenHubConn) Send(msg string) error {
conn.mu.Lock()
defer conn.mu.Unlock()
if conn.conn == nil {
hub, err := eventhub.NewHubFromConnectionString(conn.ep.EventHub.ConnectionString)
if err != nil {
return err
}
conn.conn = hub
}
ctx, cancel := context.WithTimeout(context.Background(), 20*time.Second)
defer cancel()
// parse json again to get out info for our kafka key
key := gjson.Get(msg, "key")
id := gjson.Get(msg, "id")
keyValue := fmt.Sprintf("%s-%s", key.String(), id.String())
evtHubMsg := eventhub.NewEventFromString(msg)
evtHubMsg.PartitionKey = &keyValue
err := conn.conn.Send(ctx, evtHubMsg)
if err != nil {
conn.close()
return err
}
return nil
}
Hey @lokisisland, do you think you can take a look at this? :)