Threads / workers seem to be blocking each other

Open andrewcsmith opened this issue 6 years ago • 10 comments

(question cross-posted to reddit)

Test repo

I can't seem to work out how to use gRPC (using the grpcio crate, grpc-rs). I have a dummy server that just sleeps for a bit, then generates a random number and sends it back, but the routine seems to block the whole server. I was originally under the impression that Environment::new(8) created a pool of 8 threads and used some sort of work-stealing to avoid blocking. But maybe I have to manage my own thread pool?

Essentially, my server is very simple (full source at the link above):

fn spread_chaos(&self, ctx: RpcContext, req: ChaosRequest, sink: UnarySink<ChaosResponse>) {
    println!("received a thing");
    // TODO: There's no guarantee this won't collide
    thread::sleep_ms(1000);
    let id = rand::thread_rng().gen::<u64>();
    let mut resp = ChaosResponse::new();
    resp.set_id(id);
    let f = sink.success(resp)
        .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e));
    ctx.spawn(f);
}

That just prints a thing, sleeps for a bit, and then returns. My main function looks very similar to the example:

fn main() {
    let env = Arc::new(Environment::new(8));
    let service = create_chaos(ChaosService);
    let mut server = ServerBuilder::new(env)
        .register_service(service)
        .bind("127.0.0.1", 50051)
        .build()
        .expect("Could not create server");
    server.start();

    for &(ref host, port) in server.bind_addrs() {
        println!("listening on {}:{}", host, port);
    }

    let (tx, rx) = oneshot::channel();

    thread::spawn(move || {
        println!("press ENTER to exit...");
        let _ = io::stdin().read(&mut [0]).unwrap();
        tx.send(())
    });

    let _ = rx.wait();
    let _ = server.shutdown().wait();
}

andrewcsmith avatar May 10 '18 02:05 andrewcsmith

PTAL @BusyJay @overvenus

siddontang avatar May 10 '18 19:05 siddontang

That's because you are using the same channel (that is, the same connection) to send every request. When you do heavy work in a service method, both the connection and the thread are blocked, so all subsequent messages are queued and have to wait.

BusyJay avatar May 24 '18 02:05 BusyJay

Turns out I'm wrong. There is an implementation bug in grpcio.

BusyJay avatar May 24 '18 06:05 BusyJay

Created an issue in the upstream repository: grpc/grpc#15535. To work around the problem quickly, spawn the time-consuming job onto a thread pool instead of executing it inside the service callback.
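
As a rough illustration of why offloading helps, here is a std-only sketch that does not use grpcio at all. The names `slow_job`, `run_inline`, and `run_offloaded` are hypothetical: `run_inline` mimics a callback that does the slow work itself, while `run_offloaded` hands each job to a worker thread and collects the results over a channel.

```rust
use std::sync::mpsc;
use std::thread;
use std::time::Duration;

/// Hypothetical stand-in for the slow part of a handler.
fn slow_job(id: u64) -> u64 {
    thread::sleep(Duration::from_millis(100));
    id * 2
}

/// Runs every job on the calling thread, the way a blocking callback would.
/// Total time grows with the number of jobs.
fn run_inline(ids: &[u64]) -> Vec<u64> {
    ids.iter().map(|&id| slow_job(id)).collect()
}

/// Hands each job to its own worker thread and gathers replies over a channel.
/// The jobs sleep concurrently, so total time stays close to one job.
fn run_offloaded(ids: &[u64]) -> Vec<u64> {
    let (tx, rx) = mpsc::channel();
    for &id in ids {
        let tx = tx.clone();
        thread::spawn(move || {
            // The send fails only if the receiver is gone; ignore that here.
            let _ = tx.send(slow_job(id));
        });
    }
    // Drop the original sender so the receiving iterator terminates
    // once every worker has finished.
    drop(tx);
    let mut results: Vec<u64> = rx.iter().collect();
    results.sort(); // workers finish in arbitrary order
    results
}
```

Calling `run_inline(&[1, 2, 3, 4])` should take roughly four sleeps, while `run_offloaded(&[1, 2, 3, 4])` should take roughly one. In a real handler the thread-per-job approach would normally be replaced by a bounded pool.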

BusyJay avatar May 25 '18 04:05 BusyJay

@BusyJay can you give an example of how to spawn the time-consuming job (say, sleeping 10 seconds) onto the thread pool?

seansu4you87 avatar Apr 16 '19 00:04 seansu4you87

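For an arbitrary thread pool, you can bridge the two with a channel, like this:

// Assumes futures 0.1; `sink` and `resp` come from the enclosing handler.
let (tx, rx) = futures::sync::oneshot::channel();
thread::spawn(move || {
    thread::sleep(Duration::from_secs(10)); // blocking work, off the gRPC thread
    let _ = tx.send(()); // fails only if the receiver was dropped
});
// Both futures need `Error = ()` before `ctx.spawn` accepts the chain.
let reply = rx
    .map_err(|e| println!("worker dropped: {:?}", e))
    .and_then(move |()| sink.success(resp).map_err(|e| println!("failed to reply: {:?}", e)));
ctx.spawn(reply);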

Some thread pools, such as futures-cpupool or tokio-threadpool, provide a handy API that returns a future when you spawn a job; see their documentation for more examples and details.
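
To make the "spawn returns a handle to the result" idea concrete, here is a minimal std-only sketch. `TinyPool` and its methods are hypothetical and are not the API of futures-cpupool or tokio-threadpool. It returns a plain `Receiver` rather than a future, so it only illustrates the shape of such an API.

```rust
use std::sync::mpsc::{self, Receiver, Sender};
use std::sync::{Arc, Mutex};
use std::thread;

type Job = Box<dyn FnOnce() + Send + 'static>;

/// Hypothetical fixed-size pool; real crates offer richer versions of this.
pub struct TinyPool {
    jobs: Sender<Job>,
}

impl TinyPool {
    /// Starts `workers` threads that pull jobs from a shared queue.
    pub fn new(workers: usize) -> Self {
        let (jobs, queue) = mpsc::channel::<Job>();
        let queue = Arc::new(Mutex::new(queue));
        for _ in 0..workers {
            let queue = Arc::clone(&queue);
            thread::spawn(move || loop {
                // The lock is held only while taking the next job off the queue.
                let job = match queue.lock().unwrap().recv() {
                    Ok(job) => job,
                    Err(_) => break, // pool dropped, no more jobs will arrive
                };
                job();
            });
        }
        TinyPool { jobs }
    }

    /// Queues `work` and returns a receiver that yields its result once.
    pub fn spawn<T, F>(&self, work: F) -> Receiver<T>
    where
        T: Send + 'static,
        F: FnOnce() -> T + Send + 'static,
    {
        let (tx, rx) = mpsc::channel();
        let job: Job = Box::new(move || {
            // The caller may have dropped the receiver; ignore that case.
            let _ = tx.send(work());
        });
        self.jobs.send(job).expect("worker threads have exited");
        rx
    }
}
```

With this sketch, `let handle = pool.spawn(|| expensive());` returns immediately and `handle.recv()` blocks until the result is ready. A future-returning pool plays the same role, except that the handle can be chained with `sink.success` and passed to `ctx.spawn` instead of being waited on.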

BusyJay avatar Apr 16 '19 03:04 BusyJay

amazing, thank you 🙏!

seansu4you87 avatar Apr 18 '19 02:04 seansu4you87

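@BusyJay Hello, I've run into the same problem. The C++ version of grpc supports concurrent requests directly, but when I tried grpc-rs I found that concurrent requests always use the same thread, even though I set up multiple threads with Environment::new(10). How should I use the code you provided above? I tried adding it to my code, but it fails to compile. Sorry, I'm a Rust newbie.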

gateslu avatar Oct 13 '19 13:10 gateslu

...the C++ version of grpc supports concurrent requests directly..

@gateslu can you show me a C++ version that supports concurrency and works differently from the Rust version?

The code is just an example to show how to offload blocking code from the grpc threads. It is no different from general thread pool usage. You can refer to the documentation of the thread pools for more explanation.

BusyJay avatar Oct 14 '19 01:10 BusyJay

Thank you so much, I think I've solved the problem : )

    // Copy what the worker needs out of the request before moving it into the thread.
    let content = req.get_content().to_owned();

    let (tx, rx) = oneshot::channel();
    thread::spawn(move || {
        // Build the response on the worker thread, off the gRPC thread.
        let mut resp = TaskResponse::default();
        resp.set_content(format!("Hello {} {:?}", content, thread::current().id()));
        let _ = tx.send(resp);
    });

    // Reply from the gRPC context once the worker has sent the response.
    let f = rx
        .map_err(|e| println!("worker dropped: {:?}", e))
        .and_then(move |resp| {
            sink.success(resp)
                .map_err(move |e| println!("failed to reply {:?}: {:?}", req, e))
        });
    ctx.spawn(f);

gateslu avatar Oct 14 '19 06:10 gateslu