tower
Type erased sync Service
Hey,
I was wondering if we could add a SyncService<Req, Res, Err> to the util module, similar to BoxCloneService<Req, Res, Err> but additionally implementing Sync? Or is this already easily possible without introducing a new type?
I use an implementation in my own project that looks like the code snippet below. It is basically a naive implementation of a type-erased version of an unbounded tower::buffer::Buffer.
use std::sync::Arc;
use std::task::Poll;

// Assumption: `BoxFuture` comes from the `futures` crate; the snippet used it without an import.
use futures::future::BoxFuture;
use tokio::sync::mpsc;
use tokio::sync::{self, Notify};
use tower::{Service, ServiceExt};

/// A request plus the channel and notifier used to hand the response back to the caller.
struct Message<Req, Res, Err> {
    request: Req,
    tx: sync::oneshot::Sender<Result<Res, Err>>,
    notify: Arc<Notify>,
}

#[derive(Debug)]
pub struct SyncService<Req, Res, Err> {
    tx: mpsc::UnboundedSender<Message<Req, Res, Err>>,
}

// Manual impl: cloning only clones the sender handle, never the inner service.
impl<Req, Res, Err> Clone for SyncService<Req, Res, Err> {
    fn clone(&self) -> Self {
        Self {
            tx: self.tx.clone(),
        }
    }
}

impl<Req, Res, Err> SyncService<Req, Res, Err> {
    pub fn new<T>(service: T) -> Self
    where
        T: Service<Req, Response = Res, Error = Err>,
        T: Send + Clone + 'static,
        T::Future: Send,
        Req: Send + 'static,
        Res: Send + 'static,
        Err: Send + 'static,
    {
        let (tx, mut rx) = mpsc::unbounded_channel::<Message<Req, Res, Err>>();
        // Worker task: owns the inner service and handles requests one at a time.
        tokio::spawn(async move {
            while let Some(message) = rx.recv().await {
                let svc = service.clone();
                let resp = svc.oneshot(message.request).await;
                let _ = message.tx.send(resp);
                message.notify.notify_one();
            }
        });
        Self { tx }
    }
}

impl<Req, Res, Err> Service<Req> for SyncService<Req, Res, Err>
where
    Req: Send + 'static,
    Res: Send + 'static,
    Err: Send + 'static,
{
    type Response = Res;
    type Error = Err;
    type Future = BoxFuture<'static, Result<Res, Err>>;

    fn poll_ready(
        &mut self,
        _cx: &mut std::task::Context<'_>,
    ) -> std::task::Poll<Result<(), Self::Error>> {
        // The channel is unbounded, so the service always reports ready.
        Poll::Ready(Ok(()))
    }

    fn call(&mut self, req: Req) -> Self::Future {
        let this = self.clone();
        Box::pin(async move {
            let (tx, mut rx) = sync::oneshot::channel();
            let notify = Arc::new(Notify::new());
            let _ = this.tx.send(Message {
                request: req,
                tx,
                notify: notify.clone(),
            });
            // Wait until the worker signals that the response has been sent.
            notify.notified().await;
            rx.try_recv().unwrap()
        })
    }
}
With SyncService, the code below compiles. Here we need to store a type that implements tower::Service on a struct that must implement Sync. I am not sure how I would make this work with the existing types in the tower crate.
use hyper::{Request, Response, Body};
use std::convert::Infallible;
trait Backend: Send + Sync {
    // with some methods
}
struct PostgresBackend(SyncService<Request<Body>, Response<Body>, Infallible>);
impl Backend for PostgresBackend {}
I would be happy to open a PR with a more serious implementation if you see the value for it.
I wonder if you could use this https://docs.rs/sync_wrapper/0.1.1/sync_wrapper/struct.SyncWrapper.html
It almost works for my use case. Unfortunately, it does not implement Clone, which is needed to call the inner service when I only have an immutable reference:
impl Backend for PostgresBackend {
    async fn handle(&self, req: Request) -> Response {
        // Cannot call the inner `Service` through an immutable reference, so we need to clone it
        let svc = self.0.clone();
        svc.oneshot(req).await
    }
}
I guess one can add Clone to sync_wrapper, but in my case it would be a bit expensive to Clone the inner service on every request. With the channel based implementation one only needs to clone the sender handle of the channel which I guess should be much less expensive.
Calling a service from a &self requires a clone anyway, so I don't think you can skip that.
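The point about &self can be illustrated with a small std-only sketch. `Svc`, `Doubler` and `Backend` are hypothetical stand-ins, not tower types; the only property borrowed from tower::Service is that `call` takes `&mut self`.

```rust
/// Hypothetical synchronous stand-in for `tower::Service`: like the real
/// trait, `call` takes `&mut self`.
pub trait Svc {
    fn call(&mut self, req: u32) -> u32;
}

#[derive(Clone)]
pub struct Doubler {
    calls: u32,
}

impl Svc for Doubler {
    fn call(&mut self, req: u32) -> u32 {
        self.calls += 1;
        req * 2
    }
}

pub struct Backend {
    svc: Doubler,
}

impl Backend {
    pub fn handle(&self, req: u32) -> u32 {
        // `self.svc.call(req)` would not compile here: `call` needs
        // `&mut Doubler`, but `&self` only hands out `&Doubler`.
        // Cloning gives an owned copy that can be borrowed mutably.
        let mut svc = self.svc.clone();
        svc.call(req)
    }
}

/// Returns the response and the call counter of the stored service.
pub fn demo_handle() -> (u32, u32) {
    let backend = Backend { svc: Doubler { calls: 0 } };
    let out = backend.handle(21);
    // Only the clone was mutated; the stored service is untouched.
    (out, backend.svc.calls)
}
```

Every call through `handle` pays for one clone of the stored service, which is why the cost of that clone matters in this discussion.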
No doubt about that. But I think one can still avoid the clone of the inner service with the channel based implementation of the outer service where the outer service is cheap to clone.
Right, so if you use SyncWrapper<BoxCloneService> with Buffer as the outermost layer inside the boxed service, then the clone is cheap and the whole type is cheaply cloneable?
Not sure if I understood you correctly, but the buffer outer type would give cheap clones as it doesn't need to clone the inner services. Only "problem" with the current Buffer for my use-case is that it doesn't erase the inner service type, which makes things a bit awkward when I need to store the service on a struct and need the exact type.
What I am trying to achieve is a wrapper Service with the following behaviour:
- Sync implementation if the inner service is Sync. (Allows us to store the service on a type that implements Sync.)
- Clone implementation even if the inner service does not implement Clone. (Allows us to cheaply call the service with only an immutable reference.)
- Is type erased similar to BoxService, only containing the request, response and error generic types. (Useful when we store the service on a struct and need to specify the exact type, which is awkward if it contains closures and so on.)
SyncWrapper<BoxCloneService> solves the first and last point, but not the second as it needs to clone the inner service on every request. In my case the inner service usually contains a bunch of things and is not cheaply cloneable so it would be nice to get around that.
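As a rough illustration of how the three requirements can hold at once, here is a std-only sketch of the channel-handle idea, with a worker thread in place of a tokio task and a plain closure in place of a tower service. `Handle`, `Message`, the channel capacity of 16 and the helper functions are all hypothetical names and values chosen for this example; it is not the implementation from the snippet above.

```rust
use std::sync::mpsc::{self, SyncSender};
use std::thread;

/// One request plus the channel on which the worker sends the response back.
struct Message<Req, Res> {
    request: Req,
    reply: mpsc::Sender<Res>,
}

/// Hypothetical handle: the type names only `Req` and `Res`, never the
/// concrete service type, so it is type erased.
pub struct Handle<Req, Res> {
    tx: SyncSender<Message<Req, Res>>,
}

// Manual impl so that `Req` and `Res` do not have to be `Clone`.
impl<Req, Res> Clone for Handle<Req, Res> {
    fn clone(&self) -> Self {
        Self { tx: self.tx.clone() }
    }
}

impl<Req: Send + 'static, Res: Send + 'static> Handle<Req, Res> {
    /// Moves `service` onto a worker thread; it only has to be `Send`.
    pub fn new<F>(mut service: F) -> Self
    where
        F: FnMut(Req) -> Res + Send + 'static,
    {
        let (tx, rx) = mpsc::sync_channel::<Message<Req, Res>>(16);
        thread::spawn(move || {
            // The loop ends once every handle (and so every sender) is dropped.
            for msg in rx {
                let _ = msg.reply.send(service(msg.request));
            }
        });
        Self { tx }
    }

    /// Callable through `&self`: the inner service is never cloned.
    pub fn call(&self, request: Req) -> Res {
        let (reply, response) = mpsc::channel();
        self.tx
            .send(Message { request, reply })
            .expect("worker thread is gone");
        response.recv().expect("worker dropped the request")
    }
}

/// Compiles only if `T` is `Clone + Send + Sync`.
pub fn assert_clone_send_sync<T: Clone + Send + Sync>() {}

pub fn demo_handle_calls() -> Vec<u32> {
    assert_clone_send_sync::<Handle<u32, u32>>();
    // A stateful service that counts its calls; the handle never clones it.
    let mut calls = 0u32;
    let handle = Handle::new(move |req: u32| {
        calls += 1;
        req + calls
    });
    let other = handle.clone();
    vec![handle.call(10), other.call(10), handle.call(10)]
}
```

Both handles talk to the same worker, so the counter keeps increasing across clones. The trade-off is that requests are processed one at a time on the worker.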
If you use Buffer, you get a handle to a channel that is cheaply cloneable, so that should solve the clone problem?
Hi, did you ever come up with a solution to getting a service that is both Clone + Sync? I'm trying to do this:
SyncWrapper::new(tower::util::BoxCloneService::new(tower_service))
But I still get this error, saying that it is not Clone:
30 | let tower_service = tower_service.compat();
| ^^^^^^^^^^^^^ ------ required by a bound introduced by this call
| |
| the trait `Clone` is not implemented for `SyncWrapper<BoxCloneService<poem::http::Request<tonic::transport::Body>, poem::http::Response<http_body::combinators::box_body::UnsyncBoxBody<tonic::codegen::Bytes, tonic::Status>>, Box<(dyn std::error::Error + Send + Sync + 'static)>>>`
Even if I change SyncWrapper to implement Clone (https://github.com/Actyx/sync_wrapper/pull/10), it still doesn't work; the compiler then says that it doesn't implement Service:
error[E0277]: the trait bound `SyncWrapper<BoxCloneService<poem::http::Request<tonic::transport::Body>, poem::http::Response<http_body::combinators::box_body::UnsyncBoxBody<tonic::codegen::Bytes, tonic::Status>>, Box<(dyn std::error::Error + Send + Sync + 'static)>>>: Service<poem::http::Request<tonic::transport::Body>>` is not satisfied
--> crates/aptos-api/v2/src/service.rs:31:25
|
31 | let tower_service = tower_service.compat();
| ^^^^^^^^^^^^^ ------ required by a bound introduced by this call
| |
| the trait `Service<poem::http::Request<tonic::transport::Body>>` is not implemented for `SyncWrapper<BoxCloneService<poem::http::Request<tonic::transport::Body>, poem::http::Response<http_body::combinators::box_body::UnsyncBoxBody<tonic::codegen::Bytes, tonic::Status>>, Box<(dyn std::error::Error + Send + Sync + 'static)>>>`
|
= help: the following other types implement trait `Service<Request>`:
<&'a mut S as Service<Request>>
<&hyper::client::client::Client<C, B> as Service<poem::http::Request<B>>>
<ApiV2Server<T> as Service<poem::http::Request<B>>>
<AsService<'_, M, Request> as Service<Target>>
<Balance<D, Req> as Service<Req>>
<Box<S> as Service<Request>>
<BoxCloneService<T, U, E> as Service<T>>
<BoxService<T, U, E> as Service<T>>
and 112 others
note: required by a bound in `poem::endpoint::TowerCompatExt::compat`
--> /Users/dport/github/poem/poem/src/endpoint/tower_compat.rs:19:15
|
19 | Self: Service<
| _______________^
20 | | http::Request<hyper::Body>,
21 | | Response = hyper::Response<ResBody>,
22 | | Error = Err,
23 | | Future = Fut,
24 | | > + Clone
| |_____________^ required by this bound in `poem::endpoint::TowerCompatExt::compat`
Doing it the other way doesn't work either:
28 | let tower_service = tower::util::BoxCloneService::new(SyncWrapper::new(tower_service));
| --------------------------------- ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ the trait `Service<_>` is not implemented for `SyncWrapper<Routes>`
| |
| required by a bound introduced by this call
|
= help: the following other types implement trait `Service<Request>`:
<&'a mut S as Service<Request>>
<&hyper::client::client::Client<C, B> as Service<poem::http::Request<B>>>
<ApiV2Server<T> as Service<poem::http::Request<B>>>
<AsService<'_, M, Request> as Service<Target>>
<Balance<D, Req> as Service<Req>>
<Box<S> as Service<Request>>
<BoxCloneService<T, U, E> as Service<T>>
<BoxService<T, U, E> as Service<T>>
and 112 others
note: required by a bound in `BoxCloneService::<T, U, E>::new`
--> /Users/dport/.cargo/registry/src/github.com-1ecc6299db9ec823/tower-0.4.13/src/util/boxed_clone.rs:69:12
|
69 | S: Service<T, Response = U, Error = E> + Clone + Send + 'static,
| ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^ required by this bound in `BoxCloneService::<T, U, E>::new`
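One possible way to get a wrapper that is both Clone and Sync, sketched here with std only, is to put the service behind a Mutex instead of a SyncWrapper: Mutex<S> is Sync whenever S is Send, and the lock makes cloning through &self sound. `Inner` and `SyncClone` are hypothetical stand-ins, not tower or sync_wrapper types. With tower, such a wrapper would presumably also have to implement Service itself by forwarding poll_ready and call through the mutable accessor; that part is an assumption and is not shown.

```rust
use std::cell::Cell;
use std::sync::Mutex;

/// Hypothetical stand-in for a boxed service: `Send + Clone` but not `Sync`,
/// because `Cell` is not `Sync`.
#[derive(Clone)]
pub struct Inner {
    calls: Cell<u32>,
}

impl Inner {
    pub fn call(&mut self, req: u32) -> u32 {
        self.calls.set(self.calls.get() + 1);
        req + self.calls.get()
    }
}

/// `Mutex<S>` is `Sync` whenever `S: Send`, so this wrapper is `Sync`.
pub struct SyncClone<S>(Mutex<S>);

impl<S: Clone> Clone for SyncClone<S> {
    fn clone(&self) -> Self {
        // Shared access goes through the lock, so cloning from `&self` is sound.
        let guard = self.0.lock().unwrap_or_else(|e| e.into_inner());
        SyncClone(Mutex::new(S::clone(&guard)))
    }
}

impl<S> SyncClone<S> {
    pub fn new(inner: S) -> Self {
        SyncClone(Mutex::new(inner))
    }

    /// With `&mut self` there is no other borrower, so no locking is needed.
    pub fn get_mut(&mut self) -> &mut S {
        self.0.get_mut().unwrap_or_else(|e| e.into_inner())
    }
}

/// Compiles only if `T` is `Clone + Sync`.
pub fn assert_clone_sync<T: Clone + Sync>() {}

pub fn demo_wrapper() -> (u32, u32) {
    assert_clone_sync::<SyncClone<Inner>>();
    let mut a = SyncClone::new(Inner { calls: Cell::new(0) });
    let first = a.get_mut().call(10);
    // The clone copies the current state of the inner value.
    let mut b = a.clone();
    (first, b.get_mut().call(10))
}
```

Cloning this wrapper still clones the inner value, so it only helps when that clone is cheap, for example when the inner value is itself a channel handle.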