Support for consumer event listener
We're interested in adding support for this feature that already exists in the Java client. I have quickly looked at the code and most of the logic to handle active/inactive events is already in place. What remains to be implemented is surfacing the event to the user.
First, I have one question:
Is there any guarantee that when a consumer becomes active for a topic, it always receives a CommandActiveConsumerChange before the first message?
I see a few options regarding the public API:
- Add a new method at the consumer level that returns a stream of events. The user is then responsible for polling both the event stream and the message stream.
    tokio::select! {
        biased;
        event = consumer.events.next() => {
        }
        message = consumer.next() => {
        }
    }
- Add a new method at the consumer level that returns a single stream whose items are an enum covering both messages and active/inactive events:
    pub enum PulsarEvent {
        BecameActive(...),
        BecameInactive(...),
        Message(...),
    }
    while let Some(event) = consumer.events.try_next().await? {
        // match on `event` here
    }
The latter is, IMHO, easier to consume for the end user.
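To make the second option more concrete, here is a minimal sketch of what consuming such an enum could look like. It uses a plain iterator as a stand-in for the async stream so that it runs with the standard library alone. The payload fields, the `Summary` type, and the `summarize` function are assumptions for illustration only, not the client's actual API.

```rust
// Hypothetical event type; the real payloads are left open in this proposal.
#[derive(Debug, Clone, PartialEq)]
pub enum PulsarEvent {
    BecameActive { topic: String },
    BecameInactive { topic: String },
    Message { topic: String, payload: Vec<u8> },
}

// Hypothetical summary of what the consumer observed.
#[derive(Debug, Default, PartialEq)]
pub struct Summary {
    pub active: bool,
    pub messages: usize,
}

/// Folds an ordered sequence of events into a summary.
/// A single `match` handles state changes and messages alike.
pub fn summarize<I>(events: I) -> Summary
where
    I: IntoIterator<Item = PulsarEvent>,
{
    let mut summary = Summary::default();
    for event in events {
        match event {
            PulsarEvent::BecameActive { .. } => summary.active = true,
            PulsarEvent::BecameInactive { .. } => summary.active = false,
            PulsarEvent::Message { .. } => summary.messages += 1,
        }
    }
    summary
}

fn main() {
    let events = vec![
        PulsarEvent::BecameActive { topic: "my-topic".to_string() },
        PulsarEvent::Message { topic: "my-topic".to_string(), payload: b"hello".to_vec() },
    ];
    println!("{:?}", summarize(events));
}
```

With an async stream the `for` loop would become the `while let ... try_next().await?` loop shown above, but the `match` would stay the same.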
Regarding the implementation strategy, we can reuse the main message channel with an enum, or handle events in a separate channel.
Thoughts?
> Is there any guarantee that when a consumer becomes active for a topic, it always receives a CommandActiveConsumerChange before the first message?
Yes, you can see the following code in PersistentDispatcherSingleActiveConsumer:
notifyActiveConsumerChanged(activeConsumer);
readMoreEntries(activeConsumer);
The dispatcher reads entries only after the ACTIVE_CONSUMER_CHANGE command has been sent to the consumer.
> The latter is, IMHO, easier to consume for the end user.
I prefer the latter one as well. The first one is more Java-style, which is not easy to use IMO.
I agree with @BewareMyPower, the latter is better. To me, this syntax is more idiomatic than the first one.
> Regarding the implementation strategy, we can reuse the main message channel with an enum, or handle events in a separate channel.
I think it is better to use the main channel with an enum.
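One argument for the single-channel approach is that a FIFO channel keeps the relative order of events and messages, so the broker-side ordering discussed above would carry through to the user. The sketch below illustrates this with `std::sync::mpsc` and a thread standing in for the connection task. The `run_engine` and `receive_all` names and the payload-free variants are assumptions for illustration; the real client uses async channels.

```rust
use std::sync::mpsc;
use std::thread;

// Simplified, hypothetical event type carried on the single channel.
#[derive(Debug, PartialEq)]
enum PulsarEvent {
    BecameActive,
    BecameInactive,
    Message(Vec<u8>),
}

/// Stand-in for the connection task: it forwards what the broker sends,
/// in arrival order, into one channel.
fn run_engine(tx: mpsc::Sender<PulsarEvent>) {
    tx.send(PulsarEvent::BecameActive).unwrap();
    tx.send(PulsarEvent::Message(b"m1".to_vec())).unwrap();
    tx.send(PulsarEvent::Message(b"m2".to_vec())).unwrap();
    tx.send(PulsarEvent::BecameInactive).unwrap();
    // `tx` is dropped here, which closes the channel.
}

/// Drains the channel and returns everything in the order it was received.
fn receive_all() -> Vec<PulsarEvent> {
    let (tx, rx) = mpsc::channel();
    let engine = thread::spawn(move || run_engine(tx));
    // Iteration ends once the sender has been dropped.
    let events: Vec<PulsarEvent> = rx.iter().collect();
    engine.join().unwrap();
    events
}

fn main() {
    for event in receive_all() {
        println!("{:?}", event);
    }
}
```

With two separate channels, the receiver could observe a message before the corresponding `BecameActive`, depending on which channel it polls first, which is why the first option relies on `biased;` in its `select!`.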