pingora
Issue with forwarding HTTP body after reading in proxy request filter method
I want to proxy an HTTP API and add simple authentication for clients. The authentication requires computing an MD5 hash of the POST request's body and URL parameters, then placing this hash in the auth field of the request header. In the request_filter method of ProxyHttp, I plan to verify the MD5 hash and forward the request to the upstream server only if it matches.
However, I found that after using the session.read_request_body method to obtain the body content in the request_filter method, the code stops forwarding the body after outputting the log "pingora_proxy::proxy_h1: Sending header to upstream RequestHeader xxx". How can I get a copy of the body in the request_filter method without affecting the forwarding to the upstream server?
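For reference, the check described above could be sketched roughly as follows. This is only an illustration under assumptions: the helper names auth_matches and toy_digest are hypothetical, the digest is passed in as a closure so the sketch needs no extra crates, the toy digest is a placeholder and not MD5, and hashing the query string followed by the body is an assumed ordering.

```rust
/// Hypothetical helper: returns true when the client-supplied `auth` header
/// matches the hex digest of the query string followed by the body.
/// `digest` stands in for MD5 (or any other hash) and is supplied by the caller.
/// The query-then-body ordering is an assumption made for this sketch.
fn auth_matches<F>(query: &str, body: &[u8], auth_header: &str, digest: F) -> bool
where
    F: Fn(&[u8]) -> String,
{
    let mut input = Vec::with_capacity(query.len() + body.len());
    input.extend_from_slice(query.as_bytes());
    input.extend_from_slice(body);
    // Hex digests are compared case-insensitively here.
    digest(&input).eq_ignore_ascii_case(auth_header.trim())
}

/// Toy digest used only so the sketch runs without extra crates.
/// It is NOT MD5 and must not be used for real authentication.
fn toy_digest(data: &[u8]) -> String {
    let sum = data
        .iter()
        .fold(0u32, |acc, b| acc.wrapping_mul(31).wrapping_add(*b as u32));
    format!("{sum:08x}")
}

fn main() {
    let expected = toy_digest(b"a=1&b=2abcd");
    // Matching body: the request would be allowed through.
    assert!(auth_matches("a=1&b=2", b"abcd", &expected, toy_digest));
    // Tampered body: the request would be rejected.
    assert!(!auth_matches("a=1&b=2", b"abce", &expected, toy_digest));
    println!("auth check ok");
}
```

The point of the sketch is that the comparison needs the complete body, which is why the body has to be read before the request can be accepted or rejected.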
I think the request body needs to be read in the async fn request_body_filter() method. https://github.com/cloudflare/pingora/blob/main/pingora-proxy/src/proxy_trait.rs#L94
If the authentication is handled in request_body_filter, it won't be possible to block the request before it goes to the upstream.
If the validation logic is handled in request_body_filter, the client's request headers will be forwarded to the upstream first, and then the connection will be aborted. This doesn't align with the logic of authenticating first.
Hmm, I tried to buffer the request body into the CTX and then set it back in request_body_filter, but it does not work.
Do you have an example for us to look at? I'm guessing this not working has to do with read_request_body consuming the body.
fn check_login(req: &pingora_http::RequestHeader) -> bool {
// implement your login check logic here
req.headers.get("Authorization").map(|v| v.as_bytes()) == Some(b"password")
}
#[derive(Deserialize, Debug)]
struct RequestBody {
argument: Option<String>,
}
#[derive(Debug)]
pub struct MyGate;
pub struct RequestCtx {
meter: Meter,
timers: HashMap<&'static str, SystemTime>,
body: Option<Bytes>,
}
#[async_trait]
impl ProxyHttp for MyGate {
type CTX = RequestCtx;
fn new_ctx(&self) -> Self::CTX {
RequestCtx {
meter: global::meter("test_rust_application"),
timers: HashMap::new(),
body: None,
}
}
#[tracing::instrument(
skip(session,ctx),
fields(
req_uri=%session.req_header().uri.path_and_query().unwrap()
))]
async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool> {
ctx.timers.insert("request_time_ms", SystemTime::now());
let body = session.read_request_body().await.unwrap();
session.enable_retry_buffering();
if session.req_header().uri.path() == "/graphql" {
error!("before in the header");
if let Some(x) = body {
match session
.req_header()
.headers
.get("content-type")
.map(|v| v.to_str())
{
Some(Ok("application/json")) => {
error!("before in the json header");
let body_json: RequestBody = serde_json::from_slice(&x).unwrap();
if let Some(b) = body_json.argument {
let body_bytes = Bytes::from(b);
ctx.body.replace(body_bytes.clone());
}
}
_ => {}
}
}
}
if session.req_header().uri.path().starts_with("/login")
&& !check_login(session.req_header())
{
let resp = error_resp::gen_error_response(403).clone();
session
.write_response_header(Box::new(resp), false)
.await
.unwrap_or_else(|e| {
error!("failed to send error response to downstream: {e}");
});
// true: early return as the response is already written
// self.rejected_metric.inc();
return Ok(true);
}
Ok(false)
}
async fn request_body_filter(
&self,
_session: &mut Session,
body: &mut Option<Bytes>,
_end_of_stream: bool,
ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
error!("before in the body");
if let Some(x) = ctx.body.clone() {
error!("test in the body");
body.replace(x);
}
Ok(())
}
async fn upstream_request_filter(
&self,
session: &mut Session,
upstream_request: &mut RequestHeader,
ctx: &mut Self::CTX,
) -> Result<()> {
upstream_request.remove_header("content-length");
upstream_request.insert_header("content-length", "262");
upstream_request.remove_header("up-stream");
upstream_request.remove_header("up-stream-scheme");
upstream_request.remove_header("up-stream-port");
let path = session.req_header().uri.path();
if path.starts_with("/view") {
let upstream_path = if path == "/view" {
path.strip_prefix("/view").unwrap_or("").to_string()
} else {
String::from("/") + path.strip_prefix("/view").unwrap_or("")
};
upstream_request.set_uri(
Uri::builder()
.authority(
upstream_request
.uri
.authority()
.unwrap_or(&Authority::from_static("tokio.rs"))
.clone(),
)
.scheme(session.req_header().uri.scheme_str().unwrap_or("http"))
.path_and_query(
upstream_path + (session.req_header().uri.query().unwrap_or("")),
)
.build()
.unwrap(),
);
}
error!("{:?}", ctx.body);
Ok(())
}
async fn upstream_peer(
&self,
session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let mut router_addr = match session.req_header().headers.get("up-stream") {
None => "1.0.0.1".to_string(),
Some(header_value) => header_value.to_str().unwrap_or_default().to_string(),
};
if session.req_header().uri.path() == "/view" {
router_addr = "localhost".to_string();
}
let up_stream_scheme = match session.req_header().headers.get("up-stream-scheme") {
None => false,
Some(header_value) => {
"Https".eq_ignore_ascii_case(header_value.to_str().unwrap_or_default())
}
};
let mut router_port = match session.req_header().headers.get("up-stream-port") {
None => {
if up_stream_scheme {
443
} else {
80
}
}
Some(header_value) => header_value.to_str().unwrap().parse().unwrap(),
};
if session.req_header().uri.path() == "/view" {
router_port = 3000;
}
debug!("connecting to {router_addr} {router_port}");
let peer = Box::new(HttpPeer::new(
(router_addr, router_port),
up_stream_scheme,
"one.one.one.one".to_string(),
));
Ok(peer)
}
async fn response_filter(
&self,
_session: &mut Session,
upstream_response: &mut ResponseHeader,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
// replace existing header if any
upstream_response
.insert_header("Server", "PingoraGateway")
.unwrap();
// because we don't support h3
upstream_response.remove_header("alt-svc");
Ok(())
}
async fn logging(
&self,
session: &mut Session,
_e: Option<&pingora_core::Error>,
ctx: &mut Self::CTX,
) {
let tracing = global::tracer("request_summary");
let request_time_ms = ctx
.timers
.remove("request_time_ms")
.unwrap()
.elapsed()
.unwrap()
.as_millis();
tracing.in_span("request_summary", |cx| {
tracing::info!(
monotonic_counter.request_summary = 1_u64,
key_1 = "request_time",
key_2 = 10,
"{} response time: {request_time_ms}",
self.request_summary(session, ctx)
);
});
}
}
The request never enters request_body_filter.
I have a simpler example like this.
When I run curl -X POST -H "auth: 37dec66febcdb7e0e80a32565d7a0ac6662d079f4db21a4609e3631176cbd680" -H "uid: uid1" "http://127.0.0.1:8080/" -d "abcd" -v, the upstream receives the request headers but not the body, and the downstream curl hangs.
use async_trait::async_trait;
use bytes::Bytes;
use pingora::{
prelude::HttpPeer, proxy::{http_proxy_service, ProxyHttp, Session}, server::Server, Error, ErrorType, Result
};
use sha2::{Digest, Sha256};
use tracing::info;
use tracing_subscriber::{fmt, EnvFilter};
use tracing_subscriber::prelude::*;
struct ApiProxy;
#[async_trait]
impl ProxyHttp for ApiProxy {
type CTX = ();
fn new_ctx(&self) -> Self::CTX {}
async fn request_filter(&self, session: &mut Session, _ctx: &mut Self::CTX) -> Result<bool> {
let req_header = session.req_header();
if let Some(auth) = req_header.headers.get("auth") {
let auth = auth.to_str().unwrap().to_owned();
let uid = req_header
.headers
.get("uid")
.map_or("", |v| v.to_str().unwrap())
.to_string();
let body = session.read_request_body().await?; //----This method will consume the body----
let sum_payload = match body {
Some(body) => {
let mut hash = Sha256::new();
hash.update(uid.as_bytes());
hash.update("USER_SECRET".as_bytes());
hash.update(body.slice(..).as_ref());
&format!("{:X}", hash.finalize())
}
None => "",
};
if sum_payload.to_lowercase() == auth {
//return Ok(true); // ----Oops, I missed that the request_filter needs to return Ok(false) for it to keep forwarding.----
return Ok(false);
} else {
return Err(pingora::Error::new(ErrorType::Custom("auth err")));
}
}
Err(pingora::Error::new(ErrorType::Custom("auth err")))
}
async fn upstream_peer(
&self,
_session: &mut Session,
_ctx: &mut Self::CTX,
) -> Result<Box<HttpPeer>> {
let peer = HttpPeer::new(("127.0.0.1", 80), false, "".to_owned());
Ok(Box::new(peer))
}
async fn request_body_filter(
&self,
_session: &mut Session,
body: &mut Option<Bytes>,
_end_of_stream: bool,
_ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
info!("working here, {:?}", body); //--- Can't get the body again ---
Ok(())
}
}
fn main() {
let fmt_layer = fmt::layer().with_target(false);
let filter_layer = tracing_subscriber::EnvFilter::try_from_default_env()
.or_else(|_| EnvFilter::try_new("debug"))
.unwrap();
tracing_subscriber::registry()
.with(filter_layer)
.with(fmt_layer)
.init();
let mut srv = Server::new(None).unwrap();
srv.bootstrap();
let mut pxy = http_proxy_service(&srv.configuration, ApiProxy {});
pxy.add_tcp("127.0.0.1:8080");
srv.add_service(pxy);
srv.run_forever();
}
I see. Fundamentally it seems like you need to tell pingora-proxy to run the request body filter, even if the downstream session's body is no longer present from pingora-proxy's perspective. This does seem hard to do with what we have today, and is worth its own feature.
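To illustrate the "body is no longer present" point, here is a std-only model of a body that can be read only once. It is a sketch under assumptions: OneShotBody and forward_remaining are hypothetical names and do not correspond to pingora types; they only mimic the documented "None if no (more) body to read" contract.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a downstream body: each read hands out one chunk
/// and removes it, so a chunk can be observed only once.
struct OneShotBody {
    chunks: VecDeque<Vec<u8>>,
}

impl OneShotBody {
    fn new(chunks: Vec<Vec<u8>>) -> Self {
        Self { chunks: chunks.into() }
    }

    /// Returns the next chunk, or None when no (more) body is left.
    fn read_chunk(&mut self) -> Option<Vec<u8>> {
        self.chunks.pop_front()
    }
}

/// Stand-in for the forwarding phase: counts the bytes still available to send.
fn forward_remaining(body: &mut OneShotBody) -> usize {
    let mut sent = 0;
    while let Some(chunk) = body.read_chunk() {
        sent += chunk.len();
    }
    sent
}

fn main() {
    // Untouched body: forwarding sees all 4 bytes.
    let mut untouched = OneShotBody::new(vec![b"ab".to_vec(), b"cd".to_vec()]);
    assert_eq!(forward_remaining(&mut untouched), 4);

    // Body drained by an earlier filter: forwarding sees nothing.
    let mut drained = OneShotBody::new(vec![b"ab".to_vec(), b"cd".to_vec()]);
    while drained.read_chunk().is_some() {}
    assert_eq!(forward_remaining(&mut drained), 0);
}
```

In this model, the only way for the later phase to see the data again is for the earlier phase to keep its own copy and hand it back, which is what the buffering attempts in this thread try to do.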
I face the same problem. By the way, it's worth noting that read_request_body doesn't read the entire body, only one chunk of it, and the docs ("Read the request body. Ok(None) if no (more) body to read") don't make this clear:
async fn upstream_request_filter(
&self,
session: &mut Session,
upstream_request: &mut RequestHeader,
ctx: &mut Self::CTX,
) -> Result<()>
where
Self::CTX: Send + Sync,
{
while let Some(x) = session.read_request_body().await? {
tracing::error!("got body {}", x.len());
ctx.body_buf.put_slice(&x);
}
Ok(())
}
gives:
2024-08-30T07:45:39.025242Z ERROR g_hole::gateway: got body 2690
2024-08-30T07:45:39.025286Z ERROR g_hole::gateway: got body 61440
2024-08-30T07:45:39.025324Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025365Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025407Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025467Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025492Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025516Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025538Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025640Z ERROR g_hole::gateway: got body 65536
2024-08-30T07:45:39.025678Z ERROR g_hole::gateway: got body 27092
@drcaramelsyrup: is there any workaround now to achieve reading body before sending headers to the upstream?
Currently, reading the body in upstream_request_filter results in:
2024-08-30T07:45:39.026536Z ERROR pingora_proxy: Fail to proxy: Upstream PrematureBodyEnd context: Peer: addr: 127.0.0.1:8080, scheme: HTTP,sni: 127.0.0.1, cause: context: Content-length: 615510 bytes written: 0, status: 502, tries: 1, retry: false, POST /test/200, Host: localhost:6191
even before it reaches request_body_filter
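The chunk sizes logged above add up to exactly the Content-length in the error, which suggests the loop consumed the whole body and left nothing to forward ("bytes written: 0"). A quick check of that arithmetic follows; the interpretation is a reading of the logs, not a confirmed diagnosis, and the names CHUNK_SIZES, CONTENT_LENGTH and total_read are made up for this sketch.

```rust
/// Chunk sizes copied from the "got body" log lines above.
const CHUNK_SIZES: [usize; 11] = [
    2690, 61440, 65536, 65536, 65536, 65536, 65536, 65536, 65536, 65536, 27092,
];

/// Content-length reported in the PrematureBodyEnd error.
const CONTENT_LENGTH: usize = 615_510;

/// Sum of all chunk sizes returned by the read loop.
fn total_read(chunks: &[usize]) -> usize {
    chunks.iter().sum()
}

fn main() {
    let total = total_read(&CHUNK_SIZES);
    // The loop read every byte the client announced.
    assert_eq!(total, CONTENT_LENGTH);
    // Nothing is left for the proxy to forward afterwards.
    println!(
        "read {total} of {CONTENT_LENGTH} bytes, {} left to forward",
        CONTENT_LENGTH - total
    );
}
```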
I was facing the same situation, but found that the following code can successfully process the request while getting the request body.
If the problem has been resolved by an update of pingora, please disregard it.
// ver. 0.3.0
async fn request_filter(&self, session: &mut Session, ctx: &mut Self::CTX) -> Result<bool>
where
Self::CTX: Send + Sync
{
// any process...
session.enable_retry_buffering();
session.read_body_or_idle(false).await.unwrap().unwrap();
let request_body = session.get_retry_buffer();
// any process...
}
@maclo2012 Yeah, this looks like a solution, but the crucial thing is to not keep the whole body in RAM at once, but instead to be able to write it to a temporary file (say, if the requests are quite large)
That actually fixes my issue too.
@maclo2012 If the request body content is small, there's no problem, but what should be done if the request body content is large? This seems to only read a small portion of the request body. It needs to read in a loop, but after looping, it cannot call session.get_retry_buffer to return the content.
@maclo2012, @netrice This solution only works for bodies up to 64 KiB. Beyond that, the retry buffer is truncated.
This does seem to be an issue, but would it be feasible to increase the retry buffer to a sufficiently large value? In my case, the body is smaller than 64 KiB.
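A rough model of why a size-capped buffer stops being usable for large bodies is sketched below. This is not pingora's implementation: CappedBuffer and its methods are hypothetical, and the 64 KiB limit is simply taken from the comment above.

```rust
/// Cap taken from the 64 KiB figure mentioned in the thread.
const LIMIT: usize = 64 * 1024;

/// Hypothetical model of a size-capped retry buffer (not pingora's code).
#[derive(Default)]
struct CappedBuffer {
    data: Vec<u8>,
    truncated: bool,
}

impl CappedBuffer {
    /// Append a chunk unless doing so would exceed the cap.
    fn push(&mut self, chunk: &[u8]) {
        if !self.truncated && self.data.len() + chunk.len() <= LIMIT {
            self.data.extend_from_slice(chunk);
        } else {
            // Once one chunk is dropped the buffer no longer holds the full body.
            self.truncated = true;
        }
    }

    /// Return the buffered body only when it is complete.
    fn complete_body(&self) -> Option<&[u8]> {
        if self.truncated {
            None
        } else {
            Some(self.data.as_slice())
        }
    }
}

fn main() {
    // A small body fits and can be read back in full.
    let mut small = CappedBuffer::default();
    small.push(&[0u8; 4096]);
    assert_eq!(small.complete_body().map(|b| b.len()), Some(4096));

    // The first three chunk sizes from the log above already exceed the cap.
    let mut large = CappedBuffer::default();
    for size in [2690usize, 61440, 65536] {
        large.push(&vec![0u8; size]);
    }
    assert!(large.complete_body().is_none());
    println!("small ok, large truncated");
}
```

Under this model, raising the cap trades memory for coverage: every in-flight request may hold up to the cap in RAM.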
We added a feature flag to use a spoolfile as a retry buffer here: https://github.com/lindenbaum/pingora/tree/feature/retry_buffer_tempfile
We didn't open a PR, because
- It's probably a niche use case
- We are handling AI requests to LLMs, which take a long time anyway, so we didn't mind completely destroying pingora's performance: the proxy event loop waits for the disk writes to finish before it continues, and waits for the disk load to finish before it starts. One could mitigate this by using an actor and a task queue for the writes, and by changing the event loop so that it does not send the whole retry buffer up front but instead reads it piece by piece, sending each piece before reading the next. That would give the upstream the chance to send part of the response before we have even sent the whole retry_buffer (a bit like what is done with caching, but far less complex).
So if you want to try it out, keep that in mind.
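The general idea of spooling the body to disk and replaying it piece by piece can be sketched with the standard library alone. This is not the code from the linked branch: the Spool type, its methods and the file name are hypothetical, and the I/O is blocking, which is the same performance caveat described above.

```rust
use std::fs::{self, File};
use std::io::{self, Read, Seek, SeekFrom, Write};
use std::path::PathBuf;

/// Hypothetical spool: appends body chunks to a file instead of keeping them in RAM.
struct Spool {
    file: File,
    path: PathBuf,
}

impl Spool {
    /// Create (or truncate) a spool file with the given name in the temp directory.
    fn create(name: &str) -> io::Result<Self> {
        let path = std::env::temp_dir().join(name);
        let file = File::options()
            .read(true)
            .write(true)
            .create(true)
            .truncate(true)
            .open(&path)?;
        Ok(Self { file, path })
    }

    /// Append one body chunk to the spool (blocking write).
    fn write_chunk(&mut self, chunk: &[u8]) -> io::Result<()> {
        self.file.write_all(chunk)
    }

    /// Replay the spooled body in pieces of at most `piece_size` bytes,
    /// handing each piece to `send` before reading the next one.
    fn replay<F: FnMut(&[u8])>(&mut self, piece_size: usize, mut send: F) -> io::Result<u64> {
        self.file.seek(SeekFrom::Start(0))?;
        let mut buf = vec![0u8; piece_size];
        let mut total = 0u64;
        loop {
            let n = self.file.read(&mut buf)?;
            if n == 0 {
                break; // end of file
            }
            send(&buf[..n]);
            total += n as u64;
        }
        Ok(total)
    }
}

impl Drop for Spool {
    /// Best-effort cleanup of the temporary file.
    fn drop(&mut self) {
        let _ = fs::remove_file(&self.path);
    }
}

fn main() -> io::Result<()> {
    let name = format!("spool_sketch_{}.bin", std::process::id());
    let mut spool = Spool::create(&name)?;
    spool.write_chunk(b"hello ")?;
    spool.write_chunk(b"world")?;

    let mut forwarded = Vec::new();
    let total = spool.replay(4, |piece| forwarded.extend_from_slice(piece))?;
    assert_eq!(total, 11);
    assert_eq!(forwarded, b"hello world".to_vec());
    Ok(())
}
```

Only one piece is held in memory at a time during replay, so memory use is bounded by piece_size rather than by the body size.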