mop icon indicating copy to clipboard operation
mop copied to clipboard

MoP Proxy, keep alive is not managed correctly causes connection to fail

Open tsturzl opened this issue 2 years ago • 13 comments

Describe the bug

MQTT keep alive is initiated by the client, however per the MQTT spec keep alive only sends a PINGREQ control message if the keep alive time has been reached without any other MQTT message being sent and acknowledged by the broker. This means that if you are constantly interacting with a say topic-1 which is served by one broker, and you have inactive subscriptions on another broker, the client won't send PINGREQ, and therefore only the broker serving topic-1 is actually active, and the channel to the broker handling inactive subscriptions is not receiving any traffic causing the either IdleStateHandler (which is set to 30sec) to fail or keep alive interval to be exceeded. This ultimately causes the connection to the proxy to fail routinely, creating very instable connections. Also with the PINGREQ forwarding strategy used, you are getting a PINGREQ forwarded per topic you are subscribed to or previously published to, not per broker you are being forward to, meaning if you have 30 subscriptions and only 3 brokers, you end up getting 30 PINGRESP for every PINGREQ instead of just 3.

Ultimately the MoP Proxy is very broken for standard use cases. If the client is interacting with more than 1 topic, and either of those topics aren't seeing regular activity, the client connection can just fail suddenly. This ping forwarding strategy is ultimately flawed by the fact that PINGREQs aren't sent by clients who are actively interacting back and forth with the broker. I expect simply use cases where devices that connect only really interact with one topic are not impacted by this bug, but anything that might be subscribing on one topic and publishing on another is very likely to be impacted, which is an incredibly common use case.

To Reproduce Steps to reproduce the behavior:

  1. Setup a cluster of 3 brokers with MoP proxy enabled.
  2. Connect an MQTT client with a 5 second keep alive.
  3. Subscribe 3 topics where each topic is handled by a different broker.
  4. Publish to one of those topics every second.
  5. Observe the connection fail keep alive is exceeded.
  6. MQTT socket will drop, client receives no disconnect message.

Expected behavior Each PINGREQ receives one response under normal conditions, technically this doesn't break the client but it creates unnecessary traffic for a protocol designed to handle devices on low bandwidth networks. Inactivity on subscribed topics, and thus inactivity between proxy and broker, should not cause the connection between client and proxy to fail, nor should it cause the connection between proxy and broker to fail.

tsturzl avatar Jan 15 '24 19:01 tsturzl

PR submitted for fix. We are still doing more testing on this, but it's greatly improved connection reliability for us so far.

tsturzl avatar Jan 16 '24 01:01 tsturzl

I'm abandoning this effort. This patch has improved connection stability, and keep alive semantics between the client and the proxy. Previously I'd get a PINGRESP for every topic the client interacted with, this does not do that. I'm unsure how the MoP proxy has ever worked outside of maybe the most basic use cases.

Other issues I've noticed. AdapterChannel attempts reconnect, this reconnect NEVER sends an MQTT connect message. This means the reconnect fails, this failure causes a stack overflow because of unbound recursive retry. Even if you send a MQTT connection message this makes session state mismatch between brokers, because if you have cleanSession set to true, you end up with a cleanSession after reconnect with that broker but not other brokers. Really the only logical thing to do here is fail the entire proxy connection if any of the adapterChannels fail. You could try to build a more robust session state manager which you can somehow coordinate between brokers.

So far connections fail and seem to have trouble reconnecting with or without this change. With this change the connections seem to last several hours, but without the change and a keep alive of 3 seconds and constant stream of publishes, the client would fail in a matter of seconds. The current keep alive mechanism does not work, nor does it meet spec.

I honestly do not know how MoP proxy has ever worked in recent iterations. I'm fairly certain this isn't a matter of configuration after spending nearly a month troubleshooting several issues related to the proxy. The proxy is inherently complicated, and doesn't really seem to pose any tangible benefits. We are deciding to roll out own solution at this point as we believe the architecture of MoP is too complicated, redundant, and does not comply with MQTT spec.

tsturzl avatar Jan 26 '24 23:01 tsturzl

