kafka-protocol-rs icon indicating copy to clipboard operation
kafka-protocol-rs copied to clipboard

Client examples

Open scrogson opened this issue 2 years ago • 5 comments

I'm interested in utilizing this crate but I'm curious if there are any other existing crates building out higher-level client functionality? It would be awesome if there were some examples showing how to make requests and decode the responses.

scrogson avatar Dec 14 '22 00:12 scrogson

hi @scrogson, you can check it out thought it still has a lot of bugs.

kafkas - async kafka client for rust

iamazy avatar Dec 19 '22 05:12 iamazy

@iamazy excellent, I'll be sure to keep my eyes on it.

scrogson avatar Dec 19 '22 22:12 scrogson

Here's some other (half-baked) examples from projects I'm fiddling on: https://github.com/tychedelia/akademie/blob/master/src/kafka/client.rs https://github.com/tychedelia/josefine/blob/main/src/kafka/codec.rs

tychedelia avatar Jan 17 '23 16:01 tychedelia

Started a crate using tower (very first pass): https://github.com/tychedelia/tower-kafka

tychedelia avatar Jan 20 '23 22:01 tychedelia

I have after some trial and error come up with a basic encode, send, receive, decode that works, if no higher than version 8. Though id share, as the documentation does not seem to cover the details around sending the request and decoding the response.

use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use kafka_protocol::indexmap::IndexMap;
use kafka_protocol::messages::ApiKey;
use kafka_protocol::messages::ProduceRequest;
use kafka_protocol::messages::ProduceResponse;
use kafka_protocol::messages::RequestHeader;
use kafka_protocol::messages::TopicName;
use kafka_protocol::messages::produce_request::PartitionProduceData;
use kafka_protocol::messages::produce_request::TopicProduceData;
use kafka_protocol::protocol::Builder;
use kafka_protocol::protocol::Decodable;
use kafka_protocol::protocol::Encodable;
use kafka_protocol::protocol::HeaderVersion;
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::records::Compression;
use kafka_protocol::records::Record;
use kafka_protocol::records::RecordBatchEncoder;
use kafka_protocol::records::RecordEncodeOptions;
use kafka_protocol::records::TimestampType;
use rand::random;
use std::io::Read;
use std::io::Write;
use std::net::TcpStream;
use std::time::SystemTime;

const BROKER   : &str = "localhost:9092";
const TOPIC    : &str = "test-events";
const CLIENT_ID: &str = "client";
const MESSAGE  : &str = "test data";

pub fn produce() {
    //-------------------------------------------------------------------------
    //  Assemble Request
    //-------------------------------------------------------------------------

    println!("-- Assembling request --");

    let version_body = 8; // 9 gives me decode error: CompactArray length is negative (-1)
    let version_header = ProduceRequest::header_version(version_body);
    let req_id = random::<i32>();

    let header = RequestHeader::builder()
        .client_id(Some(StrBytes::from_str(CLIENT_ID)))
        .request_api_key(ApiKey::ProduceKey as i16)
        .request_api_version(version_body)
        .correlation_id(req_id)
        .build().unwrap();

    let body = ProduceRequest::builder()
        .acks(1)
        .timeout_ms(1_000)
        .topic_data({
            let record = Record {
                transactional: false,
                control: false,
                partition_leader_epoch: 0,
                producer_id: -1,
                producer_epoch: -1,
                timestamp_type: TimestampType::Creation,
                offset: 0,
                sequence: 0,
                timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as i64,
                key: None,
                value: Some(Bytes::from(MESSAGE)),
                headers: IndexMap::new(),
            };

            let mut bytes = BytesMut::new();
            RecordBatchEncoder::encode(&mut bytes, [record].iter(), &RecordEncodeOptions{
                version: 2,
                compression: Compression::None,
            }).unwrap();

            let entry = PartitionProduceData::builder()
                .index(0)
                .records(Some(Bytes::from(bytes)))
                .build().unwrap();
            let topic = TopicProduceData::builder()
                .partition_data(vec![entry])
                .build().unwrap();
            let mut map = IndexMap::new();
            map.insert(TopicName(StrBytes::from_str(TOPIC)), topic);
            map
        })
        .build().unwrap();

    dbg!(&header);
    dbg!(&body);

    //-------------------------------------------------------------------------
    //  Encode Request
    //-------------------------------------------------------------------------
    // First in the request is the length of data
    // Then comes the header
    // Then comes the body

    println!("-- Encoding request --");

    let req_len = {
        let h = header.compute_size(version_header).unwrap() as i32;
        let b = body.compute_size(version_body).unwrap() as i32;
        (h + b).to_be_bytes()
    };

    let mut req_bytes = BytesMut::new();
    req_bytes.put(req_len.as_ref());
    header.encode(&mut req_bytes, version_header).unwrap();
    body.encode(&mut req_bytes, version_body).unwrap();

    //-------------------------------------------------------------------------
    //  Send Request
    //-------------------------------------------------------------------------

    println!("-- Sending request --");
    println!("bytes: {}", req_bytes.len());

    let mut tcp = TcpStream::connect(BROKER).unwrap();
    tcp.write_all(&mut req_bytes).unwrap();
    tcp.flush().unwrap();

    //-------------------------------------------------------------------------
    //  Receive Response
    //-------------------------------------------------------------------------
    // First 4 bytes are the response length

    println!("-- Awaiting response --");

    let res_len = {
        let mut len_buf = [0u8; 4];
        tcp.read_exact(&mut len_buf).unwrap();
        i32::from_be_bytes(len_buf) as usize
    };

    println!("bytes: {}", res_len);

    let res_bytes = {
        let mut res_bytes = vec![0u8; res_len];
        tcp.read_exact(&mut res_bytes).unwrap();
        res_bytes
    };

    //-------------------------------------------------------------------------
    //  Decode Response
    //-------------------------------------------------------------------------
    // Next 4 bytes is correlation id
    // The rest is to be decoded by matching response struct

    println!("-- Decoding response --");

    let res_id = i32::from_be_bytes(res_bytes[0..4].try_into().unwrap());
    println!("correlated: {}", req_id == res_id);

    let data = ProduceResponse::decode(&mut Bytes::from(res_bytes[4..].to_vec()), version_body).unwrap();
    dbg!(data);
}

zerfix avatar Feb 07 '24 10:02 zerfix