redis-protocol.rs
Cannot be used with tokio?
I wrote the code below, but it fails to compile with the error that follows. How do I use this crate with tokio?
use crate::com::CODE_PORT_IN_USE;
use std::net::SocketAddr;
use std::process;
use tokio::net::TcpStream;
use tracing::{error, info, instrument, warn};
use crate::com::{create_reuse_port_listener, ClusterConfig};
use futures::{SinkExt, StreamExt};
use redis_protocol::{
    codec::{resp3_encode_command, Resp3},
    resp3::types::{BytesFrame, Resp3Frame, RespVersion},
};
use tokio_util::codec::{Framed, Decoder, Encoder};
#[instrument]
pub async fn handle_front_conn(socket: TcpStream) {
    if let Err(err) = socket.set_nodelay(true) {
        warn!("cluster fail to set nodelay but skip, due to {:?}", err);
    }
    let mut client_framed = Framed::new(socket, Resp3::default());
    while let Some(result) = client_framed.next().await {
        match result {
            Ok(request) => {
                info!("cluster receive request: {:?}", request)
            },
            Err(e) => {
                eprintln!("Request decode error: {:?}", e);
                break;
            }
        }
    }
    todo!()
}
#[instrument]
pub async fn start(cc: ClusterConfig) -> anyhow::Result<()> {
    let addr = cc.listen_addr.parse::<SocketAddr>().expect("parse socket never fail");
    let listener = match create_reuse_port_listener(&addr).await {
        Ok(v) => v,
        Err(e) => {
            error!("listen {} error: {}", addr, e);
            process::exit(CODE_PORT_IN_USE);
        }
    };
    info!("Listening on {}", addr);
    loop {
        let (socket, addr) = listener.accept().await?;
        info!("Accepted connection from {}", addr);
        tokio::spawn(async move {
            handle_front_conn(socket).await;
        });
    }
}
error[E0599]: the method `next` exists for struct `Framed<TcpStream, Resp3>`, but its trait bounds were not satisfied
--> src\proxy2\standalone\mod.rs:21:44
|
21 | while let Some(result) = client_framed.next().await {
| ^^^^ method cannot be called on `Framed<TcpStream, Resp3>` due to unsatisfied trait bounds
|
::: C:\Users\codeg\.cargo\registry\src\rsproxy.cn-0dccff568467c15b\tokio-util-0.6.10\src\codec\framed.rs:16:1
|
16 | / pin_project! {
17 | | /// A unified [`Stream`] and [`Sink`] interface to an underlying I/O object, using
18 | | /// the `Encoder` and `Decoder` traits to encode and decode frames.
19 | | ///
... |
30 | | }
31 | | }
| |_- doesn't satisfy `_: StreamExt` or `_: Stream`
|
= note: the following trait bounds were not satisfied:
`tokio_util::codec::Framed<tokio::net::TcpStream, Resp3>: Stream`
which is required by `tokio_util::codec::Framed<tokio::net::TcpStream, Resp3>: StreamExt`
Hi @Praying. The codec interface should work with Tokio, but from this code it's not obvious what the issue is. I won't be able to reproduce this due to the references to your other crate types though.
Here's an example of the tests that use Tokio: https://github.com/aembke/redis-protocol.rs/blob/main/tests/codec/mod.rs
@aembke It seems to work with
tokio-util = { version = "0.7.12", features = ["codec"] }
but does not work with
tokio-util = { version = "0.6", features = ["codec"] }
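A likely explanation, assuming redis-protocol's codec is compiled against tokio-util 0.7: Cargo treats 0.6 and 0.7 as incompatible releases and builds both, so the `Decoder` trait that `Resp3` implements is not the same trait that the 0.6 `Framed` requires. The std-only sketch below imitates that situation with two modules standing in for the two releases. The names `util_v06`, `util_v07`, `LineCodec`, and `first_frame` are hypothetical and are not part of either crate.

```rust
// Stand-in for the newer release of a codec utility crate (hypothetical).
mod util_v07 {
    pub trait Decoder {
        type Item;
        fn decode(&mut self, src: &mut Vec<u8>) -> Option<Self::Item>;
    }

    pub struct Framed<C> {
        pub buf: Vec<u8>,
        pub codec: C,
    }

    // `next` only exists when the codec implements *this* module's Decoder.
    impl<C: Decoder> Framed<C> {
        pub fn next(&mut self) -> Option<C::Item> {
            self.codec.decode(&mut self.buf)
        }
    }
}

// Stand-in for the older release: same names, but distinct types and traits.
#[allow(dead_code)]
mod util_v06 {
    pub trait Decoder {
        type Item;
        fn decode(&mut self, src: &mut Vec<u8>) -> Option<Self::Item>;
    }

    pub struct Framed<C> {
        pub buf: Vec<u8>,
        pub codec: C,
    }

    impl<C: Decoder> Framed<C> {
        pub fn next(&mut self) -> Option<C::Item> {
            self.codec.decode(&mut self.buf)
        }
    }
}

// A codec written against the newer release only (hypothetical).
pub struct LineCodec;

impl util_v07::Decoder for LineCodec {
    type Item = String;

    // Yields one newline-terminated line, or None if no full line is buffered.
    fn decode(&mut self, src: &mut Vec<u8>) -> Option<String> {
        let pos = src.iter().position(|b| *b == b'\n')?;
        let line: Vec<u8> = src.drain(..=pos).collect();
        Some(String::from_utf8_lossy(&line[..pos]).into_owned())
    }
}

// Works because Framed and the codec's Decoder impl come from the same module.
pub fn first_frame(input: &[u8]) -> Option<String> {
    let mut framed = util_v07::Framed { buf: input.to_vec(), codec: LineCodec };
    framed.next()
}

fn main() {
    println!("{:?}", first_frame(b"PING\nPONG\n"));

    // Uncommenting the lines below should be rejected with an error of the
    // same kind as in this issue (method exists, trait bounds not satisfied),
    // because LineCodec does not implement util_v06::Decoder:
    // let mut old = util_v06::Framed { buf: Vec::new(), codec: LineCodec };
    // old.next();
}
```

If this is the cause, running `cargo tree -d` in the project should list tokio-util twice, and aligning the project's tokio-util version with the one the codec crate depends on removes the duplicate.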