mqtt_cpp

Protobuf over mqtt, terminates at zero byte

Open pPatrickK opened this issue 5 years ago • 7 comments

Hi guys,

we face the following problem with your client:

  1. We send protobuf encoded data over mqtt
  2. It terminates when it hits a zero byte from the buffer.

Any ideas?

It worked with a python client.

Best regards, Patrick

pPatrickK avatar Aug 26 '19 15:08 pPatrickK

See http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Toc514345564

Statement [MQTT-1.5.4-2]

A UTF-8 Encoded String MUST NOT include an encoding of the null character U+0000.
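As a rough illustration of that rule, the check below scans a string field (for example a topic name or client id) for the null character. `contains_null_char` is a hypothetical helper written for this thread, not a function provided by mqtt_cpp, and it assumes C++17 for `std::string_view`.

```cpp
#include <string_view>

// Hypothetical helper (not part of mqtt_cpp): returns true if the string
// contains the null character U+0000, which [MQTT-1.5.4-2] forbids in
// UTF-8 Encoded Strings. In valid UTF-8, U+0000 is encoded as the single
// byte 0x00, so a byte search is sufficient.
inline bool contains_null_char(std::string_view s) {
    return s.find('\0') != std::string_view::npos;
}
```

The restriction applies to the string fields of a packet only; the publish payload is binary data and may contain zero bytes.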

jonesmz avatar Aug 26 '19 16:08 jonesmz

What part of MQTT message is encoded by protobuf?

| part | example | description |
| --- | --- | --- |
| string | connect client id, publish topic name | must be UTF-8 |
| binary | publish payload | no problem |
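One common way for a binary payload to be cut off at the first zero byte, offered here only as a guess since the reporter's code was never posted, is building the payload from a bare `char const*` so that its length is found by scanning for a terminator. The sketch below contrasts that with passing an explicit length. Both function names are made up for illustration and are not part of mqtt_cpp.

```cpp
#include <cstddef>
#include <string>

// Hypothetical helper: builds a payload with C-string semantics.
// The length is determined by the first 0x00 byte, so everything
// after it is silently dropped.
inline std::string payload_from_c_string(char const* bytes) {
    return std::string(bytes);
}

// Hypothetical helper: builds a payload from a pointer plus an explicit
// length, so embedded 0x00 bytes are preserved.
inline std::string payload_from_bytes(char const* bytes, std::size_t size) {
    return std::string(bytes, size);
}
```

Protobuf output routinely contains 0x00 bytes, so a serialized message should always be passed along with its size rather than as a null-terminated string.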

Even for string fields, a non-UTF-8 string is accepted unless you set the compile option -DMQTT_USE_STR_CHECK.

Anyway, could you post the minimal and complete code that reproduces the problem?

redboltz avatar Aug 26 '19 21:08 redboltz

@pPatrickK

Was your issue resolved?

jonesmz avatar Sep 16 '19 16:09 jonesmz

What part of MQTT message is encoded by protobuf?

@redboltz Maybe this is simply a misunderstanding of the API. I'm having trouble understanding how to properly publish binary data via the API, e.g. the contents of a struct, without encoding it in a string. If this is in the example code somewhere, I haven't found it yet.

swltr avatar Oct 15 '19 14:10 swltr

You publish data like this:

MQTT_NS::buffer topic_name = /* create a topic name however you want */;
MQTT_NS::buffer message    = /* create a message however you want */;
// The life keeper holds copies of both buffers so that their storage
// stays alive until the asynchronous send has completed.
MQTT_NS::any life_keeper   = std::make_pair(topic_name, message);
pAsyncClient->async_publish(std::move(topic_name),
                            std::move(message),
                            std::move(life_keeper),
                            qos_value, // at_most_once, at_least_once, exactly_once
                            retain, // true / false
                            MQTT_NS::v5::properties{},
                            callbackFunction);

mqtt_cpp works natively with boost::asio::const_buffer to send outgoing messages. const_buffer does not understand the concept of lifetime management (i.e., it behaves just like a std::string_view).

MQTT_NS::buffer is a higher level concept that can be used to make lifetime management easier on you. MQTT_NS::buffer is basically std::string_view + std::shared_ptr<char[]>, so the lifetime of the string can be preserved properly.
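To make the "string_view + shared_ptr" idea concrete, here is a minimal sketch of such a type in plain C++17. `owning_view` and `make_owning_view` are illustrative stand-ins invented for this explanation; they are not the actual definition of MQTT_NS::buffer.

```cpp
#include <cstddef>
#include <cstring>
#include <memory>
#include <string_view>

// Illustrative stand-in for the idea behind MQTT_NS::buffer
// (not its real definition): a non-owning view plus a shared
// owner that keeps the viewed bytes alive.
struct owning_view {
    std::shared_ptr<char[]> storage; // owns the bytes
    std::string_view view;           // points into storage
};

// Copies `size` bytes (zero bytes included) into shared storage
// and returns a view over that copy.
inline owning_view make_owning_view(char const* bytes, std::size_t size) {
    std::shared_ptr<char[]> storage(new char[size]);
    std::memcpy(storage.get(), bytes, size);
    return owning_view{storage, std::string_view(storage.get(), size)};
}
```

Every copy of the object shares the same storage, so the bytes remain valid for as long as any copy exists, including one held by a pending asynchronous send.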

mqtt_cpp does not support sending arbitrary structs, or any kind of compile-time / runtime polymorphism where you can send some object that has a serialize() and/or deserialize() member function. The reason for this is that, at the lowest level, any serialize() and/or deserialize() would ultimately turn into allocating something that looks just like std::string_view + std::shared_ptr<char[]>.

Ultimately this is because boost::asio (please inform me if I am mistaken) doesn't understand iterators when sending / receiving data. So you can't serialize your structure on the fly. It has to be done ahead of time.
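Serializing ahead of time can be as simple as copying the fields into one contiguous byte string before publishing. The sketch below uses a hypothetical `reading` struct and a naive host-byte-order layout chosen only for illustration. A real application would more likely use protobuf or msgpack for a portable encoding.

```cpp
#include <cstdint>
#include <cstring>
#include <string>

// Hypothetical example struct, invented for this sketch.
struct reading {
    std::int32_t id;
    double value;
};

// Copies the fields one by one into a contiguous byte string.
// The layout uses host byte order and is not portable across machines.
inline std::string serialize(reading const& r) {
    std::string out(sizeof r.id + sizeof r.value, '\0');
    std::memcpy(&out[0], &r.id, sizeof r.id);
    std::memcpy(&out[sizeof r.id], &r.value, sizeof r.value);
    return out;
}

// Reverses serialize(); returns false if the input has the wrong size.
inline bool deserialize(std::string const& in, reading& r) {
    if (in.size() != sizeof r.id + sizeof r.value) return false;
    std::memcpy(&r.id, in.data(), sizeof r.id);
    std::memcpy(&r.value, in.data() + sizeof r.id, sizeof r.value);
    return true;
}
```

The resulting string carries its own size, so it can be handed to the publish call as a binary payload even though it contains zero bytes.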

This means that your choice of (non-deprecated) API when using mqtt_cpp boils down to either using the boost::asio native const_buffer objects and managing their lifetime yourself, or using MQTT_NS::buffer objects and allowing the library to manage the lifetime for you automatically.

jonesmz avatar Oct 15 '19 18:10 jonesmz

@jonesmz Thank you for writing that down! In the current case, I eventually resorted to encoding the content as a JSON string, but I think it would be great to have that explanation somewhere in the examples.

swltr avatar Oct 20 '19 13:10 swltr

For your information:

Here is an example that encodes the payload with MessagePack (msgpack).

https://github.com/msgpack/msgpack-c

Payload can be encoded as binary.

// Copyright Takatoshi Kondo 2019
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)

// no_tls client and server

#include <iostream>
#include <iomanip>
#include <map>
#include <set>
#include <sstream>

#include <mqtt_client_cpp.hpp>

#include <boost/lexical_cast.hpp>

#include <msgpack.hpp>

struct data {
    int i;
    double d;
    std::string s;
    MSGPACK_DEFINE(i, d, s);
};

template <typename Client, typename Disconnect>
void client_proc(
    Client& c,
    std::uint16_t& pid_sub1,
    Disconnect const& disconnect) {

    using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
    // Setup client
    c->set_client_id("cid1");
    c->set_clean_session(true);

    // Setup handlers
    c->set_connack_handler(
        [&c, &pid_sub1]
        (bool sp, MQTT_NS::connect_return_code connack_return_code){
            std::cout << "[client] Connack handler called" << std::endl;
            std::cout << "[client] Clean Session: " << std::boolalpha << sp << std::endl;
            std::cout << "[client] Connack Return Code: "
                      << MQTT_NS::connect_return_code_to_str(connack_return_code) << std::endl;
            if (connack_return_code == MQTT_NS::connect_return_code::accepted) {
                pid_sub1 = c->subscribe("mqtt_client_cpp/topic1", MQTT_NS::qos::at_most_once);
            }
            return true;
        });
    c->set_close_handler(
        []
        (){
            std::cout << "[client] closed." << std::endl;
        });
    c->set_error_handler(
        []
        (boost::system::error_code const& ec){
            std::cout << "[client] error: " << ec.message() << std::endl;
        });
    c->set_suback_handler(
        [&]
        (packet_id_t packet_id, std::vector<MQTT_NS::suback_return_code> results){
            using namespace MQTT_NS::literals;
            std::cout << "[client] suback received. packet_id: " << packet_id << std::endl;
            for (auto const& e : results) {
                std::cout << "[client] subscribe result: " << e << std::endl;
            }
            std::stringstream ss;
            // pack data to msgpack
            msgpack::pack(ss, data { 42, 12.3, "ABC" } );
            auto const& str = ss.str();
            c->publish("mqtt_client_cpp/topic1"_mb, MQTT_NS::buffer(MQTT_NS::string_view(str.data(), str.size())), MQTT_NS::qos::at_most_once);
            return true;
        });
    c->set_publish_handler(
        [&]
        (bool dup,
         MQTT_NS::qos qos_value,
         bool retain,
         MQTT_NS::optional<packet_id_t> packet_id,
         MQTT_NS::buffer topic_name,
         MQTT_NS::buffer contents){
            std::cout << "[client] publish received. "
                      << "dup: " << std::boolalpha << dup
                      << " qos: " << qos_value
                      << " retain: " << std::boolalpha << retain << std::endl;
            if (packet_id)
                std::cout << "[client] packet_id: " << *packet_id << std::endl;
            std::cout << "[client] topic_name: " << topic_name << std::endl;
            auto oh = msgpack::unpack(contents.data(), contents.size());
            std::cout << "[client] contents: " << oh.get() << std::endl;
            auto recv_data = oh->as<data>();
            std::cout << recv_data.i << std::endl;
            std::cout << recv_data.d << std::endl;
            std::cout << recv_data.s << std::endl;
            disconnect();
            return true;
        });

    // Connect
    c->connect();
}

#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <mqtt_server_cpp.hpp>

namespace mi = boost::multi_index;

using con_t = MQTT_NS::server<>::endpoint_t;
using con_sp_t = std::shared_ptr<con_t>;

struct sub_con {
    sub_con(MQTT_NS::buffer topic, con_sp_t con, MQTT_NS::qos qos_value)
        :topic(std::move(topic)), con(std::move(con)), qos_value(qos_value) {}
    MQTT_NS::buffer topic;
    con_sp_t con;
    MQTT_NS::qos qos_value;
};

struct tag_topic {};
struct tag_con {};

using mi_sub_con = mi::multi_index_container<
    sub_con,
    mi::indexed_by<
        mi::ordered_non_unique<
            mi::tag<tag_topic>,
            BOOST_MULTI_INDEX_MEMBER(sub_con, MQTT_NS::buffer, topic)
        >,
        mi::ordered_non_unique<
            mi::tag<tag_con>,
            BOOST_MULTI_INDEX_MEMBER(sub_con, con_sp_t, con)
        >
    >
>;


inline void close_proc(std::set<con_sp_t>& cons, mi_sub_con& subs, con_sp_t const& con) {
    cons.erase(con);

    auto& idx = subs.get<tag_con>();
    auto r = idx.equal_range(con);
    idx.erase(r.first, r.second);
}

template <typename Server>
void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
    s.set_error_handler(
        [](boost::system::error_code const& ec) {
            std::cout << "[server] error: " << ec.message() << std::endl;
        }
    );
    s.set_accept_handler(
        [&s, &connections, &subs](con_sp_t spep) {
            auto& ep = *spep;
            std::weak_ptr<con_t> wp(spep);

            using packet_id_t = typename std::remove_reference_t<decltype(ep)>::packet_id_t;
            std::cout << "[server] accept" << std::endl;
            // For server close if ep is closed.
            auto g = MQTT_NS::shared_scope_guard(
                [&s] {
                    std::cout << "[server] session end" << std::endl;
                    s.close();
                }
            );

            // Pass spep to keep lifetime.
            // It makes sure wp.lock() never return nullptr in the handlers below
            // including close_handler and error_handler.
            ep.start_session(std::make_tuple(std::move(spep), std::move(g)));

            // set connection (lower than MQTT) level handlers
            ep.set_close_handler(
                [&connections, &subs, wp]
                (){
                    std::cout << "[server] closed." << std::endl;
                    auto sp = wp.lock();
                    BOOST_ASSERT(sp);
                    close_proc(connections, subs, sp);
                });
            ep.set_error_handler(
                [&connections, &subs, wp]
                (boost::system::error_code const& ec){
                    std::cout << "[server] error: " << ec.message() << std::endl;
                    auto sp = wp.lock();
                    BOOST_ASSERT(sp);
                    close_proc(connections, subs, sp);
                });

            // set MQTT level handlers
            ep.set_connect_handler(
                [&connections, wp]
                (MQTT_NS::buffer client_id,
                 MQTT_NS::optional<MQTT_NS::buffer> username,
                 MQTT_NS::optional<MQTT_NS::buffer> password,
                 MQTT_NS::optional<MQTT_NS::will>,
                 bool clean_session,
                 std::uint16_t keep_alive) {
                    using namespace MQTT_NS::literals;
                    std::cout << "[server] client_id    : " << client_id << std::endl;
                    std::cout << "[server] username     : " << (username ? username.value() : "none"_mb) << std::endl;
                    std::cout << "[server] password     : " << (password ? password.value() : "none"_mb) << std::endl;
                    std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl;
                    std::cout << "[server] keep_alive   : " << keep_alive << std::endl;
                    auto sp = wp.lock();
                    BOOST_ASSERT(sp);
                    connections.insert(sp);
                    sp->connack(false, MQTT_NS::connect_return_code::accepted);
                    return true;
                }
            );
            ep.set_disconnect_handler(
                [&connections, &subs, wp]
                (){
                    std::cout << "[server] disconnect received." << std::endl;
                    auto sp = wp.lock();
                    BOOST_ASSERT(sp);
                    close_proc(connections, subs, sp);
                });
            ep.set_publish_handler(
                [&subs]
                (bool is_dup,
                 MQTT_NS::qos qos_value,
                 bool is_retain,
                 MQTT_NS::optional<packet_id_t> packet_id,
                 MQTT_NS::buffer topic_name,
                 MQTT_NS::buffer contents){
                    std::cout << "[server] publish received."
                              << " dup: " << std::boolalpha << is_dup
                              << " qos: " << qos_value
                              << " retain: " << std::boolalpha << is_retain << std::endl;
                    if (packet_id)
                        std::cout << "[server] packet_id: " << *packet_id << std::endl;
                    std::cout << "[server] topic_name: " << topic_name << std::endl;
                    auto oh = msgpack::unpack(contents.data(), contents.size());
                    std::cout << "[server] contents: " << oh.get() << std::endl;
                    auto recv_data = oh->as<data>();
                    std::cout << recv_data.i << std::endl;
                    std::cout << recv_data.d << std::endl;
                    std::cout << recv_data.s << std::endl;

                    auto const& idx = subs.get<tag_topic>();
                    auto r = idx.equal_range(topic_name);
                    for (; r.first != r.second; ++r.first) {
                        r.first->con->publish(
                            boost::asio::buffer(topic_name),
                            boost::asio::buffer(contents),
                            std::make_pair(topic_name, contents),
                            std::min(r.first->qos_value, qos_value),
                            is_retain
                        );
                    }
                    return true;
                });
            ep.set_subscribe_handler(
                [&subs, wp]
                (packet_id_t packet_id,
                 std::vector<std::tuple<MQTT_NS::buffer, MQTT_NS::subscribe_options>> entries) {
                    std::cout << "[server]subscribe received. packet_id: " << packet_id << std::endl;
                    std::vector<MQTT_NS::suback_return_code> res;
                    res.reserve(entries.size());
                    auto sp = wp.lock();
                    BOOST_ASSERT(sp);
                    for (auto const& e : entries) {
                        MQTT_NS::buffer topic = std::get<0>(e);
                        MQTT_NS::qos qos_value = std::get<1>(e).get_qos();
                        std::cout << "[server] topic: " << topic  << " qos: " << qos_value << std::endl;
                        res.emplace_back(MQTT_NS::qos_to_suback_return_code(qos_value));
                        subs.emplace(std::move(topic), sp, qos_value);
                    }
                    sp->suback(packet_id, res);
                    return true;
                }
            );
            ep.set_unsubscribe_handler(
                [&subs, wp]
                (packet_id_t packet_id,
                 std::vector<MQTT_NS::buffer> topics) {
                    std::cout << "[server]unsubscribe received. packet_id: " << packet_id << std::endl;
                    for (auto const& topic : topics) {
                        subs.erase(topic);
                    }
                    auto sp = wp.lock();
                    BOOST_ASSERT(sp);
                    sp->unsuback(packet_id);
                    return true;
                }
            );
        }
    );

    s.listen();
}

int main(int argc, char** argv) {
    if (argc != 2) {
        std::cout << argv[0] << " port" << std::endl;
        return -1;
    }

    boost::asio::io_context ioc;
    std::uint16_t port = boost::lexical_cast<std::uint16_t>(argv[1]);

    // server
    auto s = MQTT_NS::server<>(
        boost::asio::ip::tcp::endpoint(
            boost::asio::ip::tcp::v4(),
            port
        ),
        ioc
    );
    std::set<con_sp_t> connections;
    mi_sub_con subs;
    server_proc(s, connections, subs);


    // client
    std::uint16_t pid_sub1;

    auto c = MQTT_NS::make_sync_client(ioc, "localhost", port);

    int count = 0;
    auto disconnect = [&] {
        if (++count == 1) c->disconnect();
    };
    client_proc(c, pid_sub1, disconnect);

    ioc.run();
}

redboltz avatar Oct 20 '19 14:10 redboltz