hivemq-mqtt-client
hivemq-mqtt-client copied to clipboard
subscribeWithPuplishes.subscribeSIngleFuture interferes when further processing publishes
When calling subscribeSingleFuture() on a FlowableWithSingle<MqttXPublish, MqttXSubAck> i would expect that the original flowable behaves the same as if I would process it with the subscribeSingleFuture() call. If I run the subscribeSingleFuture() the flow should still process all MqttPublishes received by the subscription.
We publish >20 message to the client. When running subscribeSingleFuture() on the FlowableWithSingle and manually acking the publishes on the original flowable, the original flowable only emits 20 elements, capped by the max in-flight messages of mosquitto.
To Reproduce
In our scenario we want to evaluate the SubAck prior to processing and consuming the publishes.
- Publish n messages with QoS > 0 to a mosquitto instance with max in-flight configured to < n
- Consume the messages by an RXClient with
subscribePublishes(Mqtt5Subscribe, true) - Store the returned
FlowableWithSinglein a variable - Run
subscribeSingleFuture()on it - Acknowledge the messages in a mapping stage on the stored
FlowableWithSingle - Consume the stored
FlowableWithSingle
`final var flowableWithSingle = client.subscribePublishes(Mqtt5Subscribe.builder().topicFilter(TOPIC).qos(MqttQos.EXACTLY_ONCE).build(), true);
flowableWithSingle.subscribeSingleFuture();
flowableWithSingle.doOnNext(n -> {
LOGGER.info("Got next");
n.acknowledge();
}).ignoreElements().blockingAwait();`
Without calling the subscribeSingleFuture() the messages are acked correctly and every message is received.
Details
- Affected HiveMQ MQTT Client version(s): 1.3.0
- Used JVM version: Eclipse Temurin 17.0.3
- Used OS (name and version): macOs Monterey 12.5.1 (21G83)
- Used MQTT version: 3 and 5
- Used MQTT broker (name and version): eclipse-mosquitto:1.6.15
@DerSchwilk and I encountered that behaviour while including the reactive API of HiveMQ client in Eclipse Ditto.
We run our system-tests with Eclipse Mosquito, that's where we noticed the strange behaviour change when using FlowableWithSingle.