NuRaft icon indicating copy to clipboard operation
NuRaft copied to clipboard

Why would asio::buffer(my_buffer) change the input my_buffer so dramatically?

Open iTomGeller opened this issue 1 year ago • 0 comments

Hi, I am playing with the buffer and asio_service in cornerstone and trying to rewrite them when I encounter a problem shown below.(the cornerstone repo has been inactive for at least a year,so I open an issue here :) image

As you can see in terminal, the buffer size is all right until it has been passed to the asio::buffer, where its size has been dramatically changed and no int_data can be fetched.

the rpc_client is as follows:

image

the asio_service.h is shown below


#include <asio.hpp>
#include <iostream>
#include <memory>
#include "buffer.h"
//#include "buffer.hxx"
#include "rpc_cli.h"
#include "rpc_listener.h"
using namespace raft;
// request header, ulong term (8), msg_type type (1), int32 src (4), int32 dst (4), ulong last_log_term (8), ulong
// last_log_idx (8), ulong commit_idx (8) + one int32 (4) for log data size
#define RPC_REQ_HEADER_SIZE 3 * 4 + 8 * 4 + 1

// response header ulong term (8), msg_type type (1), int32 src (4), int32 dst (4), ulong next_idx (8), bool accepted
// (1)
#define RPC_RESP_HEADER_SIZE 4 * 2 + 8 * 2 + 2

class asio_rpc_client : public rpc_client, public std::enable_shared_from_this<asio_rpc_client>
{
private:
    asio::io_service io_svc;

public:
    asio_rpc_client(std::string &host, std::string &port, asio::io_service &io_svc)
        : host_(host), port_(port), socket_(io_svc), resolver_(io_svc)
    {
    }
    virtual ~asio_rpc_client()
    {
        if (socket_.is_open())
        {
            socket_.close();
        }
    }

private:
    void connect(bufptr &buf)
    {
        std::shared_ptr<asio_rpc_client> self(this->shared_from_this());
        asio::ip::tcp::resolver::query q(host_, port_, asio::ip::tcp::resolver::all_matching);
        // async_connect返回下一个可以连接的itor
        resolver_.async_resolve(q,
                                [self, this, &buf](std::error_code err, asio::ip::tcp::resolver::iterator itor) mutable
                                {
                                    asio::async_connect(socket_, itor,
                                                        [self, &buf](std::error_code err, asio::ip::tcp::resolver::iterator itor) mutable
                                                        {
                                                            if (!err)
                                                            {
                                                                std::cout << "Connected Successfully\n";
                                                                self->send(buf);
                                                            }
                                                            else
                                                            {
                                                                std::cout << "Connected failed\n";
                                                            }
                                                        });
                                });
    }
    // override关键词强制重写
public:
    virtual void send(bufptr &buf) override
    {
        // std::cout << socket_.is_open() << std::endl;
        if (!socket_.is_open())
        {
            this->connect(buf);
            // std::cout << "socket is closed!";
        }
        else
        {
            std::shared_ptr<asio_rpc_client> self(this->shared_from_this());
            asio::async_write(socket_, asio::buffer(buf->data(), buf->size()),
                              [this, self](std::error_code err, ssize_t bytes_transferred)
                              {
                                  std::cout << "send msg successfully!\n";
                              });
        }
    }

private:
    // 把走io_svc的声明成private,传参只需传io_svc
    std::string host_;
    std::string port_;
    asio::ip::tcp::socket socket_;
    asio::ip::tcp::resolver resolver_;
};
class asio_rpc_session : public std::enable_shared_from_this<asio_rpc_session>
{
public:
    asio_rpc_session(asio::io_service &io) : socket_(io) {

    }
    ~asio_rpc_session() {
        stop();
    }
public:
    asio::ip::tcp::socket& socket() {
        return socket_;
    }
    void start() {
        this->read();
    }
    void stop() {
        if (socket_.is_open())
            socket_.close();
    }
private:
    void read() {
        //自动调用move constructor
        log_data = buffer::alloc(4);
        std::cout << "the inital log_data size is :" << log_data->size() << std::endl;
        std::shared_ptr<asio_rpc_session> self(this->shared_from_this());

        //如果不将log_data设置为全局变量就会出现size乱变的情况

        //这里输出得log_data->size() 是bytes_transferred 不是真实的发过来的
        asio::async_read(
            socket_,asio::buffer(log_data->data(), 4),
            [this, self](std::error_code err, ssize_t bytes_transferred){
                if (!err) {
                    //log_data->pos(bytes_transferred);
                    std::cout << "bytes read is :" << bytes_transferred << std::endl;
                std::cout << "the int data is :" << log_data->get_int() << std::endl;
                std::cout << "the size of buffer is :" << log_data->size() << std::endl;
                }
                else {
                    std::cout << "No data read\n";
                }
            }
        );

    }

private:
asio::ip::tcp::socket socket_;
bufptr log_data;
};
class asio_rpc_listener : public rpc_listener, public std::enable_shared_from_this<asio_rpc_listener>
{
//port 是 ushort类型
public:
    asio_rpc_listener(asio::io_service& io, ushort port) : 
     io_svc(io),
     acceptor_(io, asio::ip::tcp::endpoint(asio::ip::tcp::v4(), port)) 
      {}
    ~asio_rpc_listener()
    {
        stop();
    }

public:
    virtual void listen()
    {
        if (!acceptor_.is_open())
            return;
        this->start();
    }
    virtual void stop()
    {
        if (acceptor_.is_open())
            acceptor_.close();
    }

private:
    void start()
    {
        std::shared_ptr<asio_rpc_listener> self(this->shared_from_this());
        std::shared_ptr<asio_rpc_session> session(std::make_shared<asio_rpc_session>(io_svc));
        acceptor_.async_accept(
            session->socket(),
            [self, session](std::error_code err)
            {
                if (!err)
                {
                    session->start();
                }
                else
                {
                    std::cout << "No clients yet.....\n";
                }
                self->listen();
            });
    }

private:
    //这里的io_svc得声明成引用
    asio::io_service &io_svc;
    asio::ip::tcp::acceptor acceptor_;
    std::mutex session_lock;
};

