implement connection pool
Can be used in the static state, from start or in a wrapper state, and is used to request or create connections based on:
- host
- http version
Option<UA>- secure (bool)
Internal implementation doesn't have to be the best, especially given these pools are short lived.
First step is to probably study the one of hyper 0.x and see what lessons we can learn from it.
Moving this to 0.3, as I don't think I really need it for now. We can think about it in 0.3
Moving this back to 0.2. Will be combined in the client layer. Pooling is done by checking if conn/dispatch is ready, instead of keeping a connection within a the layer, it will request a ready one.
ChillFish8 at tokio's discord on the #hyper channel offered a basic design for such a pool:
/// A dispatcher that gives active upstream connections to clients
/// that need it.
///
/// Connections are provided on a [QueueMode] priority.
pub struct ConnectionPoolDispatcher<C> {
mode: QueueMode,
notify: Arc<Notify>,
available: flume::Receiver<C>,
returner: flume::Sender<C>,
}
impl<C> Clone for ConnectionPoolDispatcher<C> {
fn clone(&self) -> Self {
Self {
mode: self.mode,
notify: self.notify.clone(),
available: self.available.clone(),
returner: self.returner.clone(),
}
}
}
impl<C: Connection> ConnectionPoolDispatcher<C> {
/// Create a new pool dispatcher with the given [QueueMode].
pub fn new(mode: QueueMode) -> Self {
let (tx, rx) = flume::unbounded();
Self {
mode,
notify: Arc::new(Notify::new()),
available: rx,
returner: tx,
}
}
/// Acquire a connection from the pool.
///
/// If no connection is immediately available, this will yield
/// until a new connection is available.
pub async fn acquire(&self) -> C {
loop {
while let Ok(conn) = self.available.try_recv() {
if !conn.is_closed() {
return conn;
}
}
self.notify.notified().await;
let conn = self.available.recv_async().await.unwrap();
if !conn.is_closed() {
return conn;
}
}
}
/// Release the held connection back to the pool.
pub fn release(&self, conn: C) {
if self.returner.try_send(conn).is_ok() {
match self.mode {
QueueMode::Lifo => self.notify.notify_last(),
QueueMode::Fifo => self.notify.notify_one(),
}
}
}
}
It can obviously not be taken as-is, but it might give an idea that this initial pool doesn't have to be that complex.
Thinking of it I think:
- by default the http client connector should be compatible with:
- no pool (current)
- our default pool impl
- a custom pool impl
That would allow the connector to be used even if one desires their own pool or no pool.
The conn is to be dropped when is_closed() == true and not be eligble for choice if is_ready() == false.
First task in order to create this issue might be to abstract away the send_request functionality:
pub fn send_request(
&self,
req: Request<B>,
) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
let sent = self.dispatch.send(req);
behind a trait RequestSender. Which we would implement for the SendRequest types of rama-http-core.
This would also allows us to abstract away the pool as the same trait that can work with this type, and which pools the connections as such :)
Pool might be saner to not have that trait though.
If no connection is available a new one must be created using the establish connector.
To make it all work with state, it might make sense to have this pool be some Arc<dyn trait> which we have in our Context. That would make it even easier. Would mean however that pool needs to be object safe.