Is there any solution to process the request_body before upstream_peer?

Open sxhxliang opened this issue 9 months ago • 3 comments

What is the problem your feature solves, or the need it fulfills?

Pingora's current request flow is:

```mermaid
graph TD;
    start("new request")-->early_request_filter;
    early_request_filter-->request_filter;
    request_filter-->upstream_peer;

    upstream_peer-->Connect{{IO: connect to upstream}};

    Connect--connection success-->connected_to_upstream;
    Connect--connection failure-->fail_to_connect;

    connected_to_upstream-->upstream_request_filter;
    upstream_request_filter --> request_body_filter;
    request_body_filter --> SendReq{{IO: send request to upstream}};
    SendReq-->RecvResp{{IO: read response from upstream}};
    RecvResp-->upstream_response_filter-->response_filter-->upstream_response_body_filter-->response_body_filter-->logging-->endreq("request done");

    fail_to_connect --can retry-->upstream_peer;
    fail_to_connect --can't retry-->fail_to_proxy--send error response-->logging;

    RecvResp--failure-->IOFailure;
    SendReq--failure-->IOFailure;
    error_while_proxy--can retry-->upstream_peer;
    error_while_proxy--can't retry-->fail_to_proxy;

    request_filter --send response-->logging

    Error>any response filter error]-->error_while_proxy
    IOFailure>IO error]-->error_while_proxy
```

I want to process the request_body before upstream_peer. Is this possible now?

```mermaid
graph TD;
    request_filter-->request_body_filter-->upstream_peer
```

I tried calling read_request_body before upstream_peer:

```rust
let body = session.downstream_session.read_request_body().await?;
```

This works, but the later phases of the pipeline can no longer access the request body, so proxying the request fails.

So, is there any solution to process the request_body before upstream_peer?

Describe the solution you'd like

What do you propose to resolve the problem or fulfill the need above? How would you like it to work?

Describe alternatives you've considered

What other solutions, features, or workarounds have you considered that might also solve the issue? What are the tradeoffs for these alternatives compared to what you're proposing?

Additional context

This could include references to documentation or papers, prior art, screenshots, or benchmark results.

(sxhxliang, Mar 29 '25 14:03)

You can only read the body once, so if you read it you have to send the response yourself. You can do that, for example, in request_filter() and then return Ok(true), which signals that a response has already been sent.
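To make the "read once" constraint concrete, here is a minimal std-only Rust model. It is not Pingora code: `OneShotBody`, `read_chunk`, and `read_all` are hypothetical stand-ins for a downstream body stream, used only to show why a body drained in an early filter is no longer available when the proxy later tries to forward it.

```rust
use std::collections::VecDeque;

/// Hypothetical stand-in for a downstream request body (not a Pingora type):
/// each chunk arrives from the client once and is gone after it has been read.
struct OneShotBody {
    pending: VecDeque<Vec<u8>>,
}

impl OneShotBody {
    fn new(chunks: &[&str]) -> Self {
        Self {
            pending: chunks.iter().map(|c| c.as_bytes().to_vec()).collect(),
        }
    }

    /// Returns the next chunk, or `None` once the body is exhausted.
    fn read_chunk(&mut self) -> Option<Vec<u8>> {
        self.pending.pop_front()
    }

    /// Drains whatever is left of the body into one buffer.
    fn read_all(&mut self) -> Vec<u8> {
        let mut out = Vec::new();
        while let Some(chunk) = self.read_chunk() {
            out.extend_from_slice(&chunk);
        }
        out
    }
}

fn main() {
    let mut body = OneShotBody::new(&["hello ", "world"]);
    // An early filter drains the body to inspect it...
    let seen_by_filter = body.read_all();
    // ...so the stage that forwards the body upstream finds nothing left.
    let left_for_upstream = body.read_all();
    println!("filter saw {} bytes", seen_by_filter.len());
    println!("upstream gets {} bytes", left_for_upstream.len());
}
```

This is why the suggestion above is to answer the request yourself once the body has been consumed: nothing remains to be proxied.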

(theduke, Apr 01 '25 18:04)

> You can only read the body once, so if you read it you have to send a response. You can do that in eg in request_filter() and return Ok(true), which signals that a response was already sent.

How can I then select the upstream based on a field in the request body?

(sxhxliang, Apr 03 '25 17:04)

Maybe you can try this in request_filter:

```rust
session.enable_retry_buffering();
let request_body = session.read_request_body().await;
```

Enabling buffering caches the request body, so request_body_filter will still work fine afterwards.
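Combined with Pingora's per-request `CTX` (which is passed to both `request_filter` and `upstream_peer`), this suggests a pattern for the body-based routing question: read and buffer the body in `request_filter`, store the routing decision in the context, and read it back in `upstream_peer`. The sketch below models that flow in plain std Rust. It is not the Pingora API: `Session`, `enable_buffering`, `body_for_upstream`, `pick_upstream`, the `service=` form field, and the upstream addresses are all hypothetical, and the buffering only loosely mirrors the `enable_retry_buffering` behaviour described in this thread.

```rust
use std::collections::VecDeque;

/// Hypothetical model of a downstream session (not Pingora's type).
/// Chunks can be read once; with buffering enabled, each chunk read is
/// also recorded so the forwarding stage can resend it.
struct Session {
    pending: VecDeque<Vec<u8>>,
    buffer: Option<Vec<u8>>,
}

impl Session {
    fn new(chunks: &[&str]) -> Self {
        Self {
            pending: chunks.iter().map(|c| c.as_bytes().to_vec()).collect(),
            buffer: None,
        }
    }

    fn enable_buffering(&mut self) {
        if self.buffer.is_none() {
            self.buffer = Some(Vec::new());
        }
    }

    fn read_chunk(&mut self) -> Option<Vec<u8>> {
        let chunk = self.pending.pop_front()?;
        if let Some(buf) = self.buffer.as_mut() {
            buf.extend_from_slice(&chunk);
        }
        Some(chunk)
    }

    /// What the forwarding stage sends upstream: the bytes buffered earlier,
    /// followed by any chunks that nobody has read yet.
    fn body_for_upstream(&mut self) -> Vec<u8> {
        let mut out = self.buffer.clone().unwrap_or_default();
        while let Some(chunk) = self.pending.pop_front() {
            out.extend_from_slice(&chunk);
        }
        out
    }
}

/// Per-request state shared between phases (stands in for Pingora's CTX).
#[derive(Default)]
struct Ctx {
    upstream: Option<String>,
}

/// Hypothetical routing rule: find `service=<name>` in a form-encoded body.
fn pick_upstream(body: &[u8]) -> Option<String> {
    let text = std::str::from_utf8(body).ok()?;
    let service = text.split('&').find_map(|pair| pair.strip_prefix("service="))?;
    match service {
        "billing" => Some("10.0.0.1:8080".to_string()),
        "search" => Some("10.0.0.2:8080".to_string()),
        _ => None,
    }
}

/// Model of the request_filter phase: buffer, read the whole body, route.
fn request_filter(session: &mut Session, ctx: &mut Ctx) {
    session.enable_buffering();
    let mut body = Vec::new();
    while let Some(chunk) = session.read_chunk() {
        body.extend_from_slice(&chunk);
    }
    ctx.upstream = pick_upstream(&body);
}

/// Model of the upstream_peer phase: use the decision stored in the context.
fn upstream_peer(ctx: &Ctx) -> String {
    ctx.upstream
        .clone()
        .unwrap_or_else(|| "10.0.0.9:8080".to_string()) // hypothetical default
}

fn main() {
    // The routing field spans two chunks, which is why the whole body is read first.
    let mut session = Session::new(&["user=1&", "service=search"]);
    let mut ctx = Ctx::default();
    request_filter(&mut session, &mut ctx);
    let peer = upstream_peer(&ctx);
    let forwarded = session.body_for_upstream();
    println!("peer={peer} forwarded={}", String::from_utf8_lossy(&forwarded));
}
```

The design choice is that the body is inspected only once, in the filter, and later phases consume the decision (from the context) and the bytes (from the buffer) rather than reading the stream again.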

(meredith233, Apr 17 '25 01:04)

> maybe you can try this in request_filter
>
> session.enable_retry_buffering();
> let request_body = session.read_request_body().await;
>
> enable buffer would cache the request body, so that the request_body_filter will work fine

It works fine! I'm already using it in my project, mcp-access-point.

My earlier idea was to add a cache field to the session and return the cached body if it had already been read:

https://github.com/sxhxliang/pingora/commit/f73ccef04f6987ad1847b0665a0a1bc8c26bea14#diff-96d7e471b7f998a637f220e40e52b4166afc41d2a34a0e81c8be3b94bf5881d6

```rust
pub async fn read_body_bytes(&mut self) -> Result<Option<Bytes>> {
    // Added in the commit above: once the body is done, return the cached copy.
    if self.request_body.is_some() && self.is_body_done() {
        return Ok(self.request_body.clone());
    }
    let read = self.read_body().await?;
    Ok(read.map(|b| {
        let bytes = Bytes::copy_from_slice(self.get_body(&b));
        self.body_bytes_read += bytes.len();
        if let Some(buffer) = self.retry_buffer.as_mut() {
            buffer.write_to_buffer(&bytes);
        }
        // Added in the commit above: remember the chunk that was just read.
        self.request_body = Some(bytes.clone());
        bytes
    }))
}
```
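If I read the snippet above correctly, `request_body` only ever holds the most recently read chunk, and once `is_body_done()` is true the function keeps returning that chunk instead of `None`, so a caller that loops until `None` could spin forever on multi-chunk bodies. One hedged alternative is to cache every chunk and replay from the start on request. The std-only model below illustrates that idea; `CachingBody` and `rewind` are hypothetical names, not Pingora APIs, and real code would also need a size limit on the cache.

```rust
use std::collections::VecDeque;

/// Hypothetical model (not Pingora code): every chunk read is cached, and a
/// later consumer can rewind to replay the full body, still ending in `None`.
struct CachingBody {
    pending: VecDeque<Vec<u8>>, // chunks not yet read from the "socket"
    cache: Vec<Vec<u8>>,        // every chunk read so far, in order
    replay_pos: Option<usize>,  // Some(i) while replaying cached chunks
}

impl CachingBody {
    fn new(chunks: &[&str]) -> Self {
        Self {
            pending: chunks.iter().map(|c| c.as_bytes().to_vec()).collect(),
            cache: Vec::new(),
            replay_pos: None,
        }
    }

    /// Start replaying cached chunks from the beginning.
    fn rewind(&mut self) {
        self.replay_pos = Some(0);
    }

    fn read_body_bytes(&mut self) -> Option<Vec<u8>> {
        if let Some(pos) = self.replay_pos {
            if pos < self.cache.len() {
                self.replay_pos = Some(pos + 1);
                return Some(self.cache[pos].clone());
            }
            // Cache fully replayed: continue with chunks not yet read.
            self.replay_pos = None;
        }
        let chunk = self.pending.pop_front()?;
        self.cache.push(chunk.clone());
        Some(chunk)
    }
}

fn main() {
    let mut body = CachingBody::new(&["ab", "cd"]);
    // First consumer (e.g. a routing filter) reads the whole body.
    let mut routed = Vec::new();
    while let Some(chunk) = body.read_body_bytes() {
        routed.extend_from_slice(&chunk);
    }
    // Second consumer (e.g. the forwarding stage) replays it from the cache.
    body.rewind();
    let mut forwarded = Vec::new();
    while let Some(chunk) = body.read_body_bytes() {
        forwarded.extend_from_slice(&chunk);
    }
    println!("{}", String::from_utf8_lossy(&forwarded));
}
```

Both loops terminate because the replay falls back to the unread chunks and then to `None`, rather than returning the last chunk indefinitely.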

(sxhxliang, May 20 '25 10:05)