kafka-protocol-rs
kafka-protocol-rs copied to clipboard
Client examples
I'm interested in utilizing this crate but I'm curious if there are any other existing crates building out higher-level client functionality? It would be awesome if there were some examples showing how to make requests and decode the responses.
Hi @scrogson, you can check it out, though it still has a lot of bugs.
kafkas - async kafka client for rust
@iamazy excellent, I'll be sure to keep my eyes on it.
Here's some other (half-baked) examples from projects I'm fiddling on: https://github.com/tychedelia/akademie/blob/master/src/kafka/client.rs https://github.com/tychedelia/josefine/blob/main/src/kafka/codec.rs
Started a crate using tower (very first pass): https://github.com/tychedelia/tower-kafka
After some trial and error I have come up with a basic encode, send, receive, decode flow that works, as long as the version is no higher than 8. Thought I'd share it, as the documentation does not seem to cover the details around sending the request and decoding the response.
use bytes::BufMut;
use bytes::Bytes;
use bytes::BytesMut;
use kafka_protocol::indexmap::IndexMap;
use kafka_protocol::messages::ApiKey;
use kafka_protocol::messages::ProduceRequest;
use kafka_protocol::messages::ProduceResponse;
use kafka_protocol::messages::RequestHeader;
use kafka_protocol::messages::ResponseHeader;
use kafka_protocol::messages::TopicName;
use kafka_protocol::messages::produce_request::PartitionProduceData;
use kafka_protocol::messages::produce_request::TopicProduceData;
use kafka_protocol::protocol::Builder;
use kafka_protocol::protocol::Decodable;
use kafka_protocol::protocol::Encodable;
use kafka_protocol::protocol::HeaderVersion;
use kafka_protocol::protocol::StrBytes;
use kafka_protocol::records::Compression;
use kafka_protocol::records::Record;
use kafka_protocol::records::RecordBatchEncoder;
use kafka_protocol::records::RecordEncodeOptions;
use kafka_protocol::records::TimestampType;
use rand::random;
use std::io::Read;
use std::io::Write;
use std::net::TcpStream;
use std::time::SystemTime;
/// Address (`host:port`) of the Kafka broker the example connects to.
const BROKER: &str = "localhost:9092";

/// Name of the topic the example record is produced to.
const TOPIC: &str = "test-events";

/// Client id placed in the request header.
const CLIENT_ID: &str = "client";

/// Payload used as the value of the single produced record.
const MESSAGE: &str = "test data";
pub fn produce() {
//-------------------------------------------------------------------------
// Assemble Request
//-------------------------------------------------------------------------
println!("-- Assembling request --");
let version_body = 8; // 9 gives me decode error: CompactArray length is negative (-1)
let version_header = ProduceRequest::header_version(version_body);
let req_id = random::<i32>();
let header = RequestHeader::builder()
.client_id(Some(StrBytes::from_str(CLIENT_ID)))
.request_api_key(ApiKey::ProduceKey as i16)
.request_api_version(version_body)
.correlation_id(req_id)
.build().unwrap();
let body = ProduceRequest::builder()
.acks(1)
.timeout_ms(1_000)
.topic_data({
let record = Record {
transactional: false,
control: false,
partition_leader_epoch: 0,
producer_id: -1,
producer_epoch: -1,
timestamp_type: TimestampType::Creation,
offset: 0,
sequence: 0,
timestamp: SystemTime::now().duration_since(SystemTime::UNIX_EPOCH).unwrap().as_secs() as i64,
key: None,
value: Some(Bytes::from(MESSAGE)),
headers: IndexMap::new(),
};
let mut bytes = BytesMut::new();
RecordBatchEncoder::encode(&mut bytes, [record].iter(), &RecordEncodeOptions{
version: 2,
compression: Compression::None,
}).unwrap();
let entry = PartitionProduceData::builder()
.index(0)
.records(Some(Bytes::from(bytes)))
.build().unwrap();
let topic = TopicProduceData::builder()
.partition_data(vec![entry])
.build().unwrap();
let mut map = IndexMap::new();
map.insert(TopicName(StrBytes::from_str(TOPIC)), topic);
map
})
.build().unwrap();
dbg!(&header);
dbg!(&body);
//-------------------------------------------------------------------------
// Encode Request
//-------------------------------------------------------------------------
// First in the request is the length of data
// Then comes the header
// Then comes the body
println!("-- Encoding request --");
let req_len = {
let h = header.compute_size(version_header).unwrap() as i32;
let b = body.compute_size(version_body).unwrap() as i32;
(h + b).to_be_bytes()
};
let mut req_bytes = BytesMut::new();
req_bytes.put(req_len.as_ref());
header.encode(&mut req_bytes, version_header).unwrap();
body.encode(&mut req_bytes, version_body).unwrap();
//-------------------------------------------------------------------------
// Send Request
//-------------------------------------------------------------------------
println!("-- Sending request --");
println!("bytes: {}", req_bytes.len());
let mut tcp = TcpStream::connect(BROKER).unwrap();
tcp.write_all(&mut req_bytes).unwrap();
tcp.flush().unwrap();
//-------------------------------------------------------------------------
// Receive Response
//-------------------------------------------------------------------------
// First 4 bytes are the response length
println!("-- Awaiting response --");
let res_len = {
let mut len_buf = [0u8; 4];
tcp.read_exact(&mut len_buf).unwrap();
i32::from_be_bytes(len_buf) as usize
};
println!("bytes: {}", res_len);
let res_bytes = {
let mut res_bytes = vec![0u8; res_len];
tcp.read_exact(&mut res_bytes).unwrap();
res_bytes
};
//-------------------------------------------------------------------------
// Decode Response
//-------------------------------------------------------------------------
// Next 4 bytes is correlation id
// The rest is to be decoded by matching response struct
println!("-- Decoding response --");
let res_id = i32::from_be_bytes(res_bytes[0..4].try_into().unwrap());
println!("correlated: {}", req_id == res_id);
let data = ProduceResponse::decode(&mut Bytes::from(res_bytes[4..].to_vec()), version_body).unwrap();
dbg!(data);
}