sarama icon indicating copy to clipboard operation
sarama copied to clipboard

Azure Event Hubs: Broker not connected error when using alias for disaster recovery

Open sgallizia opened this issue 2 years ago • 1 comments

Versions
Sarama | Kafka | Go
1.32.0 | Azure Event Hubs | 1.18
Configuration

What configuration values are you using for Sarama and Kafka?

// Producer settings: protocol version to speak, and ask sarama to report
// successfully delivered messages back to the caller.
prdCfg.Version = sarama.V1_0_2_0 //I have also tried with V1_0_0_0
prdCfg.Producer.Return.Successes = true
// Consumer uses the same protocol version as the producer.
cnsCfg.Version = sarama.V1_0_2_0
// When the SaramaLog setting is "y", route sarama's internal logging through
// the application's zap logger at debug level. The error returned by
// zap.NewStdLogAt is discarded here.
if config.Cfg.SaramaLog == "y" {
sarama.Logger, _ = zap.NewStdLogAt(logging.Get().Parent.With(zap.String("name", "sarama")), zapcore.DebugLevel)
}
// Consumer errors are delivered to the caller instead of only being logged.
cnsCfg.Consumer.Return.Errors = true
// A broker address containing "localhost" skips all SASL/TLS setup below.
// NOTE(review): only the output broker is inspected; the input broker is
// presumably expected to be local in the same situations — confirm.
if strings.Contains(config.Cfg.OutputBinderBroker, "localhost") {
return
}
// Producer: SASL/PLAIN authentication over TLS using the output binder
// credentials, with a 10 second dial timeout.
prdCfg.Net.SASL.Enable = true
prdCfg.Net.DialTimeout = 10 * time.Second
prdCfg.Net.SASL.User = config.Cfg.OutputBinderUser
prdCfg.Net.SASL.Password = config.Cfg.OutputBinderPassword
prdCfg.Net.SASL.Mechanism = "PLAIN"
prdCfg.Net.TLS.Enable = true
// One TLS configuration shared (same pointer) by producer and consumer.
// InsecureSkipVerify disables verification of the server certificate.
// NOTE(review): ClientAuth is documented by crypto/tls as a server-side
// policy, so setting it on a client config presumably has no effect — confirm.
tlsConfig := &tls.Config{
InsecureSkipVerify: true,
ClientAuth:         0,
}
prdCfg.Net.TLS.Config = tlsConfig

// Consumer: same transport settings as the producer, but authenticated with
// the input binder credentials.
cnsCfg.Net.SASL.Enable = true
cnsCfg.Net.DialTimeout = 10 * time.Second
cnsCfg.Net.SASL.User = config.Cfg.InputBinderUser
cnsCfg.Net.SASL.Password = config.Cfg.InputBinderPassword
cnsCfg.Net.SASL.Mechanism = "PLAIN"
cnsCfg.Net.TLS.Enable = true
cnsCfg.Net.TLS.Config = tlsConfig
Logs
logs: CLICK ME

{"log.level":"debug","@timestamp":"2022-04-28T14:41:30.585Z","log.origin":{"file.name":"[email protected]/consumer.go","file.line":920},"message":"consumer/broker/0 added subscription to reset-password/1","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.590Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":240},"message":"Connected to broker at st-evnt-bked-cana-we-01.servicebus.windows.net:9093 (registered as #0)","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.590Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":1242},"message":"SASL authentication successful with broker st-evnt-bked-cana-we-01.servicebus.windows.net:9093:4 - [0 0 0 0]","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.588Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":1169},"message":"Successful SASL handshake. Available mechanisms: [PLAIN OAUTHBEARER]","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"[email protected]/client.go","file.line":959},"message":"client/brokers replaced registered broker #0 with st-evnt-bked-cana-we-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":163},"message":"ClientID is the default of 'sarama', you should consider setting it to something application-specific.","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.542Z","log.origin":{"file.name":"[email protected]/consumer.go","file.line":920},"message":"consumer/broker/0 added subscription to reset-password/0","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.542Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":299},"message":"Closed connection to broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"[email protected]/client.go","file.line":883},"message":"client/metadata fetching metadata for [reset-password] from broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:29.541Z","log.origin":{"file.name":"[email protected]/client.go","file.line":883},"message":"client/metadata fetching metadata for [reset-password] from broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"[email protected]/utils.go","file.line":43},"message":"consumer/broker/0 disconnecting due to error processing FetchRequest: kafka: broker not connected","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"error","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/1: kafka: broker not connected"},"ecs.version":"1.6.0"}

