pingora icon indicating copy to clipboard operation
pingora copied to clipboard

Issue with forwarding HTTP body after reading in proxy request filter method

Open netrice opened this issue 1 year ago • 7 comments
trafficstars

I want to proxy an HTTP API and add simple authentication for client. The authentication requires computing an MD5 hash of the POST request's body and URL parameters, then placing this hash in the auth field of the request header. In the request_filter method of the ProxyHttp, I plan to verify the MD5 hash, and only if it matches will the request be forwarded to the upstream server.

However, I found that after using the session.read_request_body method to obtain the body content in the request_filter method, the code stops forwarding the body after outputting the log "pingora_proxy::proxy_h1: Sending header to upstream RequestHeader xxx". How can I get a copy of the body in the request_filter method without affecting the forwarding to the upstream server?

netrice avatar Aug 10 '24 03:08 netrice

I think the requestBody needs to be read in async fn request_body_filter( method. https://github.com/cloudflare/pingora/blob/main/pingora-proxy/src/proxy_trait.rs#L94

github2023spring avatar Aug 11 '24 03:08 github2023spring

I think the requestBody needs to be read in async fn request_body_filter( method. https://github.com/cloudflare/pingora/blob/main/pingora-proxy/src/proxy_trait.rs#L94

If the authentication is handled in request_body_filter, it won't be possible to block the request to the downstream.

netrice avatar Aug 12 '24 03:08 netrice

I think the requestBody needs to be read in async fn request_body_filter( method. https://github.com/cloudflare/pingora/blob/main/pingora-proxy/src/proxy_trait.rs#L94

If the validation logic is handled in request_body_filter, the client's request headers will be forwarded to the upstream first, and then the connection will be aborted. This doesn't align with the logic of authenticating first.

netrice avatar Aug 12 '24 04:08 netrice

Hmm, I tried to buffer the request into the CTX, and then set back in the request_body_filter, but it does not work.

github2023spring avatar Aug 13 '24 05:08 github2023spring

Do you have an example for us to look at? I'm guessing this not working has to do with read_request_body consuming the body.

drcaramelsyrup avatar Aug 16 '24 03:08 drcaramelsyrup

fn check_login(req: &pingora_http::RequestHeader) -> bool {
    // implement you logic check logic here
    req.headers.get("Authorization").map(|v| v.as_bytes()) == Some(b"password")
}

#[derive(Deserialize, Debug)]
struct RequestBody {
    argument: Option<String>,
}


#[derive(Debug)]
pub struct MyGate;

pub struct RequestCtx {
    meter: Meter,
    timers: HashMap<&'static str, SystemTime>,
    body: Option<Bytes>,
}

#[async_trait]
impl ProxyHttp for MyGate {
    type CTX = RequestCtx;
    fn new_ctx(&self) -> Self::CTX {
        RequestCtx {
            meter: global::meter("test_rust_application"),
            timers: HashMap::new(),
            body: None,
        }
    }

    #[tracing::instrument(
        skip(session,ctx),
        fields(
            req_uri=%session.req_header().uri.path_and_query().unwrap()
        ))]
    async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
        ctx.timers.insert("request_time_ms", SystemTime::now());
        let body = session.read_request_body().await.unwrap();
        session.enable_retry_buffering();
        if session.req_header().uri.path() == "/graphql" {
            error!("before in the header");
            if let Some(x) = body {
                match session
                    .req_header()
                    .headers
                    .get("content-type")
                    .map(|v| v.to_str())
                {
                    Some(Ok("application/json")) => {
                        error!("before in the json header");
                        let body_json: RequestBody = serde_json::from_slice(&x).unwrap();
                        if let Some(b) = body_json.argument {
                            let body_bytes = Bytes::from(b);
                            ctx.body.replace(body_bytes.clone());
                        }
                    }
                    _ => {}
                }
            }
        }
        if session.req_header().uri.path().starts_with("/login")
            && !check_login(session.req_header())
        {
            let resp = error_resp::gen_error_response(403).clone();

            session
                .write_response_header(Box::new(resp), false)
                .await
                .unwrap_or_else(|e| {
                    error!("failed to send error response to downstream: {e}");
                });
            // true: early return as the response is already written
            // self.rejected_metric.inc();
            return Ok(true);
        }
        Ok(false)
    }

    async fn request_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        _end_of_stream: bool,
        ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        error!("before in the body");
        if let Some(x) = ctx.body.clone() {
            error!("test in the body");
            body.replace(x);
        }
        Ok(())
    }

    async fn upstream_request_filter(
        &self,
        session: &mut Session,
        upstream_request: &mut RequestHeader,
        ctx: &mut Self::CTX,
    ) -> Result<()> {
        upstream_request.remove_header("content-length");
        upstream_request.insert_header("content-length", "262");
        upstream_request.remove_header("up-stream");
        upstream_request.remove_header("up-stream-scheme");
        upstream_request.remove_header("up-stream-port");
        let path = session.req_header().uri.path();
        if path.starts_with("/view") {
            let upstream_path = if path == "/view" {
                path.strip_prefix("/view").unwrap_or("").to_string()
            } else {
                String::from("/") + path.strip_prefix("/view").unwrap_or("")
            };
            upstream_request.set_uri(
                Uri::builder()
                    .authority(
                        upstream_request
                            .uri
                            .authority()
                            .unwrap_or(&Authority::from_static("tokio.rs"))
                            .clone(),
                    )
                    .scheme(session.req_header().uri.scheme_str().unwrap_or("http"))
                    .path_and_query(
                        upstream_path + (session.req_header().uri.query().unwrap_or("")),
                    )
                    .build()
                    .unwrap(),
            );
        }
        error!("{:?}", ctx.body);
        Ok(())
    }

    async fn upstream_peer(
        &self,
        session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let mut router_addr = match session.req_header().headers.get("up-stream") {
            None => "1.0.0.1".to_string(),
            Some(header_value) => header_value.to_str().unwrap_or_default().to_string(),
        };

        if session.req_header().uri.path() == "/view" {
            router_addr = "localhost".to_string();
        }

        let up_stream_scheme = match session.req_header().headers.get("up-stream-scheme") {
            None => false,
            Some(header_value) => {
                "Https".eq_ignore_ascii_case(header_value.to_str().unwrap_or_default())
            }
        };

        let mut router_port = match session.req_header().headers.get("up-stream-port") {
            None => {
                if up_stream_scheme {
                    443
                } else {
                    80
                }
            }
            Some(header_value) => header_value.to_str().unwrap().parse().unwrap(),
        };

        if session.req_header().uri.path() == "/view" {
            router_port = 3000;
        }

        debug!("connecting to {router_addr} {router_port}");

        let peer = Box::new(HttpPeer::new(
            (router_addr, router_port),
            up_stream_scheme,
            "one.one.one.one".to_string(),
        ));
        Ok(peer)
    }

    async fn response_filter(
        &self,
        _session: &mut Session,
        upstream_response: &mut ResponseHeader,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {
        // replace existing header if any
        upstream_response
            .insert_header("Server", "PingoraGateway")
            .unwrap();
        // because we don't support h3
        upstream_response.remove_header("alt-svc");
        Ok(())
    }

    async fn logging(
        &self,
        session: &mut Session,
        _e: Option<&pingora_core::Error>,
        ctx: &mut Self::CTX,
    ) {
        let tracing = global::tracer("request_summary");
        let request_time_ms = ctx
            .timers
            .remove("request_time_ms")
            .unwrap()
            .elapsed()
            .unwrap()
            .as_millis();
        tracing.in_span("request_summary", |cx| {
            tracing::info!(
                monotonic_counter.request_summary = 1_u64,
                key_1 = "request_time",
                key_2 = 10,
                "{} response time: {request_time_ms}",
                self.request_summary(session, ctx)
            );
        });
    }
}

