NuRaft
NuRaft copied to clipboard
Why would asio::buffer(my_buffer) change the input my_buffer so dramatically?
Hi,
I have been experimenting with the buffer and asio_service code in cornerstone and, while trying to rewrite them, I ran into the problem shown below. (The cornerstone repo has been inactive for at least a year, so I am opening an issue here :)
As you can see in the terminal output, the buffer size is correct until the buffer is passed to asio::buffer; after that, its size changes dramatically and the int data can no longer be fetched.
The rpc_client is as follows:
The asio_service.h is shown below:
#include <iostream>
#include <memory>
#include <mutex>
#include <string>
#include <system_error>
#include <vector>

#include <asio.hpp>

#include "buffer.h"
//#include "buffer.hxx"
#include "rpc_cli.h"
#include "rpc_listener.h"
using namespace raft;
// Request header: ulong term (8), msg_type type (1), int32 src (4), int32 dst (4),
// ulong last_log_term (8), ulong last_log_idx (8), ulong commit_idx (8),
// plus one int32 (4) for the log data size  => 45 bytes.
// The body is parenthesized so the macro can be used inside larger expressions
// (e.g. `2 * RPC_REQ_HEADER_SIZE`) without operator-precedence surprises.
#define RPC_REQ_HEADER_SIZE (3 * 4 + 8 * 4 + 1)
// Response header: ulong term (8), msg_type type (1), int32 src (4), int32 dst (4),
// ulong next_idx (8), bool accepted (1)  => 26 bytes.
#define RPC_RESP_HEADER_SIZE (4 * 2 + 8 * 2 + 2)
class asio_rpc_client : public rpc_client, public std::enable_shared_from_this<asio_rpc_client>
{
private:
asio::io_service io_svc;
public:
asio_rpc_client(std::string &host, std::string &port, asio::io_service &io_svc)
: host_(host), port_(port), socket_(io_svc), resolver_(io_svc)
{
}
virtual ~asio_rpc_client()
{
if (socket_.is_open())
{
socket_.close();
}
}
private:
void connect(bufptr &buf)
{
std::shared_ptr<asio_rpc_client> self(this->shared_from_this());
asio::ip::tcp::resolver::query q(host_, port_, asio::ip::tcp::resolver::all_matching);
// async_connect返回下一个可以连接的itor
resolver_.async_resolve(q,
[self, this, &buf](std::error_code err, asio::ip::tcp::resolver::iterator itor) mutable
{
asio::async_connect(socket_, itor,
[self, &buf](std::error_code err, asio::ip::tcp::resolver::iterator itor) mutable
{
if (!err)
{
std::cout << "Connected Successfully\n";
self->send(buf);
}
else
{
std::cout << "Connected failed\n";
}
});
});
}
// override关键词强制重写
public:
virtual void send(bufptr &buf) override
{
// std::cout << socket_.is_open() << std::endl;
if (!socket_.is_open())
{
this->connect(buf);
// std::cout << "socket is closed!";
}
else
{
std::shared_ptr<asio_rpc_client> self(this->shared_from_this());
asio::async_write(socket_, asio::buffer(buf->data(), buf->size()),
[this, self](std::error_code err, ssize_t bytes_transferred)
{
std::cout << "send msg successfully!\n";
});
}
}
private:
// 把走io_svc的声明成private,传参只需传io_svc
std::string host_;
std::string port_;
asio::ip::tcp::socket socket_;
asio::ip::tcp::resolver resolver_;
};
class asio_rpc_session : public std::enable_shared_from_this<asio_rpc_session>
{
public:
asio_rpc_session(asio::io_service &io) : socket_(io) {
}
~asio_rpc_session() {
stop();
}
public:
asio::ip::tcp::socket& socket() {
return socket_;
}
void start() {
this->read();
}
void stop() {
if (socket_.is_open())
socket_.close();
}
private:
void read() {
//自动调用move constructor
log_data = buffer::alloc(4);
std::cout << "the inital log_data size is :" << log_data->size() << std::endl;
std::shared_ptr<asio_rpc_session> self(this->shared_from_this());
//如果不将log_data设置为全局变量就会出现size乱变的情况
//这里输出得log_data->size() 是bytes_transferred 不是真实的发过来的
asio::async_read(
socket_,asio::buffer(log_data->data(), 4),
[this, self](std::error_code err, ssize_t bytes_transferred){
if (!err) {
//log_data->pos(bytes_transferred);
std::cout << "bytes read is :" << bytes_transferred << std::endl;
std::cout << "the int data is :" << log_data->get_int() << std::endl;
std::cout << "the size of buffer is :" << log_data->size() << std::endl;
}
else {
std::cout << "No data read\n";
}
}
);
}
private:
asio::ip::tcp::socket socket_;
bufptr log_data;
};
class asio_rpc_listener : public rpc_listener, public std::enable_shared_from_this<asio_rpc_listener>
{
//port 是 ushort类型
public:
asio_rpc_listener(asio::io_service& io, ushort port) :
io_svc(io),
acceptor_(io, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port))
{}
~asio_rpc_listener()
{
stop();
}
public:
virtual void listen()
{
if (!acceptor_.is_open())
return;
this->start();
}
virtual void stop()
{
if (acceptor_.is_open())
acceptor_.close();
}
private:
void start()
{
std::shared_ptr<asio_rpc_listener> self(this->shared_from_this());
std::shared_ptr<asio_rpc_session> session(std::make_shared<asio_rpc_session>(io_svc));
acceptor_.async_accept(
session->socket(),
[self, session](std::error_code err)
{
if (!err)
{
session->start();
}
else
{
std::cout << "No clients yet.....\n";
}
self->listen();
});
}
private:
//这里的io_svc得声明成引用
asio::io_service &io_svc;
asio::ip::tcp::acceptor acceptor_;
std::mutex session_lock;
};
And this is my buffer.cpp:
#include "buffer.h"
typedef unsigned int uint32;