{"log.level":"error","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/0: kafka: broker not connected"},"ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:27.540Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":299},"message":"Closed connection to broker st-evnt-bked-cana-we-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:27.403Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":1242},"message":"SASL authentication successful with broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093:4 - [0 0 0 0]","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:27.404Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":240},"message":"Connected to broker at st-evnt-bked-cana-geo-01.servicebus.windows.net:9093 (registered as #0)","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:27.402Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":1169},"message":"Successful SASL handshake. Available mechanisms: [PLAIN OAUTHBEARER]","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:27.309Z","log.origin":{"file.name":"[email protected]/client.go","file.line":1072},"message":"client/coordinator coordinator for consumergroup reset-password-dequeuer is #0 (st-evnt-bked-cana-geo-01.servicebus.windows.net:9093)","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:27.309Z","log.origin":{"file.name":"[email protected]/client.go","file.line":596},"message":"client/brokers replaced registered broker #0 with st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:27.309Z","log.origin":{"file.name":"[email protected]/broker.go","file.line":163},"message":"ClientID is the default of 'sarama', you should consider setting it to something application-specific.","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:27.308Z","log.origin":{"file.name":"[email protected]/client.go","file.line":1050},"message":"client/coordinator requesting coordinator for consumergroup reset-password-dequeuer from st-evnt-bked-cana-geo-01.servicebus.windows.net:9093","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"error","@timestamp":"2022-04-28T14:41:26.308Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/0: kafka: broker not connected"},"ecs.version":"1.6.0"}

{"log.level":"error","@timestamp":"2022-04-28T14:41:26.308Z","log.origin":{"file.name":"kafka/consumer.go","file.line":45},"message":"Error on consumer","consumer_group":"&sarama.consumerGroup{client:(*sarama.client)(0xc0000d4090), config:(*sarama.Config)(0xc00022f180), consumer:(*sarama.consumer)(0xc0000d0630), groupID:\"reset-password-dequeuer\", memberID:\"st-evnt-bked-cana-we-01.servicebus.windows.net:c:reset-password-dequeuer:I:sarama-908e8d713767408eb1d516136b0914e7\", errors:(chan error)(0xc000282cc0), lock:sync.Mutex{state:1, sema:0x0}, closed:(chan sarama.none)(0xc0001003c0), closeOnce:sync.Once{done:0x0, m:sync.Mutex{state:0, sema:0x0}}, userData:[]uint8(nil)}","error":{"message":"kafka: error while consuming reset-password/1: kafka: broker not connected"},"ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:25.623Z","log.origin":{"file.name":"kafka/producer.go","file.line":34},"message":"message sent","partition":0,"offset":14,"ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:25.585Z","log.origin":{"file.name":"[email protected]/utils.go","file.line":43},"message":"producer/broker/0 starting up","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:25.586Z","log.origin":{"file.name":"[email protected]/utils.go","file.line":43},"message":"producer/broker/0 state change to [open] on reset-password/0","name":"sarama","ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:25.585Z","log.origin":{"file.name":"kafka/handler.go","file.line":40},"message":"Message processing","topic":"reset-password","partition":0,"offset":15,"ecs.version":"1.6.0"}

{"log.level":"debug","@timestamp":"2022-04-28T14:41:25.585Z","log.origin":{"file.name":"kafka/handler.go","file.line":41},"message":"Message content","content":"{\r\n  \"Event\": {\r\n    \"id\": \"0123456789\",\r\n    \"type\": \"AbilitazioniServiziTelematici\",\r\n    \"notification_time\": 1634131014999\r\n  },\r\n  \"Action\": {\r\n    \"type\": \"Update\",\r\n    \"subtype\": \"Reset_password\"\r\n  },\r\n  \"Element\": {\r\n    \"client_code\": \"0056999\"\r\n  },\r\n  \"Data\": {\r\n    \"fiscal_code\": \"TSTMJR57C1xxxxxx\"\r\n  }\r\n}","ecs.version":"1.6.0"}


Problem Description

We have a service written in Go which connects to Azure Event Hubs through the sarama library: it reads a message from one queue and publishes it to another. We receive an error from the sarama consumer every time the service reads a message from the queue:

"kafka: error while consuming reset-password/1: kafka: broker not connected"

The error occurs when the service connects to Event Hubs through the alias for disaster recovery (broker st-evnt-bked-cana-geo-01.servicebus.windows.net:9093). We have the alias st-evnt-bked-cana-geo-01 with the primary namespace st-evnt-bked-cana-we-01 and the secondary namespace st-evnt-bked-cana-ne-01. If we connect directly to the broker st-evnt-bked-cana-we-01.servicebus.windows.net:9093, then the error doesn't occur. Despite this error, the message is received and published on the output queue, but we are concerned about the impact on performance, because we can see from the logs that sarama reconnects to another broker (st-evnt-bked-cana-geo-01 or st-evnt-bked-cana-we-01). We already opened a ticket with Azure, but according to Microsoft everything is OK.

sgallizia avatar May 06 '22 08:05 sgallizia

