pulsar-rs icon indicating copy to clipboard operation
pulsar-rs copied to clipboard

Support for consumer event listener

Open guilload opened this issue 2 years ago • 2 comments

We're interested in adding support for this feature that already exists in the Java client. I have quickly looked at the code and most of the logic to handle active/inactive events is already in place. What remains to be implemented is surfacing the event to the user.

First, I have one question: Is there any guarantee that when a consumer becomes active for a topic, it always receives a CommandActiveConsumerChange before the first message?

I see a few options regarding the public API:

  1. Add a new method at the consumer level that returns a stream of events. The user is in charge of polling both futures.
tokio::select! {
  biased;
  event = consumer.events.next() => {
  }
  
  message = consumer.next() => {
  }
}
  1. Add a new method at the consumer level that returns a stream of events where the event object is an enum:
pub enum PulsarEvent {
  BecameActive(...),
  BecameInactive(...),
  Message(...)
}

 while let Some(event) = consumer.events.try_next().await?

The latter is, IMHO, easier to consume for the end user.

Regarding the implementation strategy, we can reuse the main message channel with an enum, or handle events in a separate channel.

Thoughts?

guilload avatar Feb 12 '23 16:02 guilload

Is there any guarantee that when a consumer becomes active for a topic, it always receives a CommandActiveConsumerChange before the first message?

Yes, you can see the following code in PersistentDispatcherSingleActiveConsumer:

            notifyActiveConsumerChanged(activeConsumer);
            readMoreEntries(activeConsumer);

Only after the ACTIVE_CONSUMER_CHANGE command was sent to the consumer would the dispatcher read entries.

The latter is, IMHO, easier to consume for the end user.

I prefer the latter one as well.. The first one is more Java-style, which is not easy to use IMO.

BewareMyPower avatar Feb 13 '23 06:02 BewareMyPower

I am agree with @BewareMyPower, the latter is better. To me, this syntax is more idiomatic than the first one.

Regarding the implementation strategy, we can reuse the main message channel with an enum, or handle events in a separate channel.

I think it is better to use the main channel with an enum.

FlorentinDUBOIS avatar Feb 14 '23 11:02 FlorentinDUBOIS