less_slow.cpp icon indicating copy to clipboard operation
less_slow.cpp copied to clipboard

Fix ASIO examples

Open ashvardanian opened this issue 9 months ago • 1 comment

I'm not a massive fan of ASIO, Boost.ASIO, and the Networking TS that builds on top of them, and I'm not an experienced user of them either. My current implementation on the asio-uring-web-server branch doesn't work. I'd love to see someone suggest a better way to use it 🤗

The current draft looks like this:

#include <asio.hpp>

class rpc_asio_server {

    asio::io_context &context_;
    asio::ip::udp::socket socket_;

    /// @brief Buffers, one per concurrent request
    std::vector<rpc_buffer_t> buffers_;
    /// @brief Where did the packets come from
    std::vector<asio::ip::udp::endpoint> clients_;
    /// @brief Flag to stop the server without corrupting the state
    std::atomic_bool should_stop_;
    /// @brief Maximum time for this entire batch
    std::chrono::microseconds max_cycle_duration_;

    std::size_t failed_receptions_ = 0;
    std::size_t failed_responses_ = 0;

  public:
    rpc_asio_server(                                                           //
        asio::io_context &ctx, std::string const &address, std::uint16_t port, //
        std::size_t max_concurrency, std::chrono::microseconds max_cycle_duration)
        : context_(ctx), socket_(context_, asio::ip::udp::endpoint(asio::ip::make_address(address), port)),
          buffers_(max_concurrency), clients_(max_concurrency), max_cycle_duration_(max_cycle_duration) {}

    void stop() { should_stop_.store(true, std::memory_order_relaxed); }

    void operator()() {
        while (!should_stop_.load(std::memory_order_relaxed)) one_batch();
    }

    void one_batch() {
        // For per-operation cancellations we could use the `asio::cancellation_signal`,
        // but this is the simple lucky case when we only want to cancel all the outstanding
        // transfers at once.
        std::atomic<std::size_t> remaining = 0;
        for (std::size_t job = 0; job < buffers_.size(); ++job, ++remaining) {
            auto finalize = [this, &remaining](std::error_code error, std::size_t) {
                remaining--;
                if (error) failed_responses_++;
            };
            auto respond = [this, job, finalize, &remaining](std::error_code error, std::size_t bytes) {
                if (error) { remaining--; }
                else { socket_.async_send_to(asio::buffer(buffers_[job], bytes), clients_[job], finalize); }
            };
            socket_.async_receive_from(asio::buffer(buffers_[job]), clients_[job], respond);
        }
        std::chrono::steady_clock::time_point expiry = std::chrono::steady_clock::now() + max_cycle_duration_;
        asio::steady_timer timer(context_, expiry);
        timer.wait();
        if (remaining) socket_.cancel(); // Forcibly abort all ops on this socket
    }
};

class rpc_asio_client {

    asio::io_context &context_;
    asio::ip::udp::socket socket_;
    asio::ip::udp::endpoint server_;

    /// @brief Buffers, one per concurrent request
    std::vector<rpc_buffer_t> buffers_;
    /// @brief Track the send timestamps for each slot to measure latency
    std::vector<std::chrono::steady_clock::time_point> send_times_;
    /// @brief Maximum time for this entire batch
    std::chrono::microseconds max_cycle_duration_;

  public:
    rpc_asio_client(                                                               //
        asio::io_context &ctx, std::string const &server_addr, std::uint16_t port, //
        std::size_t concurrency, std::chrono::microseconds max_cycle_duration)
        : context_(ctx), socket_(ctx, asio::ip::udp::endpoint(asio::ip::udp::v4(), 0)), buffers_(concurrency),
          send_times_(concurrency), max_cycle_duration_(max_cycle_duration) {

        // Resolve the server address
        asio::ip::udp::resolver resolver(context_);
        asio::ip::udp::resolver::results_type endpoints = resolver.resolve(server_addr, std::to_string(port));
        server_ = *endpoints.begin(); // Take the first resolved endpoint

        // Fill each buffer with some pattern (just 'X's, for example)
        for (auto &buf : buffers_) buf.fill('X');
    }

    rpc_batch_result operator()() { return one_batch(); }

  private:
    rpc_batch_result one_batch() {
        rpc_batch_result result;

        // For per-operation cancellations we could use the `asio::cancellation_signal`,
        // but this is the simple lucky case when we only want to cancel all the outstanding
        // transfers at once.
        std::atomic<std::size_t> remaining = 0;
        for (std::size_t job = 0; job < buffers_.size(); ++job, ++remaining) {
            send_times_[job] = std::chrono::steady_clock::now();
            auto finalize = [this, job, &result, &remaining](std::error_code error, std::size_t) {
                remaining--;
                if (error) return;

                // Measure latency
                auto response_time = std::chrono::steady_clock::now();
                auto diff = response_time - send_times_[job];
                result.batch_latency += diff;
                result.max_packet_latency = std::max(result.max_packet_latency, diff);
                result.received_packets++;
            };
            auto receive = [this, job, finalize, &remaining](std::error_code error, std::size_t bytes) {
                if (error) { remaining--; }
                else { socket_.async_receive_from(asio::buffer(buffers_[job], bytes), server_, finalize); }
            };
            socket_.async_send_to(asio::buffer(buffers_[job]), server_, receive);
            result.sent_packets++;
        }
        std::chrono::steady_clock::time_point expiry = std::chrono::steady_clock::now() + max_cycle_duration_;
        asio::steady_timer timer(context_, expiry);
        timer.wait();
        if (remaining) socket_.cancel(); // Forcibly abort all ops on this socket
        return result;
    }
};

ashvardanian avatar Jan 25 '25 21:01 ashvardanian