libzmq
libzmq copied to clipboard
why I should move the code zmq::message_t zmq_message into the while scope every sending message?
I try to use zmq to transport capnproto serialized_message. but I get the drump crash.
#include "capnproto/message.capnp.h"
#include <capnp/message.h>
#include <capnp/serialize.h>
#include <kj/std/iostream.h>
#include <zmq.hpp>
int main() {
// create a message builder
capnp::MallocMessageBuilder message;
Message::Builder messageBuilder = message.initRoot<Message>();
// set the preparation and heading fields
messageBuilder.setPreparation("prep");
messageBuilder.setHeading("heading");
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_PUB);
socket.bind("tcp://127.0.0.1:5555");
// serialize the message to a memory buffer
kj::Array<capnp::word> serialized_message =capnp::messageToFlatArray(message);
auto byteArray = serialized_message.asBytes();
// create a ZeroMQ message from the serialized buffer
zmq::message_t zmq_message(byteArray);
memcpy(zmq_message.data(), byteArray.begin(), byteArray.size());
while (true) {
socket.send(zmq_message);
}
return 0;
}
#include "capnproto/message.capnp.h"
#include <capnp/message.h>
#include <capnp/serialize.h>
#include <kj/std/iostream.h>
#include <zmq.hpp>
int main()
{
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_SUB);
socket.connect("tcp://127.0.0.1:5555"); // or *
socket.set(zmq::sockopt::subscribe, "");
while (true) {
zmq::message_t zmq_message;
socket.recv(zmq_message);
//if (reinterpret_cast<uintptr_t>(zmq_message.size()) % sizeof(capnp::word) == 0) {
// message is aligned
// create a memory buffer from the received message
// kj::ArrayPtr<capnp::word> buffer(reinterpret_cast<capnp::word*>(zmq_message.data()),
// zmq_message.size() / sizeof(capnp::word));
auto buffer = kj::heapArray<capnp::word>(zmq_message.size() / sizeof(capnp::word));
memcpy(buffer.asBytes().begin(), zmq_message.data(), buffer.asBytes().size());
// create an input stream from the memory buffer
capnp::FlatArrayMessageReader message_reader(buffer);
Message::Reader message = message_reader.getRoot<Message>();
// print the preparation and heading fields
std::cout << "Preparation: " << message.getPreparation().cStr() << std::endl;
std::cout << "Heading: " << message.getHeading().cStr() << std::endl;
// }
// else {
// // message is not aligned
// std::cerr << "Agent: receive(): Not aligned " << std::endl;
//}
}
return 0;
}
terminate called after throwing an instance of 'kj::ExceptionImpl'
what(): capnp/message.c++:99: failed: expected segment != nullptr && segment->checkObject(segment->getStartPtr(), ONE * WORDS); Message did not contain a root pointer.
stack: 55efdad18f3a 55efdacf095f 55efdacefdac 7f6b183de082 55efdacefb2d
??:0: returning here
??:0: returning here
??:0: returning here
??:0: returning here
??:0: returning here
my code is copy from othe frst question and I try to add the anwer from second answer. I run the code from second question. it works.
https://stackoverflow.com/questions/74975616/zeromq-pub-sub-socket-receiving-error-of-a-serialized-message-object-using-capn
https://stackoverflow.com/questions/75006774/capn-proto-unaligned-data-error-while-trying-to-sendreceive-serialized-object
It has some problem in publisher code. forget to add the .size() should move the code zmq::message_t zmq_message into the while scope.
#include "capnproto/message.capnp.h"
#include <capnp/message.h>
#include <capnp/serialize.h>
#include <kj/std/iostream.h>
#include <zmq.hpp>
int main() {
// create a message builder
capnp::MallocMessageBuilder message;
Message::Builder messageBuilder = message.initRoot<Message>();
// set the preparation and heading fields
messageBuilder.setPreparation("prep");
messageBuilder.setHeading("heading");
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_PUB);
socket.bind("tcp://127.0.0.1:5555");
// serialize the message to a memory buffer
kj::Array<capnp::word> serialized_message =capnp::messageToFlatArray(message);
auto byteArray = serialized_message.asBytes();
while (true) {
// create a ZeroMQ message from the serialized buffer
// zmq::message_t zmq_message(serialized_message.size() * sizeof(capnp::word));
// memcpy((void*)zmq_message.data(), serialized_message.begin(), serialized_message.size() * sizeof(capnp::word));
zmq::message_t zmq_message(byteArray.size());
memcpy(zmq_message.data(), byteArray.begin(), byteArray.size());
socket.send(zmq_message);
}
return 0;
}
It has some problem in publisher code. forget to add the .size() should move the code zmq::message_t zmq_message into the while scope.
#include "capnproto/message.capnp.h"
#include <capnp/message.h>
#include <capnp/serialize.h>
#include <kj/std/iostream.h>
#include <zmq.hpp>
int main() {
// create a message builder
capnp::MallocMessageBuilder message;
Message::Builder messageBuilder = message.initRoot<Message>();
// set the preparation and heading fields
messageBuilder.setPreparation("prep");
messageBuilder.setHeading("heading");
zmq::context_t context(1);
zmq::socket_t socket(context, ZMQ_PUB);
socket.bind("tcp://127.0.0.1:5555");
// serialize the message to a memory buffer
kj::Array<capnp::word> serialized_message =capnp::messageToFlatArray(message);
auto byteArray = serialized_message.asBytes();
while (true) {
// create a ZeroMQ message from the serialized buffer
// zmq::message_t zmq_message(serialized_message.size() * sizeof(capnp::word));
// memcpy((void*)zmq_message.data(), serialized_message.begin(), serialized_message.size() * sizeof(capnp::word));
zmq::message_t zmq_message(byteArray.size());
memcpy(zmq_message.data(), byteArray.begin(), byteArray.size());
socket.send(zmq_message);
}
return 0;
}
why I should move the code zmq::message_t zmq_message into the while scope?