pulsar-client-go icon indicating copy to clipboard operation
pulsar-client-go copied to clipboard

[Bug] consumer under same namespace are blocked

Open jujiale opened this issue 7 months ago • 4 comments

Expected behavior

consumers under the same namespace are not blocked

Actual behavior

consumers under the same namespace are all blocked

Steps to reproduce

1.create several pulsar topics, they under same namespace, and all topic have consumers. send msg to these topics continuously. 2.create some consumers, this consumers contain retry and dlq queue, the code as below. `

c, err := pulsarClient.Subscribe(pulsar.ConsumerOptions{
		Topic:            common.GetEventbridgeRuleTopicNameV2(target.Rule.Eventbridge.Name, target.Rule.Eventbridge.ID, target.Rule.Name, target.Rule.ID),
		SubscriptionName:  common.GetEventbridgeTargetSubscriptionNameV2(target.Name, target.ID),,
		Type:              pulsar.Shared,
		RetryEnable:       true,
		NackBackoffPolicy: ExponentialNackBackoffRetryPolicy{},
		DLQ: &pulsar.DLQPolicy{
			MaxDeliveries: 20,
			DeadLetterTopic: common.GetEventbridgeTargetDLQTopicNameV2(
				common.GetEventbridgeRuleTopicNameV2(target.Rule.Eventbridge.Name, target.Rule.Eventbridge.ID, target.Rule.Name, target.Rule.ID),
				target.Name,
				target.ID,
			),
			RetryLetterTopic: common.GetEventbridgeTargetRetryTopicNameV2(
				common.GetEventbridgeRuleTopicNameV2(target.Rule.Eventbridge.Name, target.Rule.Eventbridge.ID, target.Rule.Name, target.Rule.ID),
				target.Name,
				target.ID,
			),
			InitSubscriptionName: "default",
		},
	})

`

common package as below, very simple logic `

func GetEventbridgeRuleTopicNameV2(eventbridgeName string, eventbridgeId int64, ruleName string, ruleId int64) string {
	return fmt.Sprintf("persistent://eventbridge/%s-%d/rule-%s-%d", eventbridgeName, eventbridgeId, ruleName, ruleId)
}

func GetEventbridgeTargetSubscriptionNameV2(targetName string, targetId int64) string {
	return fmt.Sprintf("target-%s-%d", targetName, targetId)
}

func GetEventbridgeTargetDLQTopicNameV2(topic string, targetName string, targetId int64) string {
	return fmt.Sprintf("%s-target-%s-%d-DLQ", topic, targetName, targetId)
}

func GetEventbridgeRuleTopicNameV2(eventbridgeName string, eventbridgeId int64, ruleName string, ruleId int64) string {
	return fmt.Sprintf("persistent://eventbridge/%s-%d/rule-%s-%d", eventbridgeName, eventbridgeId, ruleName, ruleId)
}

func GetEventbridgeTargetRetryTopicNameV2(topic string, targetName string, targetId int64) string {
	return fmt.Sprintf("%s-target-%s-%d-RETRY", topic, targetName, targetId)
}

` 3. after consumer reviced msg, we assume handle msg occurs error, invoke unAck() method directly. 4. the producer sustain send message to topic, the message flow is very large, after several time, the consumer could not consumer any message, also under the same namespace, other topic's consumer could nerver consume any msg either(other topic msg flow is very small).

we use pulsar-client-go 0.12.0, find above problem exist, but when we use 0.15.0, the problem is disappear so I want to know why 0.12.0 have this problem, if there is some issue that linked this problem.

the pulsar conf below, we set the unacked msg all are 0

Image

System configuration

Pulsar Server version: both 2.8.1 and 3.0.6 have this problem Pulsar Go Client version: 0.12.0

jujiale avatar May 09 '25 08:05 jujiale

this problem I also described in pulsar community, #https://github.com/apache/pulsar/issues/24231

jujiale avatar May 09 '25 08:05 jujiale

@shibd hello, I find your pr:#https://github.com/apache/pulsar-client-go/pull/1311, I want to know in this issue, if the problem is have some linked with #https://github.com/apache/pulsar-client-go/pull/1311, thank you a lot.

jujiale avatar May 09 '25 09:05 jujiale

Pulsar Go Client version: 0.12.0

In the other comment, you said that this problem is fixed with 0.15.0. Please don't report issues that have been fixed, it's preferred to ask questions in GitHub Discussions Q&A.

lhotari avatar May 09 '25 11:05 lhotari

In the other comment, you said that this problem is fixed with 0.15.0. Please don't report issues that have been fixed, it's preferred to ask questions in GitHub Discussions Q&A.

ok ,I see, I will obey in the future, thanks for your remind.

jujiale avatar May 09 '25 11:05 jujiale