implement connection pool

Open GlenDC opened this issue 1 year ago • 2 comments

The pool can be used in the static state, either from the start or in a wrapper state, and is used to request or create connections based on:

  • host
  • http version
  • Option<UA>
  • secure (bool)

The internal implementation doesn't have to be optimal, especially given that these pools are short-lived.
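
A minimal sketch of what keying connections on those four properties could look like. All names here (`HttpVersion`, `PoolKey`, `SimplePool`, `get_or_create`, `put_back`) are hypothetical and not part of rama; the connection type is left generic and the "create" step is just a closure.

```rust
use std::collections::HashMap;

/// HTTP version a pooled connection speaks (hypothetical enum for this sketch).
#[derive(Debug, Clone, Copy, PartialEq, Eq, Hash)]
pub enum HttpVersion {
    Http1,
    Http2,
}

/// Key under which connections are grouped: host, version, optional UA, secure.
#[derive(Debug, Clone, PartialEq, Eq, Hash)]
pub struct PoolKey {
    pub host: String,
    pub version: HttpVersion,
    pub user_agent: Option<String>,
    pub secure: bool,
}

/// Naive short-lived pool: idle connections stored per key.
pub struct SimplePool<C> {
    idle: HashMap<PoolKey, Vec<C>>,
}

impl<C> SimplePool<C> {
    pub fn new() -> Self {
        Self { idle: HashMap::new() }
    }

    /// Take an idle connection for `key`, or build a new one with `create`.
    pub fn get_or_create(&mut self, key: &PoolKey, create: impl FnOnce() -> C) -> C {
        match self.idle.get_mut(key).and_then(|conns| conns.pop()) {
            Some(conn) => conn,
            None => create(),
        }
    }

    /// Store a connection so a later request with the same key can reuse it.
    pub fn put_back(&mut self, key: PoolKey, conn: C) {
        self.idle.entry(key).or_default().push(conn);
    }
}
```

Two requests only share a connection when all four fields match, so a plain-text and a TLS connection to the same host never get mixed up.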

GlenDC avatar Mar 19 '24 10:03 GlenDC

The first step is probably to study the pool of hyper 0.x and see what lessons we can learn from it.

GlenDC avatar Apr 20 '24 19:04 GlenDC

Moving this to 0.3, as I don't think I really need it for now. We can think about it in 0.3.

GlenDC avatar Aug 14 '24 21:08 GlenDC

Moving this back to 0.2. It will be combined into the client layer. Pooling is done by checking whether the conn/dispatch is ready: instead of keeping a connection within the layer, the layer will request a ready one.

GlenDC avatar Dec 30 '24 14:12 GlenDC

ChillFish8, in the #hyper channel of tokio's Discord, offered a basic design for such a pool:

/// A dispatcher that gives active upstream connections to clients
/// that need it.
///
/// Connections are provided on a [QueueMode] priority.
pub struct ConnectionPoolDispatcher<C> {
    mode: QueueMode,
    notify: Arc<Notify>,
    available: flume::Receiver<C>,
    returner: flume::Sender<C>,
}

impl<C> Clone for ConnectionPoolDispatcher<C> {
    fn clone(&self) -> Self {
        Self {
            mode: self.mode,
            notify: self.notify.clone(),
            available: self.available.clone(),
            returner: self.returner.clone(),
        }
    }
}

impl<C: Connection> ConnectionPoolDispatcher<C> {
    /// Create a new pool dispatcher with the given [QueueMode].
    pub fn new(mode: QueueMode) -> Self {
        let (tx, rx) = flume::unbounded();

        Self {
            mode,
            notify: Arc::new(Notify::new()),
            available: rx,
            returner: tx,
        }
    }

    /// Acquire a connection from the pool.
    ///
    /// If no connection is immediately available, this will yield
    /// until a new connection is available.
    pub async fn acquire(&self) -> C {
        loop {
            while let Ok(conn) = self.available.try_recv() {
                if !conn.is_closed() {
                    return conn;
                }
            }

            self.notify.notified().await;

            let conn = self.available.recv_async().await.unwrap();

            if !conn.is_closed() {
                return conn;
            }
        }
    }

    /// Release the held connection back to the pool.
    pub fn release(&self, conn: C) {
        if self.returner.try_send(conn).is_ok() {
            match self.mode {
                QueueMode::Lifo => self.notify.notify_last(),
                QueueMode::Fifo => self.notify.notify_one(),
            }
        }
    }
}

It obviously cannot be taken as-is, but it does suggest that this initial pool doesn't have to be that complex.

Thinking about it:

  • by default the http client connector should be compatible with:
    • no pool (current)
    • our default pool impl
    • a custom pool impl

That would allow the connector to be used even if one desires their own pool or no pool. A connection is to be dropped when is_closed() == true and must not be eligible for selection while is_ready() == false.
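
A rough sketch of how the three options and the is_closed() / is_ready() rules could fit together. The traits `PooledConn` and `ConnPool`, the types `NoPool` and `QueuePool`, and the `MockConn` stand-in are all hypothetical names for illustration, and everything is kept synchronous to stay small.

```rust
use std::collections::VecDeque;

/// Minimal view of a connection, as far as a pool cares (hypothetical trait).
pub trait PooledConn {
    fn is_closed(&self) -> bool;
    fn is_ready(&self) -> bool;
}

/// What the connector would ask of any pool (hypothetical trait).
pub trait ConnPool<C> {
    /// Return a pooled connection that is open and ready, if any.
    fn checkout(&mut self) -> Option<C>;
    /// Hand a connection back once the caller is done with it.
    fn checkin(&mut self, conn: C);
}

/// "No pool": stores nothing, so the caller always establishes a new connection.
pub struct NoPool;

impl<C> ConnPool<C> for NoPool {
    fn checkout(&mut self) -> Option<C> {
        None
    }
    fn checkin(&mut self, _conn: C) {}
}

/// A possible default pool: a plain queue of idle connections.
pub struct QueuePool<C> {
    idle: VecDeque<C>,
}

impl<C> QueuePool<C> {
    pub fn new() -> Self {
        Self { idle: VecDeque::new() }
    }
}

impl<C: PooledConn> ConnPool<C> for QueuePool<C> {
    fn checkout(&mut self) -> Option<C> {
        // Closed connections are dropped for good.
        self.idle.retain(|c| !c.is_closed());
        // Connections that are not ready stay pooled but are skipped.
        let idx = self.idle.iter().position(|c| c.is_ready())?;
        self.idle.remove(idx)
    }
    fn checkin(&mut self, conn: C) {
        if !conn.is_closed() {
            self.idle.push_back(conn);
        }
    }
}

/// Stand-in connection used only to exercise the sketch.
#[derive(Debug, PartialEq)]
pub struct MockConn {
    pub id: u32,
    pub closed: bool,
    pub ready: bool,
}

impl PooledConn for MockConn {
    fn is_closed(&self) -> bool {
        self.closed
    }
    fn is_ready(&self) -> bool {
        self.ready
    }
}
```

A custom pool would simply be another `ConnPool` implementation, so the connector itself never needs to know which of the three it is talking to.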

GlenDC avatar Dec 30 '24 16:12 GlenDC

The first task toward implementing this might be to abstract away the send_request functionality:

    pub fn send_request(
        &self,
        req: Request<B>,
    ) -> impl Future<Output = crate::Result<Response<IncomingBody>>> {
        let sent = self.dispatch.send(req);

behind a trait RequestSender, which we would implement for the SendRequest types of rama-http-core. This would also allow us to abstract away the pool behind the same trait, so that it can work with this type and pool the connections as such :)
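
Roughly what such a trait could look like, as a sketch only. `Request`, `Response` and `SendError` are simplified stand-ins rather than the real rama-http-core types, `EchoSender` is a mock playing the role of a SendRequest handle, and `fetch` is a hypothetical caller. It assumes a toolchain that supports `impl Trait` in trait method return position (Rust 1.75 or later).

```rust
use std::future::{ready, Future};

/// Simplified stand-ins for the real request/response/error types (hypothetical).
#[derive(Debug, PartialEq)]
pub struct Request(pub String);
#[derive(Debug, PartialEq)]
pub struct Response(pub String);
#[derive(Debug, PartialEq)]
pub struct SendError;

/// Anything that can turn a request into a (future) response.
pub trait RequestSender {
    fn send_request(
        &self,
        req: Request,
    ) -> impl Future<Output = Result<Response, SendError>>;
}

/// Mock sender standing in for a `SendRequest` handle.
pub struct EchoSender;

impl RequestSender for EchoSender {
    fn send_request(
        &self,
        req: Request,
    ) -> impl Future<Output = Result<Response, SendError>> {
        // A real implementation would dispatch over the connection here.
        ready(Ok(Response(format!("echo: {}", req.0))))
    }
}

/// Callers depend only on the trait, so a pool-backed sender could be swapped in.
pub async fn fetch<S: RequestSender>(sender: &S, path: &str) -> Result<Response, SendError> {
    sender.send_request(Request(path.to_string())).await
}
```

Note that a trait shaped like this is not usable as `dyn RequestSender`, which matters if the pool is meant to be stored as a trait object.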

It might be saner for the pool not to implement that trait, though.

If no connection is available a new one must be created using the establish connector.

To make it all work with state, it might make sense to have this pool be some Arc<dyn trait> which we keep in our Context. That would make it even easier. It would mean, however, that the pool trait needs to be object safe.
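
To illustrate the object-safety constraint, here is a sketch of a pool trait that can live behind an Arc. `Conn`, `DynPool`, `VecPool` and `Ctx` are hypothetical names, and `Ctx` is only a stand-in for the real Context. The async method returns a boxed future instead of `impl Future`, which is what keeps the trait usable as `dyn`.

```rust
use std::future::Future;
use std::pin::Pin;
use std::sync::{Arc, Mutex};

/// Stand-in for an established connection (hypothetical).
#[derive(Debug, PartialEq)]
pub struct Conn(pub u32);

/// Boxed future, so the trait below stays usable as `dyn`.
pub type BoxFuture<'a, T> = Pin<Box<dyn Future<Output = T> + Send + 'a>>;

/// Object-safe pool trait: no generic methods and no `impl Trait` returns.
pub trait DynPool: Send + Sync {
    /// Resolve to an idle connection, or `None` if the caller must establish one.
    fn acquire(&self) -> BoxFuture<'_, Option<Conn>>;
    /// Return a connection to the pool.
    fn release(&self, conn: Conn);
}

/// Trivial implementation backed by a mutex-guarded stack.
#[derive(Default)]
pub struct VecPool {
    idle: Mutex<Vec<Conn>>,
}

impl DynPool for VecPool {
    fn acquire(&self) -> BoxFuture<'_, Option<Conn>> {
        Box::pin(async move { self.idle.lock().unwrap().pop() })
    }
    fn release(&self, conn: Conn) {
        self.idle.lock().unwrap().push(conn);
    }
}

/// Stand-in for the request context that carries shared state (hypothetical).
#[derive(Clone)]
pub struct Ctx {
    pub pool: Arc<dyn DynPool>,
}
```

Cloning `Ctx` only clones the Arc, so every clone of the context sees the same pool. The price is one allocation per `acquire` call for the boxed future.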

GlenDC avatar Dec 30 '24 17:12 GlenDC