and this is my buffer.cpp


#include "buffer.h"

typedef unsigned int uint32;
#define __init_block(p, s, t) \
    ((t *)(p))[0] = (t)s;     \
    ((t *)(p))[1] = 0;

#define __init_small_block(p, s) __init_block(p, s, ushort);
#define __init_big_block(p, s)  \
    __init_block(p, s, uint32); \
    ((uint32 *)(p))[0] |= 0x80000000;

#define __is_big_block(p) (((uint32 *)(p))[0] & 0x80000000)

#define __pos_of_big_block(p) ((uint32 *)(p))[1]
#define __pos_of_small_block(p) ((ushort *)(p))[1]

#define __pos_of_block(p) (__is_big_block(p)) ? __pos_of_big_block(p) : __pos_of_small_block(p)
#define __size_of_block(p) (__is_big_block(p)) ? ((uint32 *)(p))[0] ^ 0x80000000 : ((uint32 *)(p))[0]

#define __mv_block_pos(p, len) \
    __is_big_block(p) ? ((uint32 *)(p))[1] += len : ((ushort *)(p))[1] += (ushort)len;

#define __set_block_pos(p, len) \
    __is_big_block(p) ? ((uint32 *)(p))[1] = len : ((ushort *)(p))[1] = (ushort)len;