// Header layout stored directly in front of the payload bytes:
//   small block: ushort size | ushort pos | payload ...
//   big block:   uint32 size (top bit set as the "big" flag) | uint32 pos | payload ...
// The macro names keep their leading double underscore because the rest of the
// file refers to them; be aware that such identifiers are reserved in C++.

// Stores size `s` in slot 0 and a zero position in slot 1, using field type `t`.
#define __init_block(p, s, t) \
    ((t *)(p))[0] = (t)(s);   \
    ((t *)(p))[1] = 0;
#define __init_small_block(p, s) __init_block(p, s, ushort);
// Same as above with 32-bit fields, then tags the block as big via the top bit.
#define __init_big_block(p, s)  \
    __init_block(p, s, uint32); \
    ((uint32 *)(p))[0] |= 0x80000000;
// Non-zero when the top bit of the first 32-bit word is set.
#define __is_big_block(p) (((uint32 *)(p))[0] & 0x80000000)
#define __pos_of_big_block(p) ((uint32 *)(p))[1]
#define __pos_of_small_block(p) ((ushort *)(p))[1]
// Current position, whichever header layout the block uses.
#define __pos_of_block(p) ((__is_big_block(p)) ? __pos_of_big_block(p) : __pos_of_small_block(p))
// Payload size. A small block keeps its size in the first ushort only; reading
// the whole 32-bit word there would also pick up the position field and make
// the reported size change as soon as the position moves.
#define __size_of_block(p) ((__is_big_block(p)) ? (((uint32 *)(p))[0] ^ 0x80000000) : ((ushort *)(p))[0])
// Advances the position by `len`.
#define __mv_block_pos(p, len) \
    __is_big_block(p) ? ((uint32 *)(p))[1] += (len) : ((ushort *)(p))[1] += (ushort)(len);
// Sets the position to `len`.
#define __set_block_pos(p, len) \
    __is_big_block(p) ? ((uint32 *)(p))[1] = (len) : ((ushort *)(p))[1] = (ushort)(len);
