zmqpp
zmqpp copied to clipboard
Sample tutorials for more functionality.
It would be nice to have more tutorials and examples for functionality like reactors and pollers.
Tutorials on actors and inter-thread communications would be very nice too.
Would having versions of the examples from the zmq guide (https://github.com/imatix/zguide/tree/master/examples) built with zmqpp solve that? I'm not sure I'm all that good at writing a tutorial.
That would be very helpful. On May 19, 2016 11:40 AM, "Ben Gray" [email protected] wrote:
Would having versions of the examples from the zmq guide ( https://github.com/imatix/zguide/tree/master/examples) build with zmqpp solve that? I'm not sure I'm all that good at writing a tutorial.
— You are receiving this because you authored the thread. Reply to this email directly or view it on GitHub https://github.com/zeromq/zmqpp/issues/160#issuecomment-220262569
+1
#include <atomic>
#include <chrono>
#include <iostream>
#include <string>
#include <thread>
#include <vector>
using namespace std;
#include <zmqpp/context.hpp>
#include <zmqpp/socket.hpp>
#include <zmqpp/message.hpp>
#include <zmqpp/reactor.hpp>
#include <signal.h>
// Shutdown request flag: written by the SIGINT handler, read by the worker
// loops. It is an atomic rather than a `volatile bool` because the reader
// runs on a different thread than the one that may receive the signal, and
// `volatile` provides no inter-thread synchronisation.
static std::atomic<bool> interrupted{false};

/// SIGINT handler: records that the user asked the program to stop.
///
/// The handler deliberately does nothing but set the flag. Stream I/O such as
/// `cout << ...` is not async-signal-safe and must not be performed here; the
/// worker loops report the interruption themselves once they observe the flag.
/// NOTE(review): assumes std::atomic<bool> is lock-free on the target, which
/// is what makes the store safe inside a signal handler.
void my_signal_handler(int){
    interrupted = true;
}
void producer_task(){
try{
zmqpp::context zmq_context;
zmqpp::socket socket {
zmq_context,
zmqpp::socket_type::push
};
socket.bind( "tcp://*:4099" );
int x = 0;
while( 1 ){
std::this_thread::sleep_for( std::chrono::seconds(1) );
cout << "Message: " << x << endl;
zmqpp::message request;
request << "Message: " + std::to_string(x);
request << x;
socket.send(request);
x++;
}
}catch( zmqpp::zmq_internal_exception& e ){
cerr << "Exception: " << e.what() << endl;
}catch( ... ){
cerr << "Unknown Exception: " << endl;
}
}
void consumer_task(){
try{
zmqpp::context zmq_context;
zmqpp::socket socket_1 {
zmq_context,
zmqpp::socket_type::pull
};
socket_1.connect( "tcp://127.0.0.1:4099" );
zmqpp::socket socket_2 {
zmq_context,
zmqpp::socket_type::pull
};
socket_2.connect( "tcp://127.0.0.1:4099" );
zmqpp::reactor reactor;
auto first_listener = [&socket_1](){
zmqpp::message response;
socket_1.receive(response);
cout << "first_listener: " << response.get(0) << endl;
};
auto second_listener = [&socket_2](){
zmqpp::message response;
socket_2.receive(response);
cout << "second_listener: " << response.get(0) << endl;
};
reactor.add( socket_1, first_listener );
reactor.add( socket_2, second_listener );
while( reactor.poll() && !interrupted ){
}
//interrupted
cout << "interrupted" << endl;
}catch( zmqpp::zmq_internal_exception& e ){
cerr << "Exception: " << e.what() << endl;
}catch( ... ){
cerr << "Unknown Exception: " << endl;
}
}
/// Prints the one-line command-line synopsis to standard output.
void usage(){
    static const char* const synopsis = "usage: test [producer|consumer]";
    std::cout << synopsis << std::endl;
}
/**
 * Entry point: `test producer` or `test consumer`.
 *
 * Installs the SIGINT handler, then runs the task named by the first
 * command-line argument on its own thread and waits for it to finish.
 * With no argument, or an unrecognised one, the usage line is printed.
 * The process exit status is 0 in every case.
 */
int main( int argc, char** argv ){
    signal(SIGINT, my_signal_handler);

    if( argc < 2 ){
        usage();
        return 0;
    }

    // Only the first argument selects the task; any further arguments are
    // ignored.
    const std::string task = argv[1];

    if( task == "producer" ){
        std::thread producer_thread( producer_task );
        producer_thread.join();
    }else if( task == "consumer" ){
        std::thread consumer_thread( consumer_task );
        consumer_thread.join();
    }else{
        usage();
    }
    return 0;
}
Alternative using the reactor's poller and one callback:
// Alternative consumer body: both PULL sockets share a single callback, which
// asks the reactor's poller which socket has input and acknowledges every
// received message on a PUB socket bound to tcp://127.0.0.1:4098.
zmqpp::context zmq_context;
zmqpp::socket socket_1 {
    zmq_context,
    zmqpp::socket_type::pull
};
socket_1.connect( "tcp://127.0.0.1:4099" );
//socket_1.subscribe("");
zmqpp::socket socket_2 {
    zmq_context,
    zmqpp::socket_type::pull
};
socket_2.connect( "tcp://127.0.0.1:4099" );
//socket_2.subscribe("");
zmqpp::socket pub_socket {
    zmq_context,
    zmqpp::socket_type::pub
};
pub_socket.bind( "tcp://127.0.0.1:4098" );
zmqpp::reactor reactor;
zmqpp::poller& poller = reactor.get_poller();
auto socket_listener = [&poller, &socket_1, &socket_2, &pub_socket](){
    zmqpp::message response;
    // The same callback is registered for both sockets, so one poll() can
    // invoke it twice against the same poll results. The receives are
    // therefore non-blocking and their result is checked: a second invocation
    // that finds a socket already drained skips it instead of stalling the
    // reactor until the next message arrives.
    if( poller.has_input(socket_1) && socket_1.receive(response, true) ){
        cout << "1: " << response.get(0) << endl;
        pub_socket.send("OKAY 1");
    }
    if( poller.has_input(socket_2) && socket_2.receive(response, true) ){
        cout << "2: " << response.get(0) << endl;
        pub_socket.send("OKAY 2");
    }
};
reactor.add( socket_1, socket_listener );
reactor.add( socket_2, socket_listener );
// NOTE(review): reactor.poll() is understood to return a bool, which made the
// earlier `!= -1` comparison always true; the flag is the only exit condition.
while( !interrupted ){
    reactor.poll();
}
+1