mqtt_cpp
Protobuf over mqtt, terminates at zero byte
Hi guys,
we face the following problem with your client:
- We send protobuf encoded data over mqtt
- It terminates when it hits a zero byte from the buffer.
Any ideas?
It worked with a python client.
Best regards, Patrick
See http://docs.oasis-open.org/mqtt/mqtt/v5.0/cs02/mqtt-v5.0-cs02.html#_Toc514345564
Statement [MQTT-1.5.4-2]
A UTF-8 Encoded String MUST NOT include an encoding of the null character U+0000.
What part of MQTT message is encoded by protobuf?
| part | example | description |
|---|---|---|
| string | connect client id, publish topic name | must be UTF-8 |
| binary | publish payload | no problem |
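To make the string/binary distinction in the table concrete, here is a minimal standard-library sketch. It is not mqtt_cpp code: `contains_nul` and `demo_string_vs_binary` are hypothetical names, and the byte values are made up for illustration. It only shows that a zero byte is forbidden in a field treated as a UTF-8 Encoded String, while a binary payload may carry one.

```cpp
#include <cassert>
#include <string>
#include <string_view>

// Hypothetical helper, not part of mqtt_cpp: reports whether a field that
// MQTT treats as a UTF-8 Encoded String contains U+0000, which UTF-8
// encodes as the single byte 0x00.
inline bool contains_nul(std::string_view s) {
    return s.find('\0') != std::string_view::npos;
}

// Illustration only: a topic name is a string field, a payload is binary.
inline void demo_string_vs_binary() {
    std::string topic = "sensors/temp";            // string field
    std::string payload("\x08\x00\x10\x01", 4);    // binary field, length given explicitly

    assert(!contains_nul(topic));    // acceptable as an MQTT string
    assert(contains_nul(payload));   // would violate [MQTT-1.5.4-2] as a string...
    assert(payload.size() == 4);     // ...but is fine as a 4-byte binary payload
}
```

The payload is built with an explicit length on purpose, so the embedded zero byte is kept.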
Even if you use a string, non-UTF-8 strings are accepted unless you set the compile option -DMQTT_USE_STR_CHECK.
Anyway, could you post the minimal and complete code that reproduces the problem?
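While waiting for a reproducer: one common cause of this symptom, which is only an assumption about the reporter's code, is building the payload from a `const char*` without a length. The sketch below uses only the standard library. The names `sample_bytes`, `payload_truncated` and `payload_full` are hypothetical, and the four bytes are an arbitrary example of binary data with an embedded zero.

```cpp
#include <cassert>
#include <cstddef>
#include <string>

// Arbitrary binary data with an embedded zero byte at index 1
// (a serialized message can easily contain such bytes).
inline const char* sample_bytes() {
    static const char bytes[] = {0x08, 0x00, 0x10, 0x01};
    return bytes;
}
constexpr std::size_t sample_size = 4;

// Problematic for binary data: the const char* constructor treats the input
// as a C string and stops at the first 0x00.
inline std::string payload_truncated() {
    return std::string(sample_bytes());
}

// Safer: pass the length explicitly so embedded zero bytes survive.
inline std::string payload_full() {
    return std::string(sample_bytes(), sample_size);
}
```

Here `payload_truncated()` holds only the first byte, while `payload_full()` holds all four. The same applies to any view or buffer type constructed from a bare pointer.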
@pPatrickK
Was your issue resolved?
> What part of MQTT message is encoded by protobuf?
@redboltz Maybe this is simply a misunderstanding of the API. I'm having trouble myself understanding how to properly publish binary data via the API, e.g. the contents of a struct, without encoding it in a string. If this is in the example code somewhere, I haven't found it yet.
You publish data like this:
MQTT_NS::buffer topic_name = // Create a topic name however you want.
MQTT_NS::buffer message = // Create a message however you want.
// Keeps the underlying storage alive until the asynchronous publish completes.
MQTT_NS::any life_keeper = std::make_pair(topic_name, message);
pAsyncClient->async_publish(std::move(topic_name),
std::move(message),
std::move(life_keeper),
qos_value, // at_most_once, at_least_once, exactly_once
retain, // true / false
MQTT_NS::v5::properties{},
callbackFunction);
mqtt_cpp works natively with boost::asio::const_buffer to send outgoing messages. const_buffer has no concept of lifetime management (in that respect it is just like a std::string_view).
MQTT_NS::buffer is a higher-level concept that makes lifetime management easier for you. MQTT_NS::buffer is basically std::string_view + std::shared_ptr<char[]>, so the lifetime of the string can be preserved properly.
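To illustrate the "view plus shared owner" idea, here is a minimal sketch that assumes C++17. `owned_view` and `make_owned_view` are hypothetical names, and this is not the actual definition of MQTT_NS::buffer. It only shows why pairing a view with a shared owner keeps the bytes valid after the original storage is gone.

```cpp
#include <cassert>
#include <cstddef>
#include <cstring>
#include <memory>
#include <string_view>

// Hypothetical stand-in for the idea behind MQTT_NS::buffer (not its real
// definition): a non-owning view plus a shared owner of the viewed bytes.
struct owned_view {
    std::string_view view;          // what to read
    std::shared_ptr<char[]> owner;  // what keeps it alive
};

// Copies `size` bytes into shared storage and returns a view over the copy.
// The length is explicit, so embedded zero bytes are preserved.
inline owned_view make_owned_view(const char* data, std::size_t size) {
    std::shared_ptr<char[]> storage(new char[size]);
    std::memcpy(storage.get(), data, size);
    return owned_view{std::string_view(storage.get(), size), storage};
}
```

Copying an `owned_view` copies the view and bumps the reference count, so the bytes stay valid for as long as any copy exists. A bare std::string_view or const_buffer gives no such guarantee.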
mqtt_cpp does not support sending arbitrary structs, or any kind of compile-time / runtime polymorphism where you can send some object that has a serialize() and/or deserialize() member function. The reason for this is that, at the lowest level, any serialize() and/or deserialize() would ultimately turn into allocating something that looks just like std::string_view + std::shared_ptr<char[]>.
Ultimately this is because boost::asio (please inform me if I am mistaken) doesn't understand iterators when sending / receiving data. So you can't serialize your structure on the fly; it has to be serialized into a contiguous buffer ahead of time.
This means that your choices of (non-deprecated) API when using mqtt_cpp boil down to either using the boost::asio native const_buffer objects and managing their lifetime yourself, or using MQTT_NS::buffer objects and letting the library manage the lifetime for you.
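As a rough sketch of what "serialize ahead of time" can look like for a plain struct, using only the standard library: `reading`, `to_bytes` and `from_bytes` are hypothetical names, not mqtt_cpp API. A raw memory copy like this is an assumption that only holds when both ends share the same struct layout and endianness. For anything portable, a real format such as protobuf or msgpack is the better choice.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <string>
#include <type_traits>

// Hypothetical example struct; any trivially copyable type works the same way.
struct reading {
    std::uint32_t sensor_id;
    std::int32_t value;
};

// Copies the object representation into a contiguous byte string.
// The string knows its own size, so zero bytes inside it are not a problem.
template <typename T>
std::string to_bytes(T const& obj) {
    static_assert(std::is_trivially_copyable<T>::value,
                  "raw copy requires a trivially copyable type");
    std::string out(sizeof(T), '\0');
    std::memcpy(&out[0], &obj, sizeof(T));
    return out;
}

// Reverses to_bytes; the caller must pass exactly sizeof(T) bytes.
template <typename T>
T from_bytes(std::string const& bytes) {
    static_assert(std::is_trivially_copyable<T>::value,
                  "raw copy requires a trivially copyable type");
    assert(bytes.size() == sizeof(T));
    T obj;
    std::memcpy(&obj, bytes.data(), sizeof(T));
    return obj;
}
```

The resulting byte string can then be handed to the library as the payload with an explicit pointer and size, never through a bare `const char*`.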
@jonesmz Thank you for writing that down! In the current case, I eventually resorted to encoding the content as a JSON string, but I think it would be great to have that explanation somewhere in the examples.
For your information, here is an example that encodes the payload with MessagePack (msgpack):
https://github.com/msgpack/msgpack-c
The payload can be sent as binary.
// Copyright Takatoshi Kondo 2019
//
// Distributed under the Boost Software License, Version 1.0.
// (See accompanying file LICENSE_1_0.txt or copy at
// http://www.boost.org/LICENSE_1_0.txt)
// no_tls client and server
#include <iostream>
#include <iomanip>
#include <map>
#include <set>
#include <sstream>
#include <string>
#include <vector>
#include <mqtt_client_cpp.hpp>
#include <boost/lexical_cast.hpp>
#include <msgpack.hpp>
struct data {
int i;
double d;
std::string s;
MSGPACK_DEFINE(i, d, s);
};
template <typename Client, typename Disconnect>
void client_proc(
Client& c,
std::uint16_t& pid_sub1,
Disconnect const& disconnect) {
using packet_id_t = typename std::remove_reference_t<decltype(*c)>::packet_id_t;
// Setup client
c->set_client_id("cid1");
c->set_clean_session(true);
// Setup handlers
c->set_connack_handler(
[&c, &pid_sub1]
(bool sp, MQTT_NS::connect_return_code connack_return_code){
std::cout << "[client] Connack handler called" << std::endl;
std::cout << "[client] Clean Session: " << std::boolalpha << sp << std::endl;
std::cout << "[client] Connack Return Code: "
<< MQTT_NS::connect_return_code_to_str(connack_return_code) << std::endl;
if (connack_return_code == MQTT_NS::connect_return_code::accepted) {
pid_sub1 = c->subscribe("mqtt_client_cpp/topic1", MQTT_NS::qos::at_most_once);
}
return true;
});
c->set_close_handler(
[]
(){
std::cout << "[client] closed." << std::endl;
});
c->set_error_handler(
[]
(boost::system::error_code const& ec){
std::cout << "[client] error: " << ec.message() << std::endl;
});
c->set_suback_handler(
[&]
(packet_id_t packet_id, std::vector<MQTT_NS::suback_return_code> results){
using namespace MQTT_NS::literals;
std::cout << "[client] suback received. packet_id: " << packet_id << std::endl;
for (auto const& e : results) {
std::cout << "[client] subscribe result: " << e << std::endl;
}
std::stringstream ss;
// pack data to msgpack
msgpack::pack(ss, data { 42, 12.3, "ABC" } );
auto const& str = ss.str();
c->publish("mqtt_client_cpp/topic1"_mb, MQTT_NS::buffer(MQTT_NS::string_view(str.data(), str.size())), MQTT_NS::qos::at_most_once);
return true;
});
c->set_publish_handler(
[&]
(bool dup,
MQTT_NS::qos qos_value,
bool retain,
MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::buffer topic_name,
MQTT_NS::buffer contents){
std::cout << "[client] publish received. "
<< "dup: " << std::boolalpha << dup
<< " qos: " << qos_value
<< " retain: " << std::boolalpha << retain << std::endl;
if (packet_id)
std::cout << "[client] packet_id: " << *packet_id << std::endl;
std::cout << "[client] topic_name: " << topic_name << std::endl;
auto oh = msgpack::unpack(contents.data(), contents.size());
std::cout << "[client] contents: " << oh.get() << std::endl;
auto recv_data = oh->as<data>();
std::cout << recv_data.i << std::endl;
std::cout << recv_data.d << std::endl;
std::cout << recv_data.s << std::endl;
disconnect();
return true;
});
// Connect
c->connect();
}
#include <boost/multi_index_container.hpp>
#include <boost/multi_index/ordered_index.hpp>
#include <boost/multi_index/member.hpp>
#include <mqtt_server_cpp.hpp>
namespace mi = boost::multi_index;
using con_t = MQTT_NS::server<>::endpoint_t;
using con_sp_t = std::shared_ptr<con_t>;
struct sub_con {
sub_con(MQTT_NS::buffer topic, con_sp_t con, MQTT_NS::qos qos_value)
:topic(std::move(topic)), con(std::move(con)), qos_value(qos_value) {}
MQTT_NS::buffer topic;
con_sp_t con;
MQTT_NS::qos qos_value;
};
struct tag_topic {};
struct tag_con {};
using mi_sub_con = mi::multi_index_container<
sub_con,
mi::indexed_by<
mi::ordered_non_unique<
mi::tag<tag_topic>,
BOOST_MULTI_INDEX_MEMBER(sub_con, MQTT_NS::buffer, topic)
>,
mi::ordered_non_unique<
mi::tag<tag_con>,
BOOST_MULTI_INDEX_MEMBER(sub_con, con_sp_t, con)
>
>
>;
inline void close_proc(std::set<con_sp_t>& cons, mi_sub_con& subs, con_sp_t const& con) {
cons.erase(con);
auto& idx = subs.get<tag_con>();
auto r = idx.equal_range(con);
idx.erase(r.first, r.second);
}
template <typename Server>
void server_proc(Server& s, std::set<con_sp_t>& connections, mi_sub_con& subs) {
s.set_error_handler(
[](boost::system::error_code const& ec) {
std::cout << "[server] error: " << ec.message() << std::endl;
}
);
s.set_accept_handler(
[&s, &connections, &subs](con_sp_t spep) {
auto& ep = *spep;
std::weak_ptr<con_t> wp(spep);
using packet_id_t = typename std::remove_reference_t<decltype(ep)>::packet_id_t;
std::cout << "[server] accept" << std::endl;
// For server close if ep is closed.
auto g = MQTT_NS::shared_scope_guard(
[&s] {
std::cout << "[server] session end" << std::endl;
s.close();
}
);
// Pass spep to keep it alive.
// This makes sure wp.lock() never returns nullptr in the handlers below,
// including close_handler and error_handler.
ep.start_session(std::make_tuple(std::move(spep), std::move(g)));
// set connection (lower than MQTT) level handlers
ep.set_close_handler(
[&connections, &subs, wp]
(){
std::cout << "[server] closed." << std::endl;
auto sp = wp.lock();
BOOST_ASSERT(sp);
close_proc(connections, subs, sp);
});
ep.set_error_handler(
[&connections, &subs, wp]
(boost::system::error_code const& ec){
std::cout << "[server] error: " << ec.message() << std::endl;
auto sp = wp.lock();
BOOST_ASSERT(sp);
close_proc(connections, subs, sp);
});
// set MQTT level handlers
ep.set_connect_handler(
[&connections, wp]
(MQTT_NS::buffer client_id,
MQTT_NS::optional<MQTT_NS::buffer> username,
MQTT_NS::optional<MQTT_NS::buffer> password,
MQTT_NS::optional<MQTT_NS::will>,
bool clean_session,
std::uint16_t keep_alive) {
using namespace MQTT_NS::literals;
std::cout << "[server] client_id : " << client_id << std::endl;
std::cout << "[server] username : " << (username ? username.value() : "none"_mb) << std::endl;
std::cout << "[server] password : " << (password ? password.value() : "none"_mb) << std::endl;
std::cout << "[server] clean_session: " << std::boolalpha << clean_session << std::endl;
std::cout << "[server] keep_alive : " << keep_alive << std::endl;
auto sp = wp.lock();
BOOST_ASSERT(sp);
connections.insert(sp);
sp->connack(false, MQTT_NS::connect_return_code::accepted);
return true;
}
);
ep.set_disconnect_handler(
[&connections, &subs, wp]
(){
std::cout << "[server] disconnect received." << std::endl;
auto sp = wp.lock();
BOOST_ASSERT(sp);
close_proc(connections, subs, sp);
});
ep.set_publish_handler(
[&subs]
(bool is_dup,
MQTT_NS::qos qos_value,
bool is_retain,
MQTT_NS::optional<packet_id_t> packet_id,
MQTT_NS::buffer topic_name,
MQTT_NS::buffer contents){
std::cout << "[server] publish received."
<< " dup: " << std::boolalpha << is_dup
<< " qos: " << qos_value
<< " retain: " << std::boolalpha << is_retain << std::endl;
if (packet_id)
std::cout << "[server] packet_id: " << *packet_id << std::endl;
std::cout << "[server] topic_name: " << topic_name << std::endl;
auto oh = msgpack::unpack(contents.data(), contents.size());
std::cout << "[server] contents: " << oh.get() << std::endl;
auto recv_data = oh->as<data>();
std::cout << recv_data.i << std::endl;
std::cout << recv_data.d << std::endl;
std::cout << recv_data.s << std::endl;
auto const& idx = subs.get<tag_topic>();
auto r = idx.equal_range(topic_name);
for (; r.first != r.second; ++r.first) {
r.first->con->publish(
boost::asio::buffer(topic_name),
boost::asio::buffer(contents),
std::make_pair(topic_name, contents),
std::min(r.first->qos_value, qos_value),
is_retain
);
}
return true;
});
ep.set_subscribe_handler(
[&subs, wp]
(packet_id_t packet_id,
std::vector<std::tuple<MQTT_NS::buffer, MQTT_NS::subscribe_options>> entries) {
std::cout << "[server] subscribe received. packet_id: " << packet_id << std::endl;
std::vector<MQTT_NS::suback_return_code> res;
res.reserve(entries.size());
auto sp = wp.lock();
BOOST_ASSERT(sp);
for (auto const& e : entries) {
MQTT_NS::buffer topic = std::get<0>(e);
MQTT_NS::qos qos_value = std::get<1>(e).get_qos();
std::cout << "[server] topic: " << topic << " qos: " << qos_value << std::endl;
res.emplace_back(MQTT_NS::qos_to_suback_return_code(qos_value));
subs.emplace(std::move(topic), sp, qos_value);
}
sp->suback(packet_id, res);
return true;
}
);
ep.set_unsubscribe_handler(
[&subs, wp]
(packet_id_t packet_id,
std::vector<MQTT_NS::buffer> topics) {
std::cout << "[server] unsubscribe received. packet_id: " << packet_id << std::endl;
for (auto const& topic : topics) {
subs.erase(topic);
}
auto sp = wp.lock();
BOOST_ASSERT(sp);
sp->unsuback(packet_id);
return true;
}
);
}
);
s.listen();
}
int main(int argc, char** argv) {
if (argc != 2) {
std::cout << argv[0] << " port" << std::endl;
return -1;
}
boost::asio::io_context ioc;
std::uint16_t port = boost::lexical_cast<std::uint16_t>(argv[1]);
// server
auto s = MQTT_NS::server<>(
boost::asio::ip::tcp::endpoint(
boost::asio::ip::tcp::v4(),
port
),
ioc
);
std::set<con_sp_t> connections;
mi_sub_con subs;
server_proc(s, connections, subs);
// client
std::uint16_t pid_sub1 = 0;
auto c = MQTT_NS::make_sync_client(ioc, "localhost", port);
int count = 0;
auto disconnect = [&] {
if (++count == 1) c->disconnect();
};
client_proc(c, pid_sub1, disconnect);
ioc.run();
}