the request will not enter request_body_filter.

github2023spring avatar Aug 16 '24 15:08 github2023spring

I have a simpler like this,

execute curl -X POST -H "auth: 37dec66febcdb7e0e80a32565d7a0ac6662d079f4db21a4609e3631176cbd680" -H "uid: uid1" "http://127.0.0.1:8080/" -d "abcd" -v, the upstream can receive the request headers but not the body. And the curl downstream is stuck.

use async_trait::async_trait;
use bytes::Bytes;
use pingora::{
    prelude::HttpPeer, proxy::{http_proxy_service, ProxyHttp, Session}, server::Server, Error, ErrorType, Result
};
use sha2::{Digest, Sha256};
use tracing::info;
use tracing_subscriber::{fmt, EnvFilter};
use tracing_subscriber::prelude::*;

struct ApiProxy;

#[async_trait]
impl ProxyHttp for ApiProxy {
    type CTX = ();

    fn new_ctx(&self) -> Self::CTX {}

    async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
        let req_header = session.req_header();
        
        if let Some(auth) = req_header.headers.get("auth") {
            let auth = auth.to_str().unwrap().to_owned();
            let uid = req_header
                .headers
                .get("uid")
                .map_or("", |v| v.to_str().unwrap())
                .to_string();
            let body = session.read_request_body().await?;    //----This method will consume the body----
            let sum_payload = match body {
                Some(body) => {
                    let mut hash = Sha256::new();
                    hash.update(uid.as_bytes());
                    hash.update("USER_SECRET".as_bytes());
                    hash.update(body.slice(..).as_ref());
                    &format!("{:X}", hash.finalize())
                }
                None => "",
            };
            if sum_payload.to_lowercase() == auth {
                //return Ok(true);  // ----Oops, I missed that the request_filter needs to return Ok(false) for it to keep forwarding.----
                return Ok(false);
            } else {
                return Err(pingora::Error::new(ErrorType::Custom("auth err")));
            }
        }
        Err(pingora::Error::new(ErrorType::Custom("auth err")))
    }

    async fn upstream_peer(
        &self,
        _session: &mut Session,
        _ctx: &mut Self::CTX,
    ) -> Result<Box<HttpPeer>> {
        let peer = HttpPeer::new(("127.0.0.1", 80), false, "".to_owned());

        Ok(Box::new(peer))
    }

    async fn request_body_filter(
        &self,
        _session: &mut Session,
        body: &mut Option<Bytes>,
        _end_of_stream: bool,
        _ctx: &mut Self::CTX,
    ) -> Result<()>
    where
        Self::CTX: Send + Sync,
    {

        info!("working here, {:?}", body);     //---  Can't get the body again ---
        Ok(())
    }
}

