pulsar-jms icon indicating copy to clipboard operation
pulsar-jms copied to clipboard

When using the Request-Reply Pattern, Topics with the prefix “jms-tem-” are not successfully deleted after use.

Open kaifahm opened this issue 10 months ago • 2 comments

https://github.com/datastax/pulsar-jms/blob/dacf5a8977effdb67c9b438ac6b11bdb3dd2d1ea/pulsar-jms/src/main/java/com/datastax/oss/pulsar/jms/PulsarMessageProducer.java#L354

kaifahm avatar Apr 23 '24 14:04 kaifahm

After investigation, it was found that the producers associated with these temporary topics were not correctly closed, resulting in their failure to be successfully deleted. While tracing the code in com.datastax.oss.pulsar.jms.PulsarMessageProducer#close, it was observed that there is no logic to close the producer. The reason behind this is that the PulsarConnectionFactory caches the producer, which prevents it from being closed here. However, considering that these temporary topics are likely to be used only once, there should be no need to cache their producers.

kaifahm avatar Apr 25 '24 03:04 kaifahm

I'm doing this for now, using com.datastax.oss.pulsar.jms.PulsarMessageProducer#close to determine if it's a temporary topic, and if so, closing the producer and removing it from the buffer. Would you like the author to help see if this is possible?😊

PulsarMessageProducer.java

  public void close() throws JMSException {
    Utils.checkNotOnMessageProducer(session, this);

    if (closed) {
      return;
    }

    PulsarDestination destination = (PulsarDestination) getDestination();
    if (destination.getName().contains("jms-temp-")) {
      Producer<?> producer = session.getProducerForDestination(destination);
      if (producer == null) {
        return;
      }
      try {
        producer.close();
        session.removeProducer(this);
        closed = true;
      } catch (Exception e) {
        throw Utils.handleException(e);
      }
    } else {
      closed = true;
    }
  }

PulsarSession.java

  public void removeProducerForDestination(Destination destination) throws JMSException {
    PulsarDestination pulsarDestination = (PulsarDestination) destination;
    String key = pulsarDestination.getName();
    getFactory().removeProducer(key);
  }

PulsarConnectionFactory.java

  public void removeProducer(String key) {
    producers.remove(key);
  }

kaifahm avatar Apr 25 '24 03:04 kaifahm