My colleague fixed the issue changing how the consumer group is created:

// newConsumerGroup builds a consumer group that is connected directly to the
// broker acting as coordinator for config.Value.Stream.ConsumerGroup, instead
// of to the configured bootstrap address config.Value.Stream.Broker.
//
// It proceeds in two phases:
//
//  1. A bootstrap client is opened against config.Value.Stream.Broker with
//     background metadata refresh disabled, and handed to getBroker to
//     resolve the coordinator broker. getBroker is responsible for closing
//     that bootstrap client.
//  2. Every occurrence of the bootstrap host name (the text before the first
//     ":" of config.Value.Stream.Broker) inside the SASL password is replaced
//     with the resolved broker address, metadata refresh is set to 10
//     minutes, and a second client is opened against the resolved broker
//     only. The consumer group is created on top of that second client.
//
// cnsCfg is mutated in place: Metadata.RefreshFrequency and Net.SASL.Password
// are overwritten, and the changes remain visible to the caller even when an
// error is returned part-way through.
//
// On failure the returned group is nil. If creating the group itself fails,
// the second client is closed before returning so that it does not leak.
//
// NOTE(review): per the sarama documentation, a group created with
// NewConsumerGroupFromClient does not own its client, so closing the returned
// group presumably leaves the underlying client open — confirm whether the
// caller needs a way to close it.
func newConsumerGroup(cnsCfg *sarama.Config) (sarama.ConsumerGroup, error) {
	// A zero refresh frequency turns off sarama's background metadata
	// refresher for the short-lived bootstrap client.
	cnsCfg.Metadata.RefreshFrequency = 0
	bootstrap, err := sarama.NewClient([]string{config.Value.Stream.Broker}, cnsCfg)
	if err != nil {
		return nil, err
	}
	broker, err := getBroker(bootstrap)
	if err != nil {
		return nil, err
	}
	addr := broker.Addr()

	// The password embeds the bootstrap host name; point it at the resolved
	// broker instead.
	// NOTE(review): addr presumably has the form host:port while only the host
	// part is being replaced — confirm the credential accepts that.
	host := strings.Split(config.Value.Stream.Broker, ":")[0]
	cnsCfg.Net.SASL.Password = strings.ReplaceAll(cnsCfg.Net.SASL.Password, host, addr)
	cnsCfg.Metadata.RefreshFrequency = 10 * time.Minute

	client, err := sarama.NewClient([]string{addr}, cnsCfg)
	if err != nil {
		return nil, err
	}
	group, err := sarama.NewConsumerGroupFromClient(config.Value.Stream.ConsumerGroup, client)
	if err != nil {
		// Nobody else holds a reference to client, so release it here. The
		// group-creation error is the one worth reporting; a failure to close
		// would only obscure it.
		_ = client.Close()
		return nil, err
	}
	return group, nil
}

// getBroker resolves the broker that coordinates
// config.Value.Stream.ConsumerGroup and returns the client's registered
// broker entry for that coordinator's ID, looked up after a metadata refresh.
//
// getBroker takes ownership of client: it is closed before the function
// returns, on success and on every error path, so the caller must not use it
// afterwards. The returned broker is therefore only meant to be inspected
// (for example for its address), not used for requests.
//
// It returns a nil broker and the underlying error if the coordinator lookup,
// the metadata refresh, or the broker lookup fails.
func getBroker(client sarama.Client) (*sarama.Broker, error) {
	// Close on every path so a failed lookup does not leak the client. The
	// close error is deliberately ignored: by then the broker has either been
	// resolved (and a close failure does not invalidate it) or a more
	// relevant error is already being returned.
	defer func() { _ = client.Close() }()

	coordinator, err := client.Coordinator(config.Value.Stream.ConsumerGroup)
	if err != nil {
		return nil, err
	}
	// Refresh before the lookup so the broker registered under the
	// coordinator's ID reflects the latest metadata.
	if err := client.RefreshMetadata(); err != nil {
		return nil, err
	}
	broker, err := client.Broker(coordinator.ID())
	if err != nil {
		return nil, err
	}
	return broker, nil
}

I suspect that something is wrong on the Azure Event Hubs side: I think the Kafka API wrapper on top of the Event Hubs system sometimes behaves strangely.

sgallizia avatar Jun 01 '22 10:06 sgallizia

Thank you for taking the time to raise this issue. However, it has not had any activity on it in the past 90 days and will be closed in 30 days if no updates occur. Please check if the main branch has already resolved the issue since it was raised. If you believe the issue is still valid and you would like input from the maintainers then please comment to ask for it to be reviewed.

github-actions[bot] avatar Aug 18 '23 00:08 github-actions[bot]

Sounds like this issue was resolved 👍🏻

dnwe avatar Aug 18 '23 06:08 dnwe