fn main() {
    let fmt_layer = fmt::layer().with_target(false);
    let filter_layer = tracing_subscriber::EnvFilter::try_from_default_env()
        .or_else(|_| EnvFilter::try_new("debug"))
        .unwrap();

    tracing_subscriber::registry()
        .with(filter_layer)
        .with(fmt_layer)
        .init();

    let mut srv = Server::new(None).unwrap();
    srv.bootstrap();
    let mut pxy = http_proxy_service(&srv.configuration, ApiProxy {});
    pxy.add_tcp("127.0.0.1:8080");
    srv.add_service(pxy);

    srv.run_forever();
}

netrice avatar Aug 17 '24 09:08 netrice

I see. Fundamentally it seems like you need to tell pingora-proxy to run the request body filter, even if the downstream session's body is no longer present from pingora-proxy's perspective. This does seem hard to do with what we have today, and is worth its own feature.

drcaramelsyrup avatar Aug 20 '24 19:08 drcaramelsyrup

I face the same problem. Btw, it's worth noting that read_request_body doesn't read the entire body but instead only a chunk of it and the docs "Read the request body. Ok(None) if no (more) body to read" don't clarify this

async fn upstream_request_filter(
    &self,
    session: &mut Session,
    upstream_request: &mut RequestHeader,
    ctx: &mut Self::CTX,
) -> Result<()>
where
    Self::CTX: Send + Sync,
{
    while let Some(x) = session.read_request_body().await? {
        tracing::error!("got body {}", x.len());
        ctx.body_buf.put_slice(&x);
    }
    Ok(())
}

