Is there any solution to process the request_body before upstream_peer?
What is the problem your feature solves, or the need it fulfills?
Pingora's current request flow is:
```mermaid
graph TD;
    start("new request")-->early_request_filter;
    early_request_filter-->request_filter;
    request_filter-->upstream_peer;

    upstream_peer-->Connect{{IO: connect to upstream}};

    Connect--connection success-->connected_to_upstream;
    Connect--connection failure-->fail_to_connect;

    connected_to_upstream-->upstream_request_filter;
    upstream_request_filter --> request_body_filter;
    request_body_filter --> SendReq{{IO: send request to upstream}};
    SendReq-->RecvResp{{IO: read response from upstream}};
    RecvResp-->upstream_response_filter-->response_filter-->upstream_response_body_filter-->response_body_filter-->logging-->endreq("request done");

    fail_to_connect --can retry-->upstream_peer;
    fail_to_connect --can't retry-->fail_to_proxy--send error response-->logging;

    RecvResp--failure-->IOFailure;
    SendReq--failure-->IOFailure;
    error_while_proxy--can retry-->upstream_peer;
    error_while_proxy--can't retry-->fail_to_proxy;

    request_filter --send response-->logging

    Error>any response filter error]-->error_while_proxy
    IOFailure>IO error]-->error_while_proxy
```
I want to process the request_body before upstream_peer. Is this possible now?
```mermaid
graph TD;
    request_filter-->request_body_filter-->upstream_peer
```
I tried calling read_request_body before upstream_peer:
```rust
let body = session.downstream_session.read_request_body().await?;
```
This reads the body, but the later phases can no longer access it, so the request fails.
So, is there any solution to process the request_body before upstream_peer?
You can only read the body once, so if you read it, you have to send the response yourself. You can do that in, e.g., `request_filter()` and return `Ok(true)`, which signals that a response has already been sent.
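A simplified, std-only model of why this works. All names here (`OneShotBody`, `request_filter`, `run`) are hypothetical stand-ins, not pingora's API: the downstream body behaves like a one-shot stream, so once the filter drains it there is nothing left to forward, and returning `true` tells the pipeline to skip the upstream phases.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for the downstream request body: each chunk can be read only once.
struct OneShotBody {
    chunks: VecDeque<Vec<u8>>,
}

impl OneShotBody {
    /// Returns the next chunk, or `None` once the body is exhausted
    /// (the same `Option` shape as pingora's `read_request_body`).
    fn read(&mut self) -> Option<Vec<u8>> {
        self.chunks.pop_front()
    }
}

/// Hypothetical request_filter: drains the body, builds the response itself,
/// and returns `true` to signal that a response has already been sent.
fn request_filter(body: &mut OneShotBody, response: &mut Option<String>) -> bool {
    let mut total = 0;
    while let Some(chunk) = body.read() {
        total += chunk.len();
    }
    *response = Some(format!("read {total} bytes"));
    true
}

/// Hypothetical pipeline: the upstream phases run only if request_filter returned false.
fn run(body: &mut OneShotBody) -> String {
    let mut response = None;
    if request_filter(body, &mut response) {
        return response.unwrap();
    }
    // Had we continued, the upstream would only see whatever was left unread.
    let forwarded: usize = std::iter::from_fn(|| body.read()).map(|c| c.len()).sum();
    format!("proxied {forwarded} bytes")
}

fn main() {
    let mut body = OneShotBody {
        chunks: VecDeque::from(vec![b"hello ".to_vec(), b"world".to_vec()]),
    };
    println!("{}", run(&mut body)); // prints "read 11 bytes"
    assert!(body.read().is_none()); // the body cannot be read a second time
}
```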
How can I select the upstream based on a field in the request body?
Maybe you can try this in `request_filter`:

```rust
// Cache the body as it is read, so later phases can still forward it upstream.
session.enable_retry_buffering();
let request_body = session.read_request_body().await;
```

Enabling the retry buffer caches the request body, so `request_body_filter` will still work.
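A std-only model of this suggestion, combined with routing on a body field. Everything here is hypothetical and illustrative (`Session`, `Ctx`, `select_upstream`, the `upstream=` form field, the addresses), not pingora's API. It shows three things: `read_request_body` returns one chunk per call, so the filter loops until `None` to see the whole body; the chosen upstream travels from `request_filter` to `upstream_peer` through the per-request context (the role `CTX` plays in `ProxyHttp`); and because buffering was enabled first, the consumed bytes can still be replayed to the upstream. We assume the real retry buffer is size-capped, modeled by `limit`; check the limit in your pingora version before relying on this for large bodies.

```rust
use std::collections::VecDeque;

/// Hypothetical session: a one-shot downstream body plus an optional retry buffer.
struct Session {
    chunks: VecDeque<Vec<u8>>,
    retry_buffer: Option<Vec<u8>>,
    limit: usize,    // assumed cap on buffered bytes
    truncated: bool, // set once the body no longer fits in the buffer
}

impl Session {
    fn enable_retry_buffering(&mut self) {
        if self.retry_buffer.is_none() {
            self.retry_buffer = Some(Vec::new());
        }
    }

    /// Returns one chunk per call and copies it into the retry buffer, if enabled.
    fn read_request_body(&mut self) -> Option<Vec<u8>> {
        let chunk = self.chunks.pop_front()?;
        if let Some(buf) = self.retry_buffer.as_mut() {
            if self.truncated || buf.len() + chunk.len() > self.limit {
                self.truncated = true; // stop buffering; the copy is now incomplete
            } else {
                buf.extend_from_slice(&chunk);
            }
        }
        Some(chunk)
    }
}

/// Hypothetical per-request context carried between phases.
#[derive(Default)]
struct Ctx {
    upstream: Option<&'static str>,
}

/// Hypothetical rule: route on an `upstream=` field in a form-encoded body.
fn select_upstream(body: &[u8]) -> &'static str {
    let text = String::from_utf8_lossy(body);
    match text.split('&').find_map(|kv| kv.strip_prefix("upstream=")) {
        Some("b") => "10.0.0.2:80",
        _ => "10.0.0.1:80",
    }
}

fn request_filter(session: &mut Session, ctx: &mut Ctx) -> bool {
    session.enable_retry_buffering();
    let mut body = Vec::new();
    while let Some(chunk) = session.read_request_body() {
        body.extend_from_slice(&chunk);
    }
    ctx.upstream = Some(select_upstream(&body));
    false // no response sent; continue to upstream_peer
}

fn upstream_peer(ctx: &Ctx) -> &'static str {
    ctx.upstream.expect("set in request_filter")
}

/// What can still be forwarded upstream: the buffered bytes, if they are complete.
fn replay_body(session: &Session) -> Option<&[u8]> {
    match &session.retry_buffer {
        Some(buf) if !session.truncated => Some(buf.as_slice()),
        _ => None,
    }
}

fn main() {
    let mut session = Session {
        chunks: VecDeque::from(vec![b"id=7&".to_vec(), b"upstream=b".to_vec()]),
        retry_buffer: None,
        limit: 64 * 1024,
        truncated: false,
    };
    let mut ctx = Ctx::default();
    assert!(!request_filter(&mut session, &mut ctx));
    println!("{}", upstream_peer(&ctx)); // prints "10.0.0.2:80"
    assert_eq!(replay_body(&session), Some(&b"id=7&upstream=b"[..]));
}
```

Concatenating the chunks before parsing matters: a field can straddle a chunk boundary, as `id=7&` / `upstream=b` would if split differently.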
Works fine! I'm already using it in my project mcp-access-point.
My earlier idea was to add a field that caches the request body and return the cached copy if the body has already been read, as in this commit:
https://github.com/sxhxliang/pingora/commit/f73ccef04f6987ad1847b0665a0a1bc8c26bea14#diff-96d7e471b7f998a637f220e40e52b4166afc41d2a34a0e81c8be3b94bf5881d6
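```rust
pub async fn read_body_bytes(&mut self) -> Result<Option<Bytes>> {
    // Body already read and cached: return the cached copy
    // instead of reading from the (exhausted) downstream again.
    if self.request_body.is_some() && self.is_body_done() {
        return Ok(self.request_body.clone());
    }
    let read = self.read_body().await?;
    Ok(read.map(|b| {
        let bytes = Bytes::copy_from_slice(self.get_body(&b));
        self.body_bytes_read += bytes.len();
        // Also feed the retry buffer, if retry buffering is enabled.
        if let Some(buffer) = self.retry_buffer.as_mut() {
            buffer.write_to_buffer(&bytes);
        }
        // Cache the chunk; this overwrites any previously cached chunk.
        self.request_body = Some(bytes.clone());
        bytes
    }))
}
```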
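The snippet above overwrites `request_body` on every call, so for a body that arrives in several chunks the cache ends up holding only the final chunk. Below is a std-only sketch of the same "read once, then serve from cache" idea that accumulates every chunk instead. `CachingBody` and `read_full_body` are hypothetical names, not part of pingora or the linked commit.

```rust
use std::collections::VecDeque;

/// Hypothetical body reader that remembers everything it has read.
struct CachingBody {
    pending: VecDeque<Vec<u8>>, // chunks not yet read from the client
    cache: Vec<u8>,             // every byte read so far
    done: bool,                 // true once the client body is exhausted
}

impl CachingBody {
    fn new(chunks: Vec<Vec<u8>>) -> Self {
        CachingBody { pending: chunks.into(), cache: Vec::new(), done: false }
    }

    /// Drains the rest of the body on the first call; later calls return the
    /// cached copy without touching the (now exhausted) client stream.
    fn read_full_body(&mut self) -> &[u8] {
        while !self.done {
            match self.pending.pop_front() {
                Some(chunk) => self.cache.extend_from_slice(&chunk),
                None => self.done = true,
            }
        }
        &self.cache
    }
}

fn main() {
    let mut body = CachingBody::new(vec![b"{\"a\":".to_vec(), b"1}".to_vec()]);
    let first = body.read_full_body().to_vec();
    let second = body.read_full_body().to_vec();
    assert_eq!(first, b"{\"a\":1}");
    assert_eq!(first, second); // both callers see the whole body, not just the last chunk
}
```

Accumulating the whole body trades memory for convenience, so a real implementation would likely want a size cap similar to the retry buffer's.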