pulsar
Consumer receives messages from topics that do not match the regex pattern
Search before asking
- [X] I searched in the issues and found nothing similar.
Read release policy
- [X] I understand that unsupported versions don't get bug fixes. I will attempt to reproduce the issue on a supported version of Pulsar client and Pulsar broker.
Version
3.2.2
Minimal reproduce step
Steps to reproduce:
- Send messages to multiple topics using producers.
- Configure the consumer with the regex topic pattern: non-persistent://my-tenant/new-name.*
- Set subscriptionTopicsMode = AllTopics.
What did you expect to see?
- The consumer should consume messages only from topics starting with non-persistent://my-tenant/new-name
What did you see instead?
- The consumer also consumed messages from topics starting with persistent://
package Pulsar;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class AllTopicsConsumerExample {
    private static PulsarAdmin adm;
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String NAMESPACE = "my-tenant/new-name";
    private static final String SUBSCRIPTION_NAME = "your-subscription";
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();
        // Publish one message to a non-persistent topic...
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://my-tenant/new-name/topic-non-1")
                .enableBatching(false).create();
        producer.send("=========from topic non-persistent://my-tenant/new-name/topic-non-1 ");
        System.out.println("new producer");
        // ...and one to a persistent topic in the same namespace.
        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/new-name/topic-pers-1")
                .enableBatching(false).create();
        producer1.send("=======from topic persistent://my-tenant/new-name/topic-pers-1 ");
        // The pattern includes the non-persistent:// domain prefix.
        Pattern allTopicsPattern = Pattern.compile("non-persistent://my-tenant/new-name/.*");
        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsPattern)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();
        // Print and acknowledge every message received through the pattern consumer.
        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}
Anything else?
No response
Are you willing to submit a PR?
- [ ] I'm willing to submit a PR!
According to https://github.com/apache/pulsar/issues/19798, there is no need to specify the persistent:// or non-persistent:// prefix in the topicsPattern.
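This also explains the original report. A simplified model, assuming (consistently with the behavior reported above) that the domain is removed from both the pattern and each topic name before matching, shows why non-persistent://my-tenant/new-name/.* also selects the persistent topic. DomainInsensitiveMatcher, stripDomain and filter are hypothetical names for illustration, not the Pulsar client API.

```java
import java.util.List;
import java.util.regex.Pattern;
import java.util.stream.Collectors;

// Hypothetical model of domain-insensitive topic matching; not Pulsar's actual code.
final class DomainInsensitiveMatcher {
    private static final String SEPARATOR = "://";

    // Drops a leading "persistent://" or "non-persistent://" if present.
    static String stripDomain(String name) {
        int idx = name.indexOf(SEPARATOR);
        return idx < 0 ? name : name.substring(idx + SEPARATOR.length());
    }

    // Keeps every topic whose domain-less name fully matches the domain-less pattern.
    static List<String> filter(List<String> topics, String pattern) {
        Pattern shortened = Pattern.compile(stripDomain(pattern));
        return topics.stream()
                .filter(t -> shortened.matcher(stripDomain(t)).matches())
                .collect(Collectors.toList());
    }

    public static void main(String[] args) {
        List<String> topics = List.of(
                "non-persistent://my-tenant/new-name/topic-non-1",
                "persistent://my-tenant/new-name/topic-pers-1",
                "persistent://other-tenant/ns/topic-x");
        // Both my-tenant/new-name topics match, whatever domain the pattern names.
        System.out.println(filter(topics, "non-persistent://my-tenant/new-name/.*"));
    }
}
```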
After discussion, we don't need this PIP; we just need to:
- Add a warn log when the user-configured pattern contains a domain (e.g. 'persistent://public/default/topic.*').
- Improve the documentation to state that patternTopics cannot contain domains.
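A minimal sketch of the proposed warn log, assuming a hypothetical helper warnIfDomainPresent that would be called when the consumer's pattern is configured. It uses java.util.logging only to stay self-contained; this is not existing Pulsar code, and where Pulsar would actually emit the warning is left open.

```java
import java.util.logging.Logger;
import java.util.regex.Pattern;

// Hypothetical validation step for the proposed warning; not existing Pulsar code.
final class TopicsPatternCheck {
    private static final Logger LOG = Logger.getLogger(TopicsPatternCheck.class.getName());
    // Matches a pattern that begins with an explicit topic domain.
    private static final Pattern DOMAIN_PREFIX = Pattern.compile("^(persistent|non-persistent)://");

    // Returns true, and logs a warning, if the pattern names a domain.
    // Per the discussion above, the pattern should look like "tenant/namespace/topic.*".
    static boolean warnIfDomainPresent(String topicsPattern) {
        boolean hasDomain = DOMAIN_PREFIX.matcher(topicsPattern).find();
        if (hasDomain) {
            LOG.warning("topicsPattern '" + topicsPattern
                    + "' contains a domain; patterns should not include persistent:// or non-persistent://");
        }
        return hasDomain;
    }
}
```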
OK, thank you @visortelle. But for non-persistent topics I was not able to get the multi-topic subscription working, and as you said, "Once in about ~5 runs I see some messages from non-persistent topic".
So it is not working properly for non-persistent topics.
@ragaur-tibco I completely agree and don't argue with that.
@ragaur-tibco the current behavior is correct. See my comment here: https://github.com/apache/pulsar/issues/22527#issuecomment-2067133567
@ragaur-tibco I fixed your code.
When using non-persistent delivery, killing a Pulsar broker or disconnecting a subscriber to a topic means that all in-transit messages are lost on that (non-persistent) topic, meaning that clients may see message loss.
Source: https://pulsar.apache.org/docs/next/cookbooks-non-persistent/#overview
In your code, the subscriber was created after the messages were sent, so the message to the non-persistent topic was dropped before any subscription existed.
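To make the ordering issue concrete, here is a toy in-memory model of the behavior quoted above: a non-persistent topic only pushes a message to subscribers attached at publish time and never stores it. ToyNonPersistentTopic is a hypothetical class for illustration, not Pulsar's implementation.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;

// Toy model of non-persistent delivery; not Pulsar's implementation.
// Messages go only to subscribers attached at publish time and are never stored.
final class ToyNonPersistentTopic {
    private final List<Queue<String>> attached = new ArrayList<>();

    void publish(String msg) {
        // With no attached subscriber the message goes nowhere: it is lost.
        for (Queue<String> q : attached) {
            q.add(msg);
        }
    }

    Queue<String> subscribe() {
        Queue<String> q = new ArrayDeque<>();
        attached.add(q);
        return q;
    }

    public static void main(String[] args) {
        // Order from the original repro: send first, subscribe afterwards.
        ToyNonPersistentTopic late = new ToyNonPersistentTopic();
        late.publish("hello");
        System.out.println("subscribe after send:  " + late.subscribe());

        // Order from the fixed code: subscribe first, then send.
        ToyNonPersistentTopic early = new ToyNonPersistentTopic();
        Queue<String> q = early.subscribe();
        early.publish("hello");
        System.out.println("subscribe before send: " + q);
    }
}
```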
Code:
package b;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class App {
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String SUBSCRIPTION_NAME = "your-subscription";
    public static void main(String[] args) throws Exception {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();
        Producer<String> producerA = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://my-tenant/new-name/topic-non-1")
                .enableBatching(false).create();
        Producer<String> producerB = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://my-tenant/new-name/topic-pers-1")
                .enableBatching(false).create();
        // No domain prefix in the pattern; AllTopics covers persistent and non-persistent topics.
        Pattern allTopicsPattern = Pattern.compile("my-tenant/new-name/.*");
        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern(allTopicsPattern)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();
        // Send only after the consumer has subscribed, so the non-persistent message is not dropped.
        producerA.send("=========from topic non-persistent://my-tenant/new-name/topic-non-1 ");
        producerB.send("=========from topic persistent://my-tenant/new-name/topic-pers-1 ");
        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}
Logs:
mvn exec:java
[INFO] Scanning for projects...
[INFO]
[INFO] --------------------------------< c:a >---------------------------------
[INFO] Building a 1.0-SNAPSHOT
[INFO] --------------------------------[ jar ]---------------------------------
[INFO]
[INFO] --- exec-maven-plugin:3.2.0:java (default-cli) @ a ---
SLF4J: Failed to load class "org.slf4j.impl.StaticLoggerBinder".
SLF4J: Defaulting to no-operation (NOP) logger implementation
SLF4J: See http://www.slf4j.org/codes.html#StaticLoggerBinder for further details.
Received message from topic non-persistent://my-tenant/new-name/topic-non-1: =========from topic non-persistent://my-tenant/new-name/topic-non-1
Received message from topic persistent://my-tenant/new-name/topic-pers-1: =========from topic persistent://my-tenant/new-name/topic-pers-1
@ragaur-tibco please check and let me know if it resolves the issue.
Hi @visortelle
I tried creating the subscriber before sending the messages:
package Pulsar;
import org.apache.pulsar.client.admin.PulsarAdmin;
import org.apache.pulsar.client.admin.PulsarAdminException;
import org.apache.pulsar.client.api.*;
import java.util.List;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
public class AllTopicsConsumerExample {
    private static PulsarAdmin adm;
    private static final String SERVICE_URL = "pulsar://localhost:6650";
    private static final String NAMESPACE = "my-tenant/new-name";
    private static final String SUBSCRIPTION_NAME = "your-subscription-1";
    public static void main(String[] args) throws PulsarClientException {
        PulsarClient pulsarClient = PulsarClient.builder()
                .serviceUrl(SERVICE_URL)
                .build();
        // Pattern allTopicsPattern = Pattern.compile("tenant-1/name/topic.*");
        Consumer<byte[]> allTopicsConsumer = pulsarClient.newConsumer()
                .topicsPattern("tenant-1/name/topic.*")
                .subscriptionType(SubscriptionType.Shared)
                .subscriptionName(SUBSCRIPTION_NAME)
                .subscriptionTopicsMode(RegexSubscriptionMode.AllTopics)
                .subscribe();
        Producer<String> producer = pulsarClient.newProducer(Schema.STRING)
                .topic("non-persistent://tenant-1/name/topic-1")
                .enableBatching(false).create();
        System.out.println("new producer");
        producer.send("=========from topic non-persistent://tenant-1/name/topic-1 ");
        Producer<String> producer1 = pulsarClient.newProducer(Schema.STRING)
                .topic("persistent://tenant-1/name/topic-8")
                .enableBatching(false).create();
        producer1.send("=======from topic persistent://tenant-1/name/topic-8 ");
        while (true) {
            Message<byte[]> message = allTopicsConsumer.receive();
            System.out.println("Received message from topic " + message.getTopicName()
                    + ": " + new String(message.getValue()));
            allTopicsConsumer.acknowledge(message);
        }
    }
}
Result: I only receive the message from the persistent topic, not from the non-persistent one.
Interesting.
My observation is that after we create a pattern consumer, for an existing non-persistent topic it doesn't "immediately" create the underlying subscription and consumers if there are no connected producers at this moment. But it will eventually be created after a short time.
TIP: you can display the list of the underlying consumers by casting your consumer to PatternMultiTopicsConsumerImpl and calling the .getConsumers() method.
// ConsumerImpl and PatternMultiTopicsConsumerImpl are internal classes in org.apache.pulsar.client.impl
List<ConsumerImpl<byte[]>> consumers =
        ((PatternMultiTopicsConsumerImpl<byte[]>) allTopicsConsumer).getConsumers();
for (ConsumerImpl<byte[]> consumer : consumers) {
    System.out.println("consumer: " + consumer.getTopic());
}
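Building on that tip, one possible workaround is to create the producer, wait until the pattern consumer has actually attached to the non-persistent topic, and only then send. Below is a generic polling sketch; Await and waitUntil are hypothetical names, the 10 second timeout in the commented usage is arbitrary, and the usage relies on the internal PatternMultiTopicsConsumerImpl cast from the tip, so treat it as a diagnostic aid rather than a supported API.

```java
import java.time.Duration;
import java.util.function.BooleanSupplier;

// Hypothetical helper: polls a condition until it holds or the timeout expires.
final class Await {
    static boolean waitUntil(BooleanSupplier condition, Duration timeout, Duration pollInterval) {
        long deadline = System.nanoTime() + timeout.toNanos();
        while (System.nanoTime() - deadline < 0) { // overflow-safe nanoTime comparison
            if (condition.getAsBoolean()) {
                return true;
            }
            try {
                Thread.sleep(pollInterval.toMillis());
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // keep the interrupt visible to the caller
                return false;
            }
        }
        return condition.getAsBoolean(); // final check at the deadline
    }

    // Possible usage (requires a running broker), after creating the producer and before sending:
    // boolean attached = Await.waitUntil(
    //         () -> ((PatternMultiTopicsConsumerImpl<byte[]>) allTopicsConsumer).getConsumers().stream()
    //                 .anyMatch(c -> c.getTopic().equals("non-persistent://tenant-1/name/topic-1")),
    //         Duration.ofSeconds(10), Duration.ofMillis(200));
}
```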
If you modify your code to send a lot of messages asynchronously, you'll start to receive them after a short time.
for (int i = 0; i < 100000; i++) {
    producer.sendAsync("=========from topic non-persistent://tenant-1/name/topic-1 " + i);
}
...
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10732
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10733
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10734
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10735
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10736
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10737
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10738
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10739
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10740
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10741
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10742
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10743
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10744
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10745
Received message from topic non-persistent://tenant-1/name/topic-1: =========from topic non-persistent://tenant-1/name/topic-1 10746
I don't know if it can qualify as a bug. cc @lhotari
@ragaur-tibco do you have a real use case in mind? I wouldn't rely on non-persistent topics if losing a non-negligible number of messages could affect my application.
Here is the reason. Before adding a topic to the topics list, the broker checks that the topic isActive(), which evaluates !subscriptions.isEmpty() || hasLocalProducers().
https://github.com/apache/pulsar/blob/21647a1fc69ff46e65b6eaa37dd6d435e9f8eaef/pulsar-broker/src/main/java/org/apache/pulsar/broker/namespace/NamespaceService.java#L1519
https://github.com/apache/pulsar/blob/21647a1fc69ff46e65b6eaa37dd6d435e9f8eaef/pulsar-broker/src/main/java/org/apache/pulsar/broker/service/nonpersistent/NonPersistentTopic.java#L993
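A simplified model of that check helps show why an idle non-persistent topic is skipped. ActiveTopicFilter, TopicState and activeTopics are hypothetical stand-ins, not the NamespaceService code; only the !subscriptions.isEmpty() || hasLocalProducers() condition comes from the linked NonPersistentTopic source.

```java
import java.util.List;
import java.util.stream.Collectors;

// Simplified model of the broker-side filter; not the actual NamespaceService code.
final class ActiveTopicFilter {
    static final class TopicState {
        final String name;
        final int subscriptions;
        final int localProducers;

        TopicState(String name, int subscriptions, int localProducers) {
            this.name = name;
            this.subscriptions = subscriptions;
            this.localProducers = localProducers;
        }

        // Mirrors the quoted condition: !subscriptions.isEmpty() || hasLocalProducers()
        boolean isActive() {
            return subscriptions > 0 || localProducers > 0;
        }
    }

    // Only active non-persistent topics are listed for the pattern consumer.
    static List<String> activeTopics(List<TopicState> topics) {
        return topics.stream()
                .filter(TopicState::isActive)
                .map(t -> t.name)
                .collect(Collectors.toList());
    }
}
```

In this model, a non-persistent topic with neither subscriptions nor producers at lookup time is left out, which is consistent with the second run above: the topic is only picked up later, after its producer exists, and a message sent before that point has no subscription to go to.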