This is a pretty major bug. It'd be nice to get a response to this given the MoP proxy, and therefore clustered Pulsar running MoP, are completely non-functional based on our evaluation.

tsturzl avatar Feb 06 '24 21:02 tsturzl

I have tried to reproduce following the steps you described, but the client doesn't receive disconnect . Please add the reproduced test for this issue.

Technoboy- avatar Feb 09 '24 14:02 Technoboy-

It's very difficult to add tests, because this failure happens as a result of clustered pulsar, and the adapter channels that connect the MoP proxy to the various brokers. You need at least 2 brokers to reproduce the disconnection issue. In order to reproduce in tests I'd either have to make a lot of test doubles, where it would be incredibly hard to reproduce the issue with confidence that it isn't just an issue in the test doubles, or create a very complicated integration test suite from the ground up. I can make a repository that can run through minikube or kind with a MQTT client that reproduces the issue. I mean, you can also audit the source code related to this and quickly conclude that it does not conform to the MQTT spec, or packet sniff a client connection to a MoP proxy connection and observe that you get multiple PINGRESP to each PINGREQ, this logic is made exceedingly clear in the source code also. All PINGRESP that come back from the broker over adapterChannels are simply forwarded back to the client, and all PINGREQ from the client are clearly sent to each topicBroker in the the topicBroker map which often means that a single broker will be forwarded a PINGREQ more than once. I've watched it in a debugger, and I've seen on a packet sniffer. We've witnessed this issue using paho MQTT java and python clients.

Also in this case the client will not receive an explicit disconnect because the broker believes the client has already disconnected, as keep alive has been exceeded. Also there is a stack overflow here because of unbounded recursion. Also in the adapter channel reconnect mechanism, a connect message is never sent, so reconnect on an adapter channel never actually works.

tsturzl avatar Feb 09 '24 17:02 tsturzl

Here are full steps to reproduce including a minikube setup with an example Java program. I recommend you observe everything in wireshark to see why it fails. https://github.com/TTCColorado/mop-proxy-bug-reproduction

I don't believe I'm misconfiguring anything, but feel free to check that and make sure things are in order. I recommend you observe the client's packets in wireshark with the provided command (in the README) to understand why things are failing. I use Paho because it is the most common Java MQTT client, and it's the one we are using where we've seen this failure occur.

I'm able to reliably reproduce this issue with this setup. I can also see all the behavior I've outlined above when observing this example.

Furthermore, I've work on other MQTT brokers I'm very familiar with the speicifcation, but also the project we are running on MoP has used 3 different MQTT implementations Mosquito, VerneMQ, and RabbitMQ's MQTT plugin. We've not had an issue running these same projects on any of these respective platforms. In fact, we do not see these same issues when running MoP in standalone without the MoP Proxy. However when running Pulsar in a cluster you are required to use MoP proxy. In the case where we run MoP Proxy on cluster Pulsar we see connections failing very frequently, within a couple seconds of startup. The provided link reproduces this issue reliably, and confirms the issue not only exists, but behaves in the way I have documented in great detail.

tsturzl avatar Feb 09 '24 23:02 tsturzl

Here are full steps to reproduce including a minikube setup with an example Java program. I recommend you observe everything in wireshark to see why it fails. https://github.com/TTCColorado/mop-proxy-bug-reproduction

I don't believe I'm misconfiguring anything, but feel free to check that and make sure things are in order. I recommend you observe the client's packets in wireshark with the provided command (in the README) to understand why things are failing. I use Paho because it is the most common Java MQTT client, and it's the one we are using where we've seen this failure occur.

I'm able to reliably reproduce this issue with this setup. I can also see all the behavior I've outlined above when observing this example.

Furthermore, I've work on other MQTT brokers I'm very familiar with the speicifcation, but also the project we are running on MoP has used 3 different MQTT implementations Mosquito, VerneMQ, and RabbitMQ's MQTT plugin. We've not had an issue running these same projects on any of these respective platforms. In fact, we do not see these same issues when running MoP in standalone without the MoP Proxy. However when running Pulsar in a cluster you are required to use MoP proxy. In the case where we run MoP Proxy on cluster Pulsar we see connections failing very frequently, within a couple seconds of startup. The provided link reproduces this issue reliably, and confirms the issue not only exists, but behaves in the way I have documented in great detail.

