grpc-rs
Threads / workers seem to be blocking each other
(question cross-posted to reddit)
I can't seem to work out how to use gRPC (using the grpcio crate, grpc-rs). I have a dummy server that just sleeps for a bit, generates a random number, and sends it back, but the handler seems to block the whole server. I was originally under the impression that Environment::new(8) created a thread pool of 8 threads and did some sort of work-stealing to avoid blocking. But maybe I have to manage my own thread pool?
Essentially, my server is very simple (full source at the link above):
fn spread_chaos(&self, ctx: RpcContext, req: ChaosRequest, sink: UnarySink<ChaosResponse>) {
    println!("received a thing");
    // TODO: There's no guarantee this won't collide
    thread::sleep_ms(1000);
    let id = rand::thread_rng().gen::<u64>();
    let mut resp = ChaosResponse::new();
    resp.set_id(id);
    let f = sink.success(resp)
        .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
    ctx.spawn(f);
}
That just prints a thing, sleeps for a bit, and then returns. My main function looks very similar to the example:
fn main() {
    let env = Arc::new(Environment::new(8));
    let service = create_chaos(ChaosService);
    let mut server = ServerBuilder::new(env)
        .register_service(service)
        .bind("127.0.0.1", 50051)
        .build()
        .expect("Could not create server");
    server.start();
    for &(ref host, port) in server.bind_addrs() {
        println!("listening on {}:{}", host, port);
    }
    let (tx, rx) = oneshot::channel();
    thread::spawn(move || {
        println!("press ENTER to exit...");
        let _ = io::stdin().read(&mut [0]).unwrap();
        tx.send(())
    });
    let _ = rx.wait();
    let _ = server.shutdown().wait();
}
PTAL @BusyJay @overvenus
That's because you are using the same channel, i.e. the same connection, to send the requests. When you do heavy work inside a service method, both the connection and the thread are blocked, so all following messages are queued and have to wait.
Turns out I'm wrong. There is an implementation bug in grpcio.
Created an issue in the upstream repository: grpc/grpc#15535. To solve your problem quickly, it's better to spawn the time-consuming job onto a thread pool instead of executing it inside the service callback.
@BusyJay can you give an example of how to spawn the time-consuming job (say, sleeping 10 seconds) onto the thread pool?
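For an arbitrary thread pool, you can bridge it with a channel, like:
// Assumes futures 0.1 (`futures::sync::oneshot`) and `std::time::Duration`.
let (tx, rx) = futures::sync::oneshot::channel();
thread::spawn(move || {
    thread::sleep(Duration::from_secs(10));
    let _ = tx.send(());
});
ctx.spawn(
    rx.map_err(|_| ()) // the sender was dropped before sending
        .and_then(move |()| sink.success(resp).map_err(|e| println!("failed to reply: {:?}", e))),
);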
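The same bridging pattern can be tried without grpcio at all. The following is only a minimal sketch using the standard library: `slow_job`, `handle_offloaded` and `handle_all` are hypothetical names made up for illustration, and a plain `std::sync::mpsc` channel stands in for the futures oneshot channel. The handler moves the blocking work to another thread and returns right away; the result arrives later over the channel.

```rust
use std::sync::mpsc::{self, Receiver};
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the time-consuming part of a service method.
fn slow_job(input: u64) -> u64 {
    thread::sleep(Duration::from_millis(50));
    input * 2
}

/// Hypothetical handler: instead of running `slow_job` inline (which would
/// block the calling thread), it moves the job to a new thread and returns
/// a receiver for the eventual result.
fn handle_offloaded(input: u64) -> Receiver<u64> {
    let (tx, rx) = mpsc::channel();
    thread::spawn(move || {
        // Ignore the send error if the receiver has already been dropped.
        let _ = tx.send(slow_job(input));
    });
    rx
}

/// Dispatches every input first, then collects the results in dispatch order.
fn handle_all(inputs: &[u64]) -> Vec<u64> {
    let pending: Vec<Receiver<u64>> = inputs.iter().map(|&i| handle_offloaded(i)).collect();
    pending
        .into_iter()
        .map(|rx| rx.recv().expect("worker thread exited without a result"))
        .collect()
}
```

Calling `handle_all(&[1, 2, 3])` starts all three jobs before waiting on any of them, so the sleeps overlap instead of running back to back. In a real service the receiving side would be a future handed to `ctx.spawn` rather than a blocking `recv`.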
Some thread pools, like futures-cpupool or tokio-threadpool, provide a handy API that returns a future when you spawn a job; see their documentation for more examples and details.
amazing, thank you 🙏!
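@BusyJay Hello, I've run into the same problem. The C++ version of gRPC supports concurrent requests out of the box, but when I tried grpc-rs I found that concurrent requests are always handled on the same thread, even though I set up multiple threads with Environment::new(10). How should the code you provided above be used? I tried adding it to my code, but it fails to compile. Sorry, I'm new to Rust.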
...the C++ version of gRPC supports concurrent requests out of the box..
@gateslu can you show me a C++ version that supports concurrency and works differently from the Rust version?
The code is just an example to show how to offload blocking code from the gRPC threads. There is no difference between it and general usage of a thread pool. You can refer to the documentation of the thread pool crates for more explanation.
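To make "general usage of a thread pool" concrete, here is a rough sketch of a fixed-size pool built only on the standard library. `SimplePool` and its `spawn` method are hypothetical and written for illustration; they are not part of grpcio, futures-cpupool or tokio-threadpool, and real code would more likely use one of those crates. `spawn` returns a receiver that yields the job's result, which loosely plays the role of the future those crates return.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical minimal pool: `size` workers pull jobs from a shared queue.
struct SimplePool {
    sender: Sender<Job>,
}

impl SimplePool {
    fn new(size: usize) -> SimplePool {
        let (sender, receiver) = mpsc::channel::<Job>();
        let receiver = Arc::new(Mutex::new(receiver));
        for _ in 0..size {
            let receiver = Arc::clone(&receiver);
            thread::spawn(move || loop {
                // The lock is held only while waiting for the next job;
                // it is released before the job itself runs.
                let job = match receiver.lock().unwrap().recv() {
                    Ok(job) => job,
                    Err(_) => break, // pool dropped, no more jobs will arrive
                };
                job();
            });
        }
        SimplePool { sender }
    }

    /// Queues `f` on the pool and returns a receiver for its result.
    fn spawn<T, F>(&self, f: F) -> Receiver<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        self.sender
            .send(Box::new(move || {
                // Ignore the send error if the caller dropped the receiver.
                let _ = tx.send(f());
            }))
            .expect("pool workers have shut down");
        rx
    }
}
```

A service method would call something like `pool.spawn(move || expensive(req))` and reply once the result arrives, so the gRPC thread is free to pick up the next request in the meantime. Dropping the pool closes the queue, and the workers exit after finishing any jobs already queued.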
Thank you so much, I think I've solved the problem : )
let mut _content = String::from(req.get_content());
let (tx, rx) = oneshot::channel();
thread::spawn(move || {
    let mut resp = TaskResponse::default();
    let msg = format!("Hello {} {:?}", _content, thread::current().id());
    let content = String::from(msg);
    resp.set_content(content);
    let _ = tx.send(resp);
});
let _f = rx.and_then(move |resp| {
    sink.success(resp);
    return Ok(());
});
ctx.spawn(_f.map_err(move |e| { println!("failed to reply {:?}: {:?}", req, e) }));