pulsar-jms
When using the Request-Reply pattern, topics with the prefix “jms-temp-” are not deleted after use.
https://github.com/datastax/pulsar-jms/blob/dacf5a8977effdb67c9b438ac6b11bdb3dd2d1ea/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java#L354
After investigation, I found that the producers associated with these temporary topics are never closed, which is why the topics cannot be deleted. Tracing com.datastax.oss.pulsar.jms.PulsarMessageProducer#close shows that it contains no logic to close the underlying producer. The reason is that PulsarConnectionFactory caches producers, so they cannot be closed here. However, since these temporary topics are likely to be used only once, there should be no need to cache their producers.
As a workaround, I currently check in com.datastax.oss.pulsar.jms.PulsarMessageProducer#close whether the destination is a temporary topic and, if so, close the producer and remove it from the cache. Could the maintainers take a look and confirm whether this approach is viable? 😊
PulsarMessageProducer.java
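public void close() throws JMSException {
  Utils.checkNotOnMessageProducer(session, this);
  if (closed) {
    return;
  }
  PulsarDestination destination = (PulsarDestination) getDestination();
  // The destination can be null for a producer created without a default destination.
  if (destination != null && destination.getName().contains("jms-temp-")) {
    Producer<?> producer = session.getProducerForDestination(destination);
    if (producer == null) {
      // Nothing cached for this topic; still mark this JMS producer as closed.
      closed = true;
      return;
    }
    try {
      // Temporary topics are single-use: close the Pulsar producer and
      // evict it from the cache so the topic can be deleted.
      producer.close();
      session.removeProducerForDestination(destination);
      closed = true;
    } catch (Exception e) {
      throw Utils.handleException(e);
    }
  } else {
    // Producers for regular topics stay cached in PulsarConnectionFactory.
    closed = true;
  }
}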
PulsarSession.java
public void removeProducerForDestination(Destination destination) throws JMSException {
  PulsarDestination pulsarDestination = (PulsarDestination) destination;
  String key = pulsarDestination.getName();
  getFactory().removeProducer(key);
}
PulsarConnectionFactory.java
public void removeProducer(String key) {
  producers.remove(key);
}
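
To make the idea easier to discuss, here is a minimal, self-contained sketch of the pattern I have in mind: a producer cache that keeps producers for regular topics but closes and evicts the producer as soon as a temporary topic is released. It does not use the pulsar-jms classes. `FakeProducer`, `ProducerCache`, `releaseIfTemporary`, and the topic names are all hypothetical stand-ins, and the real cache key in PulsarConnectionFactory may well differ.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

public class TempProducerCacheSketch {

    /** Hypothetical stand-in for a Pulsar producer; it only records whether close() ran. */
    public static final class FakeProducer implements AutoCloseable {
        private final String topic;
        private volatile boolean closed;

        public FakeProducer(String topic) {
            this.topic = topic;
        }

        public String getTopic() {
            return topic;
        }

        public boolean isClosed() {
            return closed;
        }

        @Override
        public void close() {
            closed = true;
        }
    }

    /** Hypothetical stand-in for the producer cache kept by the connection factory. */
    public static final class ProducerCache {
        private final Map<String, FakeProducer> producers = new ConcurrentHashMap<>();

        /** Returns the cached producer for the topic, creating it on first use. */
        public FakeProducer getOrCreate(String topic) {
            return producers.computeIfAbsent(topic, FakeProducer::new);
        }

        /**
         * Closes and evicts the producer only when the topic is temporary.
         * Returns true if a producer was actually closed and removed.
         */
        public boolean releaseIfTemporary(String topic) {
            if (!topic.contains("jms-temp-")) {
                return false; // regular topics stay cached for reuse
            }
            FakeProducer producer = producers.remove(topic);
            if (producer == null) {
                return false; // nothing was cached for this temporary topic
            }
            producer.close();
            return true;
        }

        public int size() {
            return producers.size();
        }
    }

    public static void main(String[] args) {
        ProducerCache cache = new ProducerCache();
        // Topic names are illustrative assumptions, not the names pulsar-jms generates.
        FakeProducer temp = cache.getOrCreate("jms-temp-1234");
        FakeProducer regular = cache.getOrCreate("orders");

        cache.releaseIfTemporary(temp.getTopic());
        cache.releaseIfTemporary(regular.getTopic());

        System.out.println("temp closed: " + temp.isClosed());       // true
        System.out.println("regular closed: " + regular.isClosed()); // false
        System.out.println("cached producers: " + cache.size());     // 1
    }
}
```

Removing the entry from the map before closing it means a concurrent caller cannot pick up a producer that is about to be closed. Whether that ordering matters in pulsar-jms depends on how the session and factory synchronize, which I have not checked.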