gives:

2024-08-30T07:45:39.025242Z ERROR g_hole::gateway: got body 2690
2024-08-30T07:45:39.025286Z ERROR g_hole::gateway: got body 61440
2024-08-30T07:45:39.025324Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025365Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025407Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025467Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025492Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025516Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025538Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025640Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025678Z ERROR g_hole::gateway: got body 27092

deliro avatar Aug 30 '24 08:08 deliro

@drcaramelsyrup: is there any workaround now to achieve reading body before sending headers to the upstream?

Now, reading body in upstream_request_filter results in:

2024-08-30T07:45:39.026536Z ERROR pingora_proxy: Fail to proxy: Upstream PrematureBodyEnd context: Peer: addr: 127.0.0.1:8080, scheme: HTTP,sni: 127.0.0.1, cause: context: Content-length: 615510 bytes written: 0, status: 502, tries: 1, retry: false, POST /test/200, Host: localhost:6191

even before it reaches request_body_filter

deliro avatar Aug 30 '24 09:08 deliro

I was facing the same situation, but found that the following code can successfully process the request while getting the request body.

If the problem has been resolved by an update of pingora, please disregard it.

// ver. 0.3.0
async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync
    {
       // any process...
       
        session.enable_retry_buffering();
        session.read_body_or_idle(false).await.unwrap().unwrap();
        let request_body = session.get_retry_buffer();
        
        // any process...
     }

maclo2012 avatar Sep 26 '24 15:09 maclo2012

@maclo2012 Yeah, this looks like a solution, but the crucial thing is to not keep the whole body in RAM at once, but instead to be able to write it to a temporary file (say, if the requests are quite large)

deliro avatar Sep 26 '24 16:09 deliro

I was facing the same situation, but found that the following code can successfully process the request while getting the request body.

If the problem has been resolved by an update of pingora, please disregard it.

// ver. 0.3.0
async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool>
    where
        Self::CTX: Send + Sync
    {
       // any process...
       
        session.enable_retry_buffering();
        session.read_body_or_idle(false).await.unwrap().unwrap();
        let request_body = session.get_retry_buffer();
        
        // any process...
     }

That actually fixes my issue too.

netrice avatar Oct 10 '24 03:10 netrice

@maclo2012 If the request body content is small, there's no problem, but what should be done if the request body content is large? This seems to only read a small portion of the request body. It needs to read in a loop, but after looping, it cannot call session.get_retry_buffer to return the content.

hotlif avatar Nov 21 '24 03:11 hotlif

@maclo2012 , @netrice This solution does only work for a body size up to 64 KiBi. Otherwise the retry buffer will be truncated.

ghost avatar Nov 29 '24 13:11 ghost

This does seem to be an issue, but would it be feasible to increase the retry buffer to a sufficiently large value? In my case, the body is relatively small than 64K.

netrice avatar Dec 06 '24 04:12 netrice

We added a feature flag to use a spoolfile as a retry buffer here: https://github.com/lindenbaum/pingora/tree/feature/retry_buffer_tempfile

We didn't open a PR, because

  • Its probably a niche use case
  • We are handling AI requests to LLMs, which will take long anyway, so we didn't mind totally destroying pingoras performance by waiting for the writes to disk to happen before the proxy event loop continues and waiting for the disk load to happen before entering the proxy event loop. One could mitigate this by using an actor and a task queue for the writes and altering the event loop not to send the whole retry buffer at the top, but reading it piece by piece and sending the individual pieces before reading the next one, giving upstream the possibility to send a piece of the response before we event sent the whole retry_buffer (a bit like it is done with caching, but way less complex).

So if you want to try it out, keep that in mind.

ghost avatar Dec 06 '24 06:12 ghost