// 这里直接用pos_of_big/small_block 不然会出现运算顺序问题
#define __data_of_block(p) __is_big_block(p) ? (byte *)(((byte *)((uint32 *)(p) + 2)) + __pos_of_big_block(p)) : (byte *)(((byte *)((ushort *)(p) + 2)) + __pos_of_small_block(p))
using namespace raft;
bufptr buffer::alloc(ssize_t size)
{
    if (size >= 0x80000000)
    {
        throw std::out_of_range("size exceed the max size that raft::buffer could support");
    }

    if (size >= 0x8000)
    {
        bufptr buf(reinterpret_cast<buffer *>(new char[size + sizeof(uint32) * 2]));
        void *ptr = reinterpret_cast<void *>(buf.get());
        __init_big_block(ptr, size);
        return buf;
    }

    bufptr buf(reinterpret_cast<buffer *>(new char[size + sizeof(ushort) * 2]));
    void *ptr = reinterpret_cast<void *>(buf.get());
    __init_small_block(ptr, size);
    return buf;
}

ssize_t buffer::pos() const
{
    return ssize_t(__pos_of_block(this));
}

byte *buffer::data() const
{
    return __data_of_block(this);
}

ssize_t buffer::size() const
{
    return (ssize_t)(__size_of_block(this));
}

void buffer::pos(ssize_t p)
{
    __set_block_pos(this, p);
}

// 逆序存数据
void buffer::put(int val)
{
    ssize_t pos_ = pos();
    ssize_t sz = size();
    byte *d = data();
    if (pos_ + sz_int > sz)
    {
        throw std::overflow_error("insufficient buffer to store int");
    }

    for (ssize_t i = 0; i < sz_int; i++)
    {
        // 转成byte取最低8位
        *(d + i) = (byte)(val >> (i * 8));
    }
    __mv_block_pos(this, sz_int);
}

void buffer::put(ulong val)
{
    ssize_t pos_ = pos();
    ssize_t sz = size();
    byte *d = data();
    if (pos_ + sz_ulong > sz)
    {
        throw std::overflow_error("insufficient buffer to store ulong");
    }

    for (int i = 0; i < sz_ulong; i++)
    {
        // 转成byte取最低8位
        *(d + i) = (byte)(val >> (i * 8));
    }
    __mv_block_pos(this, sz_ulong);
}

void buffer::put(std::string str)
{
    ssize_t pos_ = pos();
    ssize_t sz = size();
    byte *d = data();
    ssize_t len = str.size();

    if (pos_ + len > sz)
    {
        throw std::overflow_error("insufficient buffer to store str");
    }

    for (int i = 0; i < len; i++)
    {
        // 转成byte取最低8位
        *(d + i) = str[i];
    }

    __mv_block_pos(this, len);
}

void buffer::put(const buffer &buf)
{
}

int buffer::get_int()
{
    ssize_t pos_ = pos();

    if (pos_ + sz_int > size())
    {
        throw std::overflow_error("insufficient buffer available for a int");
    }

    byte *d = data();
    int val = 0;
    for (ssize_t i = 0; i < sz_int; i++)
    {
        int byteval = (*(d + i)) << (i * 8);
        val += byteval;
    }

    __mv_block_pos(this, sz_int);
    return val;
}

ulong buffer::get_ulong()
{
    ssize_t pos_ = pos();

    if (pos_ + sz_ulong > size())
    {
        throw std::overflow_error("insufficient buffer available for a int");
    }

    byte *d = data();
    ulong val = 0;
    for (ssize_t i = 0; i < sz_ulong; i++)
    {
        ulong byteval = ulong((*(d + i))) << (i * 8); // 需要加ull,否则默认按int来左移
        val += byteval;
    }

    __mv_block_pos(this, sz_ulong);
    return val;
}

std::string buffer::get_str(ssize_t len)
{
    ssize_t pos_ = pos();

    if (pos_ + len > size())
    {
        throw std::overflow_error("insufficient buffer available for the string");
    }

    byte *d = data();
    std::string str;
    for (int i = 0; i < len; i++)
    {
        str += *(d + i);
    }

    __mv_block_pos(this, len);
    return str;
}

byte buffer::get_byte()
{
    ssize_t pos_ = pos();
    if (pos_ > size())
    {
        throw std::overflow_error("insufficient buffer available for a byte");
    }

    __mv_block_pos(this, 1);
    return *data();
}

iTomGeller avatar Oct 20 '24 04:10 iTomGeller