https://github.com/TTCColorado/mop-proxy-bug-reproduction/blob/5f8b78e6c00570a324be4c1da54acc3dc2b7f322/test-client/untitled/src/main/java/org/example/Main.java#L36 why do you connect the mop URL and not the proxy URL?

Technoboy- avatar Feb 18 '24 06:02 Technoboy-

@Technoboy- Per the readme I'm forwarding the port from K8s like so kubectl port-forward pod/pulsar-broker-1 1883:5682. So I'm connecting to the 5682 proxy port, I'm just forwarding it to 1883 on local host. This makes it easier to filter and analyze MQTT packets in Wireshark. So this example is running against the proxy.

tsturzl avatar Feb 18 '24 07:02 tsturzl

Put the test in the package below and run the result.

image


package io.streamnative.pulsar.handlers.mqtt.mqtt5.hivemq.proxy;

import com.hivemq.client.mqtt.MqttGlobalPublishFilter;
import com.hivemq.client.mqtt.datatypes.MqttQos;
import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
import com.hivemq.client.mqtt.mqtt5.message.publish.Mqtt5Publish;
import io.streamnative.pulsar.handlers.mqtt.MQTTCommonConfiguration;
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
import io.streamnative.pulsar.handlers.mqtt.mqtt5.hivemq.base.MQTT5ClientUtils;
import lombok.extern.slf4j.Slf4j;
import org.testng.Assert;
import org.testng.annotations.Test;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Optional;
import java.util.Random;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

/**
 * Default there will start 3 brokers
 */
@Slf4j
public class ProxyIntegrationTest extends MQTTTestBase {
    private final Random random = new Random();

    @Override
    public MQTTCommonConfiguration initConfig() throws Exception {
        MQTTCommonConfiguration conf = super.initConfig();
        conf.setMqttProxyEnabled(true);
        conf.getProperties().setProperty("systemEventEnabled", "false");
        conf.setSystemTopicEnabled(false);
        return conf;
    }

    @Test
    public void testProxyKeepAlive() throws Exception {
        String topicPrefix = "topic";
        Set<String> brokers = new HashSet<>();
        Set<String> topics = new HashSet<>();
        for (int i = 1; i <= 2000; i ++) {
            String topic = topicPrefix + i;
            final String ownedBroker = admin.lookups().lookupTopic(topic);
            boolean added = brokers.add(ownedBroker);
            if (added) {
                topics.add(topic);
                if (brokers.size() == 3) {
                    break;
                }
            }
        }
        // keep there are 3 topics in different brokers
        Assert.assertEquals(brokers.size(), 3);
        Assert.assertEquals(topics.size(), 3);

        ConcurrentMap<String, List<String>> sentMsg = new ConcurrentHashMap<>();
        ConcurrentMap<String, List<String>> receivedMsgs = new ConcurrentHashMap<>();

        CountDownLatch latch = new CountDownLatch(6);
        // start 3 consumers and 3 producers
        for (String topic : topics) {

            new Thread(new Runnable() {

                @Override
                public void run() {
                    try {
                        Mqtt5BlockingClient consumer = MQTT5ClientUtils.createMqtt5ProxyClient(
                                getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size())));
                        consumer.connectWith().keepAlive(5).send();
                        consumer.subscribeWith()
                                .topicFilter(topic)
                                .qos(MqttQos.AT_LEAST_ONCE)
                                .send();
                        Mqtt5BlockingClient.Mqtt5Publishes publishes = consumer.publishes(MqttGlobalPublishFilter.ALL);
                        while(true) {
                            final Optional<Mqtt5Publish> receive = publishes.receive(20, TimeUnit.SECONDS);
                            if (receive.isEmpty()) {
                                break;
                            }
                            final Mqtt5Publish msg = receive.get();
                            final List<String> datas = receivedMsgs.getOrDefault(topic, new ArrayList<>());
                            datas.add(new String(msg.getPayloadAsBytes()));
                            receivedMsgs.put(topic, datas);
                            log.info("received : {}", msg);
                        }
                        latch.countDown();
                    } catch (Exception e) {
                        log.error("consumer for topic : {} error", topic);
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            }).start();

            Thread.sleep(2000);

            new Thread(new Runnable() {
                @Override
                public void run() {
                    try {
                        Mqtt5BlockingClient producer = MQTT5ClientUtils.createMqtt5ProxyClient(
                                getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size())));
                        producer.connectWith().keepAlive(5).send();
                        AtomicInteger index = new AtomicInteger(1);
                        while (!Thread.interrupted() && index.get() <= 100) {
                            String msg = (topic + "-" + index.getAndIncrement());
                            producer.publishWith()
                                    .topic(topic)
                                    .qos(MqttQos.AT_LEAST_ONCE)
                                    .payload(msg.getBytes(StandardCharsets.UTF_8))
                                    .send();
                            final List<String> sents = sentMsg.getOrDefault(topic, new ArrayList<>());
                            sents.add(msg);
                            sentMsg.put(topic, sents);
                            Thread.sleep(1000);
                        }
                        latch.countDown();
                    } catch (Exception e) {
                        log.error("producer for topic : {} error", topic);
                        Thread.currentThread().interrupt();
                        throw new RuntimeException(e);
                    }
                }
            }).start();
        }

        latch.await();
        // check the result
        for (String topic : topics) {
            Assert.assertEquals(sentMsg.get(topic), receivedMsgs.get(topic));
        }
        // also you can search the logs if there are `disconnect` command in the console
    }

}