// Pointer to the payload byte at the current position. The big/small position
// macros are used directly here to stay clear of operator-precedence problems.
#define __data_of_block(p) (__is_big_block(p) ? (byte *)(((byte *)((uint32 *)(p) + 2)) + __pos_of_big_block(p)) : (byte *)(((byte *)((ushort *)(p) + 2)) + __pos_of_small_block(p)))
using namespace raft;
// Allocates a buffer with room for `size` payload bytes and position 0.
//
// Sizes below 0x8000 get the small header (two ushort fields); larger sizes get
// the big header (two uint32 fields, top bit of the size word set).
//
// Throws std::out_of_range when `size` is negative or >= 0x80000000. A negative
// size used to slip through and produce an undersized allocation whose size
// field wrapped around.
//
// NOTE(review): the storage comes from `new char[]` but is owned through a
// `buffer *`; confirm that bufptr's deleter releases it in a matching way.
bufptr buffer::alloc(ssize_t size)
{
    if (size < 0)
    {
        throw std::out_of_range("size of raft::buffer must not be negative");
    }
    if (size >= 0x80000000)
    {
        throw std::out_of_range("size exceed the max size that raft::buffer could support");
    }
    if (size >= 0x8000)
    {
        bufptr buf(reinterpret_cast<buffer *>(new char[size + sizeof(uint32) * 2]));
        void *ptr = reinterpret_cast<void *>(buf.get());
        __init_big_block(ptr, size);
        return buf;
    }
    bufptr buf(reinterpret_cast<buffer *>(new char[size + sizeof(ushort) * 2]));
    void *ptr = reinterpret_cast<void *>(buf.get());
    __init_small_block(ptr, size);
    return buf;
}
// Returns the current read/write position stored in the block header.
ssize_t buffer::pos() const
{
    if (__is_big_block(this))
    {
        return ssize_t(__pos_of_big_block(this));
    }
    return ssize_t(__pos_of_small_block(this));
}
// Returns a pointer to the payload byte at the current position: the payload
// starts right after the two header fields, and the position is added to it.
byte *buffer::data() const
{
    if (__is_big_block(this))
    {
        byte *payload = (byte *)((uint32 *)(this) + 2);
        return (byte *)(payload + __pos_of_big_block(this));
    }
    byte *payload = (byte *)((ushort *)(this) + 2);
    return (byte *)(payload + __pos_of_small_block(this));
}
// Returns the payload size recorded in the block header.
//
// The two header layouts are decoded explicitly: a big block stores the size in
// the first uint32 with the top bit used as the "big" flag, while a small block
// stores it in the first ushort only. Reading a small block's size as a 32-bit
// word would also include the position field, so the result would change every
// time the position moved.
ssize_t buffer::size() const
{
    if (__is_big_block(this))
    {
        return (ssize_t)(((uint32 *)(this))[0] ^ 0x80000000);
    }
    return (ssize_t)(((ushort *)(this))[0]);
}
// Overwrites the position field of the header with `p` (no bounds check).
void buffer::pos(ssize_t p)
{
    if (__is_big_block(this))
    {
        __pos_of_big_block(this) = p;
    }
    else
    {
        __pos_of_small_block(this) = (ushort)p;
    }
}
// 逆序存数据
void buffer::put(int val)
{
ssize_t pos_ = pos();
ssize_t sz = size();
byte *d = data();
if (pos_ + sz_int > sz)
{
throw std::overflow_error("insufficient buffer to store int");
}
for (ssize_t i = 0; i < sz_int; i++)
{
// 转成byte取最低8位
*(d + i) = (byte)(val >> (i * 8));
}
__mv_block_pos(this, sz_int);
}
void buffer::put(ulong val)
{
ssize_t pos_ = pos();
ssize_t sz = size();
byte *d = data();
if (pos_ + sz_ulong > sz)
{
throw std::overflow_error("insufficient buffer to store ulong");
}
for (int i = 0; i < sz_ulong; i++)
{
// 转成byte取最低8位
*(d + i) = (byte)(val >> (i * 8));
}
__mv_block_pos(this, sz_ulong);
}
void buffer::put(std::string str)
{
ssize_t pos_ = pos();
ssize_t sz = size();
byte *d = data();
ssize_t len = str.size();
if (pos_ + len > sz)
{
throw std::overflow_error("insufficient buffer to store str");
}
for (int i = 0; i < len; i++)
{
// 转成byte取最低8位
*(d + i) = str[i];
}
__mv_block_pos(this, len);
}
// NOTE(review): not implemented yet -- the body is empty, so calling this
// writes nothing and leaves the position untouched. Presumably it is meant to
// append the contents of `buf` like the other put() overloads; TODO confirm
// the intended semantics and implement.
void buffer::put(const buffer &buf)
{
}
// Reads an int stored least-significant byte first at the current position and
// advances the position by sz_int.
// Throws std::overflow_error when fewer than sz_int bytes remain.
int buffer::get_int()
{
    ssize_t pos_ = pos();
    if (pos_ + sz_int > size())
    {
        throw std::overflow_error("insufficient buffer available for a int");
    }
    byte *d = data();
    // Assemble the value in an unsigned accumulator: each byte is first reduced
    // to its 8-bit pattern so a signed `byte` cannot sign-extend into the higher
    // bytes, and the top byte is never shifted into the sign bit of an int.
    unsigned int bits = 0;
    for (ssize_t i = 0; i < sz_int; i++)
    {
        bits |= static_cast<unsigned int>(static_cast<unsigned char>(*(d + i))) << (i * 8);
    }
    __mv_block_pos(this, sz_int);
    return static_cast<int>(bits);
}
// Reads a ulong stored least-significant byte first at the current position
// and advances the position by sz_ulong.
// Throws std::overflow_error when fewer than sz_ulong bytes remain.
ulong buffer::get_ulong()
{
    ssize_t pos_ = pos();
    if (pos_ + sz_ulong > size())
    {
        throw std::overflow_error("insufficient buffer available for a ulong");
    }
    byte *d = data();
    ulong val = 0;
    for (ssize_t i = 0; i < sz_ulong; i++)
    {
        // Widen to ulong before shifting (otherwise the shift is done in int),
        // and go through unsigned char first so a signed `byte` cannot
        // sign-extend into the higher bytes.
        val |= static_cast<ulong>(static_cast<unsigned char>(*(d + i))) << (i * 8);
    }
    __mv_block_pos(this, sz_ulong);
    return val;
}
std::string buffer::get_str(ssize_t len)
{
ssize_t pos_ = pos();
if (pos_ + len > size())
{
throw std::overflow_error("insufficient buffer available for the string");
}
byte *d = data();
std::string str;
for (int i = 0; i < len; i++)
{
str += *(d + i);
}
__mv_block_pos(this, len);
return str;
}
// Reads the byte at the current position and advances the position by one.
// Throws std::overflow_error when no byte is left (position == size).
byte buffer::get_byte()
{
    ssize_t pos_ = pos();
    if (pos_ >= size())
    {
        throw std::overflow_error("insufficient buffer available for a byte");
    }
    // Fetch before moving: data() points at the current position, so advancing
    // first would return the following byte instead of the requested one.
    byte val = *data();
    __mv_block_pos(this, 1);
    return val;
}