
How to run an RPC server in a threadpool instead of current_thread?

Open · zhengyi-yang opened this issue on Aug 20, 2019 · 6 comments

I've set up an RPC server using capnp_rpc, following the examples given in the repo. The code looks like this:

        // tokio 0.1 / futures 0.1 era; assumes `use futures::{Future, Stream};`
        // and `use tokio::runtime::current_thread;` are in scope.
        let addr = ([0, 0, 0, 0], port).into();
        let socket = tokio::net::TcpListener::bind(&addr).unwrap();

        // A single capability, cloned into every connection's RpcSystem.
        let client = simple_capnp::simple::ToClient::new(self).into_client::<capnp_rpc::Server>();

        let done = socket.incoming().for_each(move |socket| {
            socket.set_nodelay(true)?;
            let (reader, writer) = socket.split();

            let network = twoparty::VatNetwork::new(
                reader,
                std::io::BufWriter::new(writer),
                rpc_twoparty_capnp::Side::Server,
                Default::default(),
            );

            // Every RpcSystem is spawned onto the same single-threaded executor.
            let rpc_system = RpcSystem::new(Box::new(network), Some(client.clone().client));
            current_thread::spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
            Ok(())
        });

        current_thread::block_on_all(done).unwrap();

However, when I run the server in a distributed environment with many incoming requests, I observe low CPU usage, and throughput is about 20% lower than tarpc's. I assumed the server was only using one thread, so I tried running it on a tokio::runtime::Runtime, which supports multi-threading. But because RpcSystem doesn't implement Send, I got the following errors:

error[E0277]: `std::rc::Rc<std::cell::RefCell<std::option::Option<std::rc::Rc<capnp_rpc::rpc::ConnectionState<capnp_rpc::rpc_twoparty_capnp::Side>>>>>` cannot be sent between threads safely
  --> src/graph_impl/rpc_graph/server.rs:49:21
   |
49 |             runtime.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
   |                     ^^^^^ `std::rc::Rc<std::cell::RefCell<std::option::Option<std::rc::Rc<capnp_rpc::rpc::ConnectionState<capnp_rpc::rpc_twoparty_capnp::Side>>>>>` cannot be sent between threads safely
   |

error[E0277]: `std::rc::Rc<std::cell::RefCell<std::boxed::Box<(dyn capnp_rpc::task_set::TaskReaper<(), capnp::Error> + 'static)>>>` cannot be sent between threads safely
  --> src/graph_impl/rpc_graph/server.rs:49:21
   |
49 |             runtime.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
   |                     ^^^^^ `std::rc::Rc<std::cell::RefCell<std::boxed::Box<(dyn capnp_rpc::task_set::TaskReaper<(), capnp::Error> + 'static)>>>` cannot be sent between threads safely
   |

error[E0277]: `(dyn capnp_rpc::VatNetwork<capnp_rpc::rpc_twoparty_capnp::Side> + 'static)` cannot be sent between threads safely
  --> src/graph_impl/rpc_graph/server.rs:49:21
   |
49 |             runtime.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
   |                     ^^^^^ `(dyn capnp_rpc::VatNetwork<capnp_rpc::rpc_twoparty_capnp::Side> + 'static)` cannot be sent between threads safely
   |

error[E0277]: `(dyn capnp::private::capability::ClientHook + 'static)` cannot be sent between threads safely
  --> src/graph_impl/rpc_graph/server.rs:49:21
   |
49 |             runtime.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
   |                     ^^^^^ `(dyn capnp::private::capability::ClientHook + 'static)` cannot be sent between threads safely
   |

error[E0277]: `std::rc::Weak<std::cell::RefCell<futures::unsync::mpsc::Shared<capnp_rpc::task_set::EnqueuedTask<(), capnp::Error>>>>` cannot be sent between threads safely
  --> src/graph_impl/rpc_graph/server.rs:49:21
   |
49 |             runtime.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
   |                     ^^^^^ `std::rc::Weak<std::cell::RefCell<futures::unsync::mpsc::Shared<capnp_rpc::task_set::EnqueuedTask<(), capnp::Error>>>>` cannot be sent between threads safely
   |

error[E0277]: `std::rc::Rc<std::cell::RefCell<futures::unsync::mpsc::Shared<capnp_rpc::task_set::EnqueuedTask<(), capnp::Error>>>>` cannot be sent between threads safely
  --> src/graph_impl/rpc_graph/server.rs:49:21
   |
49 |             runtime.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
   |                     ^^^^^ `std::rc::Rc<std::cell::RefCell<futures::unsync::mpsc::Shared<capnp_rpc::task_set::EnqueuedTask<(), capnp::Error>>>>` cannot be sent between threads safely
   |

error[E0277]: `(dyn futures::future::Future<Item = (), Error = ()> + 'static)` cannot be sent between threads safely
  --> src/graph_impl/rpc_graph/server.rs:49:21
   |
49 |             runtime.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
   |                     ^^^^^ `(dyn futures::future::Future<Item = (), Error = ()> + 'static)` cannot be sent between threads safely
   |

error[E0277]: `(dyn futures::future::Future<Item = (), Error = capnp::Error> + 'static)` cannot be sent between threads safely
  --> src/graph_impl/rpc_graph/server.rs:49:21
   |
49 |             runtime.spawn(rpc_system.map_err(|e| println!("error: {:?}", e)));
   |                     ^^^^^ `(dyn futures::future::Future<Item = (), Error = capnp::Error> + 'static)` cannot be sent between threads safely
   |

Is there an efficient way to run the server so that it can handle a large number of requests?

zhengyi-yang · Aug 20 '19, 08:08

Do your RPC methods perform any blocking I/O? If so, you may want to try converting it to async I/O, or offloading it to a threadpool shared among your RPC objects.
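
If offloading is the route, here is a minimal sketch of that pattern using futures-cpupool (era-appropriate for the futures 0.1 code in this thread); the `lookup` function and the file read are hypothetical stand-ins for whatever the blocking work actually is:

    use futures::Future;
    use futures_cpupool::CpuPool;

    // Hypothetical: run blocking I/O on a shared pool so the single-threaded
    // RpcSystem event loop stays responsive; an RPC method can then compose
    // the returned future into its reply.
    fn lookup(pool: &CpuPool, path: String) -> impl Future<Item = String, Error = std::io::Error> {
        pool.spawn_fn(move || {
            // The blocking read happens on a pool thread, not the RPC thread.
            std::fs::read_to_string(&path)
        })
    }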

Alternatively: it looks to me like your main barrier to using tokio::runtime::Runtime is not that RpcSystem needs to be Send, but that you would like to share your simple_capnp::simple RPC object among many RpcSystems. Does your object have any state that needs to be shared between incoming connections? If not, you could create a new object for each incoming connection. If it does share state, consider rigging up some way to share that state somewhere other than directly in the object itself.
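
A minimal sketch of that shape, with hypothetical names (`SharedState`, `SimpleImpl`): the capability object is created per connection, and anything genuinely shared lives behind an `Arc` rather than in the object itself.

    use std::sync::{Arc, Mutex};

    // Hypothetical shared state, owned outside any single connection.
    struct SharedState {
        counter: Mutex<u64>,
    }

    // One SimpleImpl per incoming connection; only the Arc is shared, so the
    // capability object itself never has to cross threads.
    struct SimpleImpl {
        shared: Arc<SharedState>,
    }

    impl SimpleImpl {
        fn new(shared: Arc<SharedState>) -> Self {
            SimpleImpl { shared }
        }
    }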

dwrensha · Aug 20 '19, 12:08

> your main barrier to using tokio::runtime::Runtime is not that RpcSystem needs to be Send

Oh, right: tokio::runtime::Runtime would still require RpcSystem to be Send. I guess what I really had in mind here was launching a new thread for each incoming connection, or possibly finding a way to load-balance incoming requests among some fixed set of threads, each with its own simple_capnp::simple object.
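
A sketch of that thread-per-connection idea, reusing the hypothetical `SimpleImpl`/`SharedState` from above and the same tokio 0.1 / futures 0.1 APIs as the original snippet (whether `Handle::default()` binds to each thread's reactor as intended is an assumption worth verifying):

    use std::net;
    use std::sync::Arc;
    use futures::future;
    use tokio::io::AsyncRead;
    use tokio::runtime::current_thread;
    use capnp_rpc::{rpc_twoparty_capnp, twoparty, RpcSystem};

    fn serve(port: u16, shared: Arc<SharedState>) -> std::io::Result<()> {
        // Accept with the blocking std listener; each connection gets its own
        // OS thread running a single-threaded runtime and its own RpcSystem.
        let listener = net::TcpListener::bind(("0.0.0.0", port))?;
        for stream in listener.incoming() {
            let stream = stream?;
            let shared = shared.clone();
            std::thread::spawn(move || {
                let rpc = future::lazy(move || {
                    // Register the std socket with this thread's reactor.
                    let socket = tokio::net::TcpStream::from_std(
                        stream,
                        &tokio::reactor::Handle::default(),
                    ).expect("failed to register socket");
                    socket.set_nodelay(true).expect("set_nodelay");
                    let (reader, writer) = socket.split();

                    // A fresh capability per connection; SimpleImpl (which must
                    // implement the generated simple::Server trait) shares state
                    // only through the Arc.
                    let client = simple_capnp::simple::ToClient::new(SimpleImpl::new(shared))
                        .into_client::<capnp_rpc::Server>();

                    let network = twoparty::VatNetwork::new(
                        reader,
                        std::io::BufWriter::new(writer),
                        rpc_twoparty_capnp::Side::Server,
                        Default::default(),
                    );
                    RpcSystem::new(Box::new(network), Some(client.client))
                });
                if let Err(e) = current_thread::block_on_all(rpc) {
                    println!("error: {:?}", e);
                }
            });
        }
        Ok(())
    }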

dwrensha · Aug 20 '19, 12:08

Thanks @dwrensha, I'll give it a try. The simple_capnp::simple object in my program only has one field, wrapped in an Arc, so cloning should not be a problem. Coming back to performance: I just tested my application with capnp_rpc and tarpc, both running in a single-threaded server and using blocking I/O. capnp_rpc is still about 20% slower than tarpc, so the bottleneck is likely not the single thread. I understand the design goals are a bit different, but it would be interesting to do a detailed benchmark and see why this happens.

zhengyi-yang · Aug 20 '19, 15:08

I would definitely be interested in seeing any benchmarking results and analysis.

An observation of "20% slower than tarpc" would not surprise me too much. The RPC part of capnproto-rust is not super optimized at the moment. I would guess that we spend a lot of time allocating memory during all of the juggling of Future objects. Possibly a lot of that could be moved to stack allocation, especially once we can use async/await.

dwrensha · Aug 20 '19, 21:08

I've been doing my own sort of inquiry into whether capnp-rpc is right for a product of mine, and noticed the same thing, really.

I took a quick look with callgrind, profiling a little echo RPC client/server: it looks like ~15% of the time is spent in capnp::message::HeapAllocator for the message builder. That seems to match up with the comments here.

[Screenshot from 2020-01-17: callgrind profile showing time spent in capnp::message::HeapAllocator]
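
For what it's worth, capnp does expose a knob for reducing builder allocations: a scratch-space allocator that reuses a caller-provided buffer. A hypothetical sketch follows (the `&mut [u8]` constructor is the API of recent capnp releases; older ones used a Word-based ScratchSpace), though note this only helps for messages your own code builds, not for allocations internal to the RPC system:

    use capnp::message;

    // Hypothetical sketch: build messages into a reusable scratch buffer so the
    // first segment avoids a fresh heap allocation on every message.
    fn build_with_scratch(scratch: &mut [u8]) {
        let allocator = message::ScratchSpaceHeapAllocator::new(scratch);
        let _builder = message::Builder::new(allocator);
        // ... init_root::<your_schema::Builder>() and fill in fields as usual;
        // only messages that outgrow `scratch` fall back to the heap.
    }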

teburd · Jan 17 '20, 20:01

Has there been any improvement on the RPC performance since?

photoszzt · Oct 02 '20, 15:10