Technoboy- avatar Feb 19 '24 12:02 Technoboy-

@Technoboy- This isn't really reproducing the same case, at least it doesn't seem to when I skim through it. The problem occurs when you have interacted with a number of topics across different brokers, but are only actively communicating on one topic. This means the client will not send keep alive since other control messages are being transmitted in the keep alive period. The failure happens because the other adapter channels become idle for a duration longer than the keep alive. Also it's very obvious in the code that PINGREQs are sent per entry in the topicBroker map, and PINGRESP from the brokers are just sent back to the client. The source code make it very clear that PINGREQs get multiple responses, and my reproduced case shows exactly this behavior. That behavior is not correct.

Also not all client's implement keep alive the same. We use the Java Paho MQTT client, because it's the most popular. I'm not sure if HiveMQ supresses PINGREQ when other control messages are being sent. Depending on the behavior of this specific client you may not be able to reproduce the bug.

I can try to adapt this test case to reproduce the issue. The steps I provided to reproduce still hold true however, and unless you can see any flaw in my test case or setup itself, then the bug still exists.

tsturzl avatar Feb 19 '24 20:02 tsturzl

The problem in your test is that you are constantly keeping communication with all brokers. In my example I am communicating continuously with only one broker after communicating with both. This means the proxy create adapter channels to all brokers, but then I only use one of those adapter channels continuously, meaning that the other adapter channel that is going unused for that duration. Also in your code you are creating a connection per consumer and producer, meaning each client will only communicate with one broker. I urge you to just simply audit the keep alive forwarding mechanism it's forwarding PINGREQ to every topicBroker the proxy session has cached, which means you get as many PINGRESP as you have topics interacted with. There is no possible way this is working correctly, and I've reproduced it several ways now as well as observed the very clear behavior of the source code.

If I move my example into this integration tests it fails. The client disconnects and publishing eventually fails. Don't really even need assertions, because publish will throw an exception when publish is attempted while the client is not connected. In this case the publishes work for a while until they fail, and it fails likely because keep alive duration is exceeded between the proxy and adapter channels. My example isn't doing anything strange or unordinary, and we see this issue in applications we've run against several other MQTT implementations with no issue, including MoP standalone. It's pretty irrefutable to me at this point that there is indeed a systemic bug here.

package io.streamnative.pulsar.handlers.mqtt.mqtt5.hivemq.proxy;

