
How to achieve a reconnect delay in the client?

Open jrmclaurin opened this issue 1 year ago • 6 comments

The client currently appears to have no delay when attempting to reconnect to a broker. This can be especially problematic when the broker is running locally, since the failure happens very quickly, so you get a very fast rate of reconnect attempts.

The eventLoop poll() method warns "Don't block this while iterating." Does that mean it's always harmful to wait before re-polling, even immediately after a disconnect event is returned?

Would it be possible for the client to implement at least a reconnect delay, if not an exponential backoff, that would work even in cases where eventLoop.poll() is always called with no delay?

jrmclaurin avatar Oct 02 '24 20:10 jrmclaurin

This is even more problematic for production systems where the connection to the broker is over a low-latency LAN. You can flood the network with connection requests, since the server immediately rejects each attempt and the client immediately retries. In the case where the destination host is unreachable or does not respond, the OS itself implements a retry with back-off on the initial TCP SYN, but in the case of an active rejection, user code must always implement a retry delay for client protocols.

If a PR would be considered to implement this behavior in the library, we'd consider implementing it. Just want to have buy-in from the maintainers that this is a valid use case to handle in the library itself. The way MqttOptions is implemented as a builder is nice because we could add a parameter for reconnect delay without breaking semver.

jadamcrain avatar Oct 05 '24 17:10 jadamcrain

Does that mean it's always harmful to wait before re-polling, even immediately after a disconnect event is returned?

That wasn't the intention of the comment. We merely wanted to convey that while connected the poll method must be called consistently to ensure the system progresses and doesn't get disconnected due to breach of keepalive timeout. Here is how we do this.

Would it be possible for the client to implement at least a reconnect delay, if not an exponential backoff, that would work even in cases where eventLoop.poll() is always called with no delay?

If a PR would be considered to implement this behavior in the library, we'd consider implementing it.

Thanks for the interest, but we feel it is better practice to implement this outside of the library and keep the library clean with regard to the MQTT standards. We would love to see examples though, and are willing to accept documentation improvements if that'd be something you could help us with!
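A minimal sketch of the distinction drawn above: re-poll immediately while polls succeed, and pause only after a poll returns an error. To stay dependency-free, the event loop is stood in for by a plain closure. `poll_with_delay`, its parameters, and the injected `sleep` callback are hypothetical names for illustration and are not part of rumqttc.

```rust
use std::time::Duration;

/// Hypothetical helper, not part of rumqttc. Calls `poll` up to `max_polls`
/// times and invokes `sleep(delay)` only after a failed poll.
pub fn poll_with_delay<T, E>(
    mut poll: impl FnMut() -> Result<T, E>,
    mut sleep: impl FnMut(Duration),
    delay: Duration,
    max_polls: usize,
) -> Vec<T> {
    let mut events = Vec::new();
    for _ in 0..max_polls {
        match poll() {
            // Connected: keep polling without a pause so keepalives are serviced.
            Ok(event) => events.push(event),
            // Poll failed: wait before the next poll triggers a reconnect attempt.
            Err(_) => sleep(delay),
        }
    }
    events
}
```

With rumqttc and tokio, the same shape would presumably be a `loop` around `eventloop.poll().await`, with `tokio::time::sleep(delay).await` in the `Err` arm.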

de-sh avatar Oct 05 '24 18:10 de-sh

@de-sh Can you point me where in the MQTT standard it says the client must or must not implement a reconnect delay? This is functionality that every user developing a real-world system needs to implement MQTT robustly. An option could be defaulted to 0 (no delay) if you feel that's the default best behavior.

jadamcrain avatar Oct 05 '24 18:10 jadamcrain

Can you point me where in the MQTT standard it says the client must or must not implement a reconnect delay?

There might be a misunderstanding here; I was pointing out the same thing in my message. We should implement this outside of the library, but an example would be a good starting point for anyone interested.

de-sh avatar Oct 05 '24 18:10 de-sh

Thanks for clarifying @de-sh, I understand what you're saying now. We'll produce an example of what it currently takes to implement this outside the library, and we can continue the discussion from there as to whether it belongs inside the library or not.

jadamcrain avatar Oct 05 '24 19:10 jadamcrain

Here is how I implemented exponential backoff in my project using the backoff library:

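// Imports assumed from context; they were not shown in the original post.
// The ConnectReturnCode variants below suggest rumqttc's v5 API, and
// `lock().await` suggests an async Mutex such as tokio's. `Message`,
// `error::Error`, `handle_event`, `Sender`, and the `error!` macro are
// project-specific and not shown.
use std::{sync::Arc, time::Duration};

use backoff::{future::retry_notify, ExponentialBackoff};
use rumqttc::v5::{mqttbytes::v5::ConnectReturnCode, ConnectionError, Event, EventLoop};
use tokio::sync::Mutex;

pub async fn async_event_loop_listener(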
    (eventloop, incoming_event_sender): (EventLoop, Sender<Message>),
) -> Result<(), error::Error> {
    let eventloop = Arc::new(Mutex::new(eventloop));

    loop {
        match poll(Arc::clone(&eventloop)).await {
            Ok(event) => {
                if let Err(err) = handle_event(event, &incoming_event_sender) {
                    error!("Error while handling MQTT event: {err}");
                }
            }
            Err(err) => {
                error!("Fatal MQTT connection error: {err}");
                return Err(err.into());
            }
        };
    }
}

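// Polls the event loop once, retrying with exponential backoff on transient
// errors. A fresh ExponentialBackoff is created on every call, so the delay
// starts over after each successful poll. The Mutex lets the retry closure,
// which may run several times, borrow the EventLoop mutably on each attempt.
async fn poll(eventloop: Arc<Mutex<EventLoop>>) -> Result<Event, ConnectionError> {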
    retry_notify(
        ExponentialBackoff::default(),
        || async { eventloop.lock().await.poll().await.map_err(backoff_error) },
        |err, dur: Duration| {
            let dur = dur.as_secs_f32();
            error!("Error while polling MQTT event loop: {err}\n -> Retrying in {dur:.1}s...");
        },
    )
    .await
}

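// Classifies connection errors: refusals that a retry cannot fix (bad
// credentials, unsupported protocol, server moved, ...) are permanent and
// stop the retry loop; everything else is transient and retried after a delay.
fn backoff_error(err: ConnectionError) -> backoff::Error<ConnectionError> {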
    match err {
        ConnectionError::ConnectionRefused(
            ConnectReturnCode::ProtocolError
            | ConnectReturnCode::UnsupportedProtocolVersion
            | ConnectReturnCode::ClientIdentifierNotValid
            | ConnectReturnCode::BadUserNamePassword
            | ConnectReturnCode::NotAuthorized
            | ConnectReturnCode::Banned
            | ConnectReturnCode::BadAuthenticationMethod
            | ConnectReturnCode::UseAnotherServer
            | ConnectReturnCode::ServerMoved,
        ) => backoff::Error::permanent(err),
        _ => backoff::Error::transient(err),
    }
}

ghost avatar Nov 27 '24 18:11 ghost
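For readers who would rather not pull in the backoff crate, the delay calculation itself is small enough to write by hand. The following is a dependency-free sketch, not anything provided by rumqttc: `ReconnectBackoff` and its methods are hypothetical names, the doubling factor and cap are arbitrary choices, and it omits the jitter that a production setup would likely want.

```rust
use std::time::Duration;

/// Hypothetical helper, not part of rumqttc: tracks how long to wait
/// before the next reconnect attempt.
pub struct ReconnectBackoff {
    initial: Duration,
    max: Duration,
    current: Duration,
}

impl ReconnectBackoff {
    pub fn new(initial: Duration, max: Duration) -> Self {
        // Clamp so the first delay never exceeds the cap.
        let initial = initial.min(max);
        Self { initial, max, current: initial }
    }

    /// Returns the delay to wait now, then doubles the stored delay up to `max`.
    pub fn next_delay(&mut self) -> Duration {
        let delay = self.current;
        self.current = self.current.saturating_mul(2).min(self.max);
        delay
    }

    /// Call after a successful poll so the next failure starts from `initial`.
    pub fn reset(&mut self) {
        self.current = self.initial;
    }
}

// Intended use with rumqttc and tokio (assumed shape, not compiled here):
//
// loop {
//     match eventloop.poll().await {
//         Ok(event) => { backoff.reset(); /* handle event */ }
//         Err(_) => tokio::time::sleep(backoff.next_delay()).await,
//     }
// }
```

Unlike the code above, this sketch treats every error as retryable; a classification step like `backoff_error` would still be needed to stop retrying on errors such as bad credentials.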