aiohttp

Add support for ZSTD compression

Open • KGuillaume-chaps opened this issue 5 months ago • 3 comments

What do these changes do?

This patch adds support for ZSTD (Zstandard) compression, which is supported by the Firefox browser and the Caddy web server.

Are there changes in behavior for the user?

If the user doesn't install the zstandard library, nothing will change.
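One common way to get that "nothing changes without the optional library" behavior is to offer and accept zstd only when a decoder is actually available. The Rust sketch below illustrates that pattern under stated assumptions: it is not aiohttp's implementation (aiohttp is written in Python), and `accept_encoding`, `decoder_for`, `Decoder` and the `zstd_available` flag are hypothetical names. `zstd` is the content-coding token registered by RFC 8878.

```rust
// Sketch only: `zstd_available` stands in for "the optional zstandard
// library is installed". None of these names come from aiohttp.

/// Builds an Accept-Encoding value, offering zstd only when a decoder exists.
fn accept_encoding(zstd_available: bool) -> String {
    let mut codings = vec!["gzip", "deflate"];
    if zstd_available {
        codings.push("zstd");
    }
    codings.join(", ")
}

#[derive(Debug, PartialEq)]
enum Decoder {
    Identity,
    Gzip,
    Deflate,
    Zstd,
}

/// Picks a decoder for a response's Content-Encoding header value.
fn decoder_for(content_encoding: &str, zstd_available: bool) -> Result<Decoder, String> {
    match content_encoding.trim().to_ascii_lowercase().as_str() {
        "" | "identity" => Ok(Decoder::Identity),
        "gzip" | "x-gzip" => Ok(Decoder::Gzip),
        "deflate" => Ok(Decoder::Deflate),
        // Without the optional library, "zstd" falls through to the error arm.
        "zstd" if zstd_available => Ok(Decoder::Zstd),
        other => Err(format!("unsupported Content-Encoding: {other}")),
    }
}

fn main() {
    // Without the optional dependency the offered encodings are unchanged.
    println!("{}", accept_encoding(false));
    println!("{}", accept_encoding(true));
    println!("{:?}", decoder_for("zstd", false));
}
```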

Is it a substantial burden for the maintainers to support this?

The impact on existing code is low to none, and there are unit tests to ensure nothing breaks.

Related issue number

No related issue

Checklist

  • [x] I think the code is well written
  • [x] Unit tests for the changes exist
  • [x] Documentation reflects the changes
  • [x] If you provide code modification, please add yourself to CONTRIBUTORS.txt
    • The format is <Name> <Surname>.
    • Please keep alphabetical order, the file is sorted by names.
  • [x] Add a new news fragment into the CHANGES/ folder
    • name it <issue_or_pr_num>.<type>.rst (e.g. 588.bugfix.rst)

    • if you don't have an issue number, change it to the pull request number after creating the PR

      • .bugfix: A bug fix for something the maintainers deemed an improper undesired behavior that got corrected to match pre-agreed expectations.
      • .feature: A new behavior, public APIs. That sort of stuff.
      • .deprecation: A declaration of future API removals and breaking changes in behavior.
      • .breaking: When something public is removed in a breaking way. Could be deprecated in an earlier release.
      • .doc: Notable updates to the documentation structure or build process.
      • .packaging: Notes for downstreams about unobvious side effects and tooling. Changes in the test invocation considerations and runtime assumptions.
      • .contrib: Stuff that affects the contributor experience. e.g. Running tests, building the docs, setting up the development environment.
      • .misc: Changes that are hard to assign to any of the above categories.
    • Make sure to use full sentences with correct case and punctuation, for example:

      Fixed issue with non-ascii contents in doctest text files
      -- by :user:`contributor-gh-handle`.
      

      Use the past tense, or the present tense in a non-imperative mood, referring to what's changed compared to the last released version of this project.

KGuillaume-chaps • Jun 09 '25 07:06
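The checklist asks for news fragments named `<issue_or_pr_num>.<type>.rst`. As an illustration of that rule, the hypothetical Rust helper below accepts a file name only if it consists of a number, one of the eight listed types, and the `rst` suffix. It checks only the pattern quoted in the template; the project's actual tooling may accept other forms.

```rust
/// Fragment types listed in the checklist template.
const FRAGMENT_TYPES: [&str; 8] = [
    "bugfix", "feature", "deprecation", "breaking",
    "doc", "packaging", "contrib", "misc",
];

/// Checks a news-fragment file name against `<issue_or_pr_num>.<type>.rst`.
fn is_valid_fragment_name(name: &str) -> bool {
    let parts: Vec<&str> = name.split('.').collect();
    match parts.as_slice() {
        [num, kind, "rst"] => {
            !num.is_empty()
                && num.chars().all(|c| c.is_ascii_digit())
                && FRAGMENT_TYPES.contains(kind)
        }
        _ => false,
    }
}

fn main() {
    for name in ["588.bugfix.rst", "588.fix.rst", "bugfix.rst"] {
        println!("{name}: {}", is_valid_fragment_name(name));
    }
}
```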

This doesn't seem like a proper fix. The client might have closed the write side of its connection with a FIN but still want to receive the response.

That was why the state_is_none check was there. If the client really closed the whole connection, you would be able to detect it when trying to write something to it. Can you provide more information about your blocked io::copy?

thalesfragoso • Aug 26 '25 21:08
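The half-close scenario raised in the comment above can be reproduced with plain std sockets. In this sketch (std TCP rather than actix-web; `half_close_roundtrip` is a hypothetical name), the client sends a request and calls `shutdown(Shutdown::Write)`. The server's `read_to_end` then returns at EOF, yet the server can still write a response that the client reads in full, so EOF on the read side alone does not mean the peer has gone away.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Client half-closes after sending `request`. Returns (bytes the server read
/// before EOF, bytes the client received afterwards).
fn half_close_roundtrip(request: &[u8]) -> io::Result<(Vec<u8>, Vec<u8>)> {
    let listener = TcpListener::bind("127.0.0.1:0")?;
    let addr = listener.local_addr()?;

    let server = thread::spawn(move || -> io::Result<Vec<u8>> {
        let (mut conn, _) = listener.accept()?;
        let mut received = Vec::new();
        // Returns once the client's FIN arrives (a read of 0 bytes).
        conn.read_to_end(&mut received)?;
        // EOF on the read side does not prevent answering.
        conn.write_all(b"response")?;
        // `conn` is dropped when the closure returns, closing the server side.
        Ok(received)
    });

    let mut client = TcpStream::connect(addr)?;
    client.write_all(request)?;
    // Half-close: send FIN for the client's write direction only.
    client.shutdown(Shutdown::Write)?;
    let mut response = Vec::new();
    client.read_to_end(&mut response)?;

    let received = server.join().expect("server thread panicked")?;
    Ok((received, response))
}

fn main() -> io::Result<()> {
    let (received, response) = half_close_roundtrip(b"request")?;
    println!("server read {:?}", String::from_utf8_lossy(&received));
    println!("client read {:?}", String::from_utf8_lossy(&response));
    Ok(())
}
```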

Our program flow is divided into three parts:

  1. The browser initiates a websocket connection and closes the connection upon exit.
  2. Service A acts as a proxy for the websocket connection: actix-web receives the browser's request as a web::Payload, reqwest::Client opens an upgraded connection to Service B, and the browser's bytes are relayed to Service B with tokio::io::copy(payload_upgrade_reader, client_upgrade_writer).
  3. Service B uses actix_ws (a Session plus a MessageStream) to handle websocket messages. Upon receiving a Message::Close or detecting a network interruption, it closes the connection with session.close().
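The three-part flow above reduces to a chain of one-way relays. The sketch below is a simplification, not the authors' code: it uses std threads and loopback TCP instead of actix-web, reqwest and tokio, carries raw bytes without websocket framing, and uses `std::io::copy` as a blocking stand-in for `tokio::io::copy`. Both keep copying until the reader reports EOF or fails, which is why the relay in Service A only finishes once the browser side signals EOF. `run_chain` is a hypothetical name.

```rust
use std::io::{self, Read, Write};
use std::net::{Shutdown, TcpListener, TcpStream};
use std::thread;

/// Runs browser -> service A -> service B over loopback TCP.
/// Returns (bytes relayed by A, bytes seen by B before EOF).
fn run_chain(message: &[u8]) -> io::Result<(u64, Vec<u8>)> {
    // Service B: read until EOF and report what arrived.
    let b_listener = TcpListener::bind("127.0.0.1:0")?;
    let b_addr = b_listener.local_addr()?;
    let service_b = thread::spawn(move || -> io::Result<Vec<u8>> {
        let (mut conn, _) = b_listener.accept()?;
        let mut seen = Vec::new();
        conn.read_to_end(&mut seen)?;
        Ok(seen)
    });

    // Service A: accept the browser, dial B, relay browser -> B.
    let a_listener = TcpListener::bind("127.0.0.1:0")?;
    let a_addr = a_listener.local_addr()?;
    let service_a = thread::spawn(move || -> io::Result<u64> {
        let (mut from_browser, _) = a_listener.accept()?;
        let mut to_b = TcpStream::connect(b_addr)?;
        // Returns only at EOF on `from_browser` or on an error.
        let n = io::copy(&mut from_browser, &mut to_b)?;
        // Propagate the close downstream so B's read also reaches EOF.
        to_b.shutdown(Shutdown::Write)?;
        Ok(n)
    });

    // Browser: send the message, then close its write direction.
    let mut browser = TcpStream::connect(a_addr)?;
    browser.write_all(message)?;
    browser.shutdown(Shutdown::Write)?;

    let relayed = service_a.join().expect("service A panicked")?;
    let seen = service_b.join().expect("service B panicked")?;
    Ok((relayed, seen))
}

fn main() -> io::Result<()> {
    let (relayed, seen) = run_chain(b"hello from the browser")?;
    println!("A relayed {relayed} bytes; B saw {:?}", String::from_utf8_lossy(&seen));
    Ok(())
}
```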

Service A (pseudocode)

// Assumed imports; StreamWithCancel is the author's own wrapper type (not shown).
use actix_web::{web, HttpResponse};
use futures_util::TryStreamExt as _;
use tokio_util::sync::CancellationToken;

async fn proxy(payload: web::Payload, client: web::Data<reqwest::Client>, target_url: &str) -> Result<HttpResponse, actix_web::Error> {
    // Open the upgraded connection to Service B.
    let builder = client.get(target_url);
    let target_response = builder.send().await.map_err(...)?;
    let target_upgrade = target_response.upgrade().await.map_err(...)?;
    let (target_rx, mut target_tx) = tokio::io::split(target_upgrade);

    // Answer the browser with 101 Switching Protocols.
    let mut client_response = HttpResponse::SwitchingProtocols();
    client_response.upgrade("websocket");

    let cancel = CancellationToken::new();

    // Browser -> Service B: copy bytes until EOF, an error, or cancellation.
    // spawn_local is used because the actix-web payload is not Send.
    tokio::task::spawn_local({
        let cancel = cancel.clone();
        async move {
            let mut client_stream = payload.map_err(std::io::Error::other);
            let mut client_read = tokio_util::io::StreamReader::new(&mut client_stream);
            if let Some(Err(e)) = cancel
                .run_until_cancelled(tokio::io::copy(&mut client_read, &mut target_tx))
                .await
            {
                tracing::error!("Error proxying websocket client bytes to target: {e}");
            }

            tracing::info!("Websocket client closed");
        }
    });

    // Service B -> browser. StreamWithCancel cancels the CancellationToken
    // when the wrapped stream yields None.
    let target_stream =
        StreamWithCancel::new(tokio_util::io::ReaderStream::new(target_rx), cancel);
    Ok(client_response.streaming(target_stream))
}
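StreamWithCancel is the author's own type and its definition is not shown. Assuming it behaves as its comment says (cancel the token once the wrapped stream yields None), a synchronous analogue over `Iterator` looks like this, with an `Arc<AtomicBool>` standing in for the `CancellationToken`. `IterWithCancel` is a hypothetical name, and a real implementation would wrap an async `Stream` instead.

```rust
use std::sync::atomic::{AtomicBool, Ordering};
use std::sync::Arc;

/// Hypothetical synchronous analogue of StreamWithCancel.
struct IterWithCancel<I> {
    inner: I,
    cancelled: Arc<AtomicBool>,
}

impl<I> IterWithCancel<I> {
    fn new(inner: I, cancelled: Arc<AtomicBool>) -> Self {
        Self { inner, cancelled }
    }
}

impl<I: Iterator> Iterator for IterWithCancel<I> {
    type Item = I::Item;

    fn next(&mut self) -> Option<Self::Item> {
        let item = self.inner.next();
        if item.is_none() {
            // The target side is exhausted: signal the other direction to stop.
            self.cancelled.store(true, Ordering::SeqCst);
        }
        item
    }
}

fn main() {
    let cancelled = Arc::new(AtomicBool::new(false));
    let mut chunks = IterWithCancel::new(vec!["a", "b"].into_iter(), Arc::clone(&cancelled));
    for chunk in chunks.by_ref() {
        // While data is still flowing, the flag stays unset.
        println!("forwarding {chunk}, cancelled = {}", cancelled.load(Ordering::SeqCst));
    }
    println!("after exhaustion, cancelled = {}", cancelled.load(Ordering::SeqCst));
}
```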

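Service B (pseudocode)

// Assumed imports; send_message_ws, task_id and HEARTBEAT_INTERVAL are defined elsewhere (not shown).
use actix_web::{rt, web::Payload, HttpRequest, HttpResponse};
use actix_ws::Message;
use futures_util::{future::{self, Either}, StreamExt as _};
use tokio::{pin, time::interval};
use tokio_util::sync::CancellationToken;

pub(crate) async fn send_message(
    req: HttpRequest,
    stream: Payload,
) -> Result<HttpResponse, actix_web::Error> {
    let (res, session, msg_stream) = actix_ws::handle(&req, stream)?;
    let cancel = CancellationToken::new();
    // Outgoing messages (body not shown); receives a clone of the token.
    rt::spawn(send_message_ws(
        task_id,
        req,
        session.clone(),
        cancel.clone(),
    ));
    // Incoming messages and heartbeats; cancels the token when it returns.
    rt::spawn(echo_heartbeat_ws(session, msg_stream, cancel));
    Ok(res)
}

async fn echo_heartbeat_ws(
    mut session: actix_ws::Session,
    mut msg_stream: actix_ws::MessageStream,
    cancel: CancellationToken,
) {
    // Cancels the token when this task ends, whatever the reason.
    let _cancel_guard = cancel.drop_guard();

    let mut interval = interval(HEARTBEAT_INTERVAL);
    let reason = loop {
        // Next heartbeat tick, pinned because future::select needs Unpin futures.
        let tick = interval.tick();
        pin!(tick);

        match future::select(msg_stream.next(), tick).await {
            Either::Left((Some(Ok(msg)), _)) => {
                tracing::trace!("msg: {msg:?}");

                match msg {
                    Message::Close(reason) => break reason,

                    msg_to_process => {...}
                }
            }

            // protocol or transport error
            Either::Left((Some(Err(err)), _)) => {
                tracing::error!("{}", err);
                break None;
            }

            // client WebSocket stream ended
            Either::Left((None, _)) => break None,

            // heartbeat interval ticked
            Either::Right((_inst, _)) => {
                ...
            }
        }
    };

    // attempt to close connection gracefully
    let _ = session.close(reason).await;
}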
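The select loop in Service B has two exits for a finished connection (a close frame, or the stream ending), one for errors, and a timer branch. The following std-only approximation maps the non-error cases onto `mpsc::Receiver::recv_timeout`: a close message breaks with its reason, a dropped sender plays the role of the stream returning None, and a timeout plays the role of the heartbeat tick. `Msg` and `heartbeat_loop` are hypothetical, the error branch is omitted, and unlike tokio's `interval` the timeout here restarts after every message, so it only approximates a fixed heartbeat.

```rust
use std::sync::mpsc::{Receiver, RecvTimeoutError};
use std::time::Duration;

/// Hypothetical stand-in for actix_ws::Message.
enum Msg {
    Text(String),
    Close(Option<String>),
}

/// Returns (close reason, texts processed, heartbeat ticks).
fn heartbeat_loop(rx: &Receiver<Msg>, heartbeat: Duration) -> (Option<String>, Vec<String>, usize) {
    let mut processed = Vec::new();
    let mut ticks = 0;
    let reason = loop {
        match rx.recv_timeout(heartbeat) {
            // Like `Message::Close(reason) => break reason`.
            Ok(Msg::Close(reason)) => break reason,
            // Like `msg_to_process => {...}`.
            Ok(Msg::Text(text)) => processed.push(text),
            // Sender gone: like msg_stream.next() yielding None.
            Err(RecvTimeoutError::Disconnected) => break None,
            // Nothing arrived in time: like the interval tick branch.
            Err(RecvTimeoutError::Timeout) => ticks += 1,
        }
    };
    (reason, processed, ticks)
}

fn main() {
    let (tx, rx) = std::sync::mpsc::channel();
    tx.send(Msg::Text("hi".into())).unwrap();
    tx.send(Msg::Close(Some("bye".into()))).unwrap();
    let (reason, processed, _ticks) = heartbeat_loop(&rx, Duration::from_millis(50));
    // The caller would now pass `reason` to the equivalent of session.close().
    println!("closing with {reason:?} after {} message(s)", processed.len());
}
```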

bitcapybara • Aug 27 '25 01:08

@bitcapybara Your program was indeed supposed to detect the EOF on the read stream, but this is different from some of the complaints in #1313, where people were referring to the writer side.

While investigating this problem, I noticed that when we feed an error or EOF through the PayloadSender, we don't wake up the receiving web::Payload, which is probably the real culprit in your example.

This change seems to work for you because the dispatcher simply gives up after receiving the FIN from the client, but I don't think this is correct behavior. I will open an issue to fix the PayloadSender, and then another one where we can discuss whether we want to revert this change.

thalesfragoso • Aug 27 '25 15:08
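The missing wake-up described in the comment above follows a general `Future` rule: a receiver that returns `Poll::Pending` stores its `Waker`, and every state change that could make it ready, including EOF, must wake it. The model below is hypothetical and heavily simplified. `Shared`, `feed_data`, `feed_eof`, `NextChunk` and `receive_all` are illustrative names, not actix-http's actual PayloadSender or Payload, and the error path is omitted. It uses only std and a minimal thread-parking `block_on`.

```rust
use std::collections::VecDeque;
use std::future::Future;
use std::pin::{pin, Pin};
use std::sync::{Arc, Mutex};
use std::task::{Context, Poll, Wake, Waker};
use std::thread::{self, Thread};
use std::time::Duration;

#[derive(Default)]
struct Inner {
    chunks: VecDeque<Vec<u8>>,
    eof: bool,
    waker: Option<Waker>, // set by a receiver that found nothing to read
}

/// State shared by the hypothetical sender and receiver halves.
#[derive(Clone, Default)]
struct Shared(Arc<Mutex<Inner>>);

impl Shared {
    fn feed_data(&self, chunk: Vec<u8>) {
        let mut inner = self.0.lock().unwrap();
        inner.chunks.push_back(chunk);
        if let Some(waker) = inner.waker.take() {
            waker.wake();
        }
    }

    fn feed_eof(&self) {
        let mut inner = self.0.lock().unwrap();
        inner.eof = true;
        // The wake-up under discussion: without it, a receiver already
        // waiting for data would never be polled again.
        if let Some(waker) = inner.waker.take() {
            waker.wake();
        }
    }
}

/// Resolves to the next chunk, or None once EOF has been fed.
struct NextChunk {
    shared: Shared,
}

impl Future for NextChunk {
    type Output = Option<Vec<u8>>;

    fn poll(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll<Self::Output> {
        let mut inner = self.shared.0.lock().unwrap();
        if let Some(chunk) = inner.chunks.pop_front() {
            return Poll::Ready(Some(chunk));
        }
        if inner.eof {
            return Poll::Ready(None);
        }
        inner.waker = Some(cx.waker().clone());
        Poll::Pending
    }
}

/// Minimal executor: parks the thread until the waker unparks it.
struct ThreadWaker(Thread);

impl Wake for ThreadWaker {
    fn wake(self: Arc<Self>) {
        self.0.unpark();
    }
}

fn block_on<F: Future>(fut: F) -> F::Output {
    let mut fut = pin!(fut);
    let waker = Waker::from(Arc::new(ThreadWaker(thread::current())));
    let mut cx = Context::from_waker(&waker);
    loop {
        match fut.as_mut().poll(&mut cx) {
            Poll::Ready(out) => return out,
            Poll::Pending => thread::park(),
        }
    }
}

/// Reads chunks until EOF.
fn receive_all(shared: &Shared) -> Vec<u8> {
    let mut out = Vec::new();
    while let Some(chunk) = block_on(NextChunk { shared: shared.clone() }) {
        out.extend(chunk);
    }
    out
}

fn main() {
    let shared = Shared::default();
    let sender = shared.clone();
    let producer = thread::spawn(move || {
        thread::sleep(Duration::from_millis(20));
        sender.feed_data(b"hello".to_vec());
        thread::sleep(Duration::from_millis(20));
        sender.feed_eof();
    });
    let received = receive_all(&shared);
    producer.join().unwrap();
    println!("received {} bytes, then EOF", received.len());
}
```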