import io.streamnative.pulsar.handlers.mqtt.MQTTCommonConfiguration;
import io.streamnative.pulsar.handlers.mqtt.base.MQTTTestBase;
import lombok.extern.slf4j.Slf4j;
import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.testng.annotations.Test;

import java.util.*;

/**
 * Default there will start 3 brokers
 */
@Slf4j
public class ProxyIntegrationTest extends MQTTTestBase {
    private static class Callback implements MqttCallback
    {

        @Override
        public void connectionLost( java.lang.Throwable throwable )
        {
            throw new RuntimeException("MQTT Client connection died");
        }

        @Override
        public void messageArrived( java.lang.String s, MqttMessage mqttMessage ) throws Exception
        {
            System.out.println( "Message arrived" );
        }

        @Override
        public void deliveryComplete( IMqttDeliveryToken iMqttDeliveryToken )
        {
        }
    }

    private final Random random = new Random();

    @Override
    public MQTTCommonConfiguration initConfig() throws Exception {
        MQTTCommonConfiguration conf = super.initConfig();
        conf.setMqttProxyEnabled(true);
        conf.getProperties().setProperty("systemEventEnabled", "false");
        conf.setSystemTopicEnabled(false);
        return conf;
    }

    @Test
    public void testProxyKeepAlivePaho() throws Exception {
        String topicPrefix = "topic";
        Set<String> brokers = new HashSet<>();
        List<String> topics = new ArrayList<>();
        for (int i = 1; i <= 2000; i ++) {
            String topic = topicPrefix + i;
            final String ownedBroker = admin.lookups().lookupTopic(topic);
            boolean added = brokers.add(ownedBroker);
            if (added) {
                topics.add(topic);
                if (brokers.size() == 3) {
                    break;
                }
            }
        }
        String serverUri = String.format( "tcp://127.0.0.1:%d", getMqttProxyPortList().get(random.nextInt(mqttProxyPortList.size())) );
        MqttAsyncClient client = new MqttAsyncClient(  serverUri, "test", new MemoryPersistence() );
        MqttConnectOptions connectOptions = new MqttConnectOptions();
        connectOptions.setCleanSession( true );
        connectOptions.setKeepAliveInterval(5);
        System.out.println( "connecting..." );
        client.connect( connectOptions ).waitForCompletion();
        System.out.println( "connected" );

        // Another bug, you need to make a connection before the MoP proxy will even respond to pings, because you don't
        // have any adapter channels open to forward PINGREQ messages to.
        client.subscribe( topics.get(0), 1 ).waitForCompletion();

        // sleep the keep alive period to show that PING will happen in abscence of other messages.
        Thread.sleep( 6000 );

        // make more subscriptions to connect to multiple brokers.
        client.subscribe( topics.get(1), 1 ).waitForCompletion();
        client.subscribe( topics.get(2), 1 ).waitForCompletion();


        // publish QoS 1 message to prevent the need for PINGREQ. Keep alive only sends ping in abscence of other
        // messages. Refer to section 3.1.2.10 of the MQTT 3.1.1 specification.
        int i = 0;
        while (i < 20) {
            System.out.println( "Publishing message..." );
            client.publish( topics.get(0), "test".getBytes(), 1, false ).waitForCompletion();
            Thread.sleep( 1000 );
            i++;
        }
        // MoP keep alive is 1.5 * keep alive, so 7.5 seconds in this case. After 7.5 seconds from the last PINGREQ
        // the connection will die.
    }
}

Add this to your pom dependencies:

       <dependency>
            <groupId>org.eclipse.paho</groupId>
            <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
            <version>1.2.5</version>
            <scope>test</scope>
        </dependency>

tsturzl avatar Feb 22 '24 18:02 tsturzl

@Technoboy- any news on this? Are you able to confirm what I've reported?

tsturzl avatar Mar 01 '24 20:03 tsturzl

@Technoboy- I'm fine helping to work towards a solution, but I think the fix for this is going to be very involving, and will likely involve some major changes. I think we need to agree on a solution before anyone can start working towards a solution.

tsturzl avatar Mar 05 '24 18:03 tsturzl