reqwest icon indicating copy to clipboard operation
reqwest copied to clipboard

Async Rate Limiting

Open theduke opened this issue 5 years ago • 14 comments

A lot of APIs have rate limits, and manually implementing a rate limiter over and over again is quite annoying to me.

There was some previous discussion in #169 but that was focused on a sync context.

@seanmonstar would you be open to a built in rate limiter? I imagine it working like this:

  • keep a sliding list of past requests per domain which have rate limiting enabled ( token bucket or something simpler)
  • if limit is reached, delay requests as necessary by just returning a delayed future

A simple implementation without prioritization could cause cascading delays though. A fix for that would be a request queue.

The complexity for this would not be too high and I would really love that functionality.

An alternative solution to this would be to have a async middleware feature with a pre-request hook that can return a future. That way a third party crate could supply the functionality easily, but async middleware is probably a larger topic. (remotely related discussion regarding sync hooks here: #155)

theduke avatar Apr 10 '19 19:04 theduke

We've been working on a general middleware stack, and have a form of rate-limiting here: https://github.com/tower-rs/tower/blob/master/tower-limit/src/rate/service.rs

It'd probably be useful to make adjustments there.

seanmonstar avatar Apr 10 '19 20:04 seanmonstar

@seanmonstar I've seen examples on how to integrate the hyper client and tower. Is it also possible to do that with reqwest and tower? Is there any example code or documentation that I can use as a hint to build that integration?

Thanks for all of your work with warp, hyper, reqwest & co. It's really great to use your libs!

edrevo avatar Jan 27 '20 11:01 edrevo

@edrevo the easiest way is to just use tower::service_fn:

let client = reqwest::Client::new(); // or use builder

let svc = tower::service_fn(move |req| {
    client.execute(req)
});

seanmonstar avatar Jan 27 '20 18:01 seanmonstar

@seanmonstar Would you be able to provide a quick example on how to convert one of the reqwest examples into a rate-limited one?

ie, from the docs:

let client = reqwest::Client::new();
let res = client.post("http://httpbin.org/post")
    .body("the exact body that is sent")
    .send()
    .await?;

How would we refactor that into a rate limited request using tower-limit?

ardeaf avatar Feb 04 '20 20:02 ardeaf

A bit late but here's an example of the above using tower if someone looks at this issue in the future

use reqwest::{Body, Method, Request, Url};
use std::time::Duration;
use tower::Service;
use tower::ServiceExt;

let client = reqwest::Client::new();
let mut svc = tower::ServiceBuilder::new()
    .rate_limit(100, Duration::new(10, 0)) // 100 requests every 10 seconds
    .service(tower::service_fn(move |req| client.execute(req)));

let mut req = Request::new(Method::POST, Url::parse("http://httpbin.org/post")?);
*req.body_mut() = Some(Body::from("the exact body that is sent"));

let res = svc.ready_and().await?.call(req).await?;

Definitely a bit more verbose but it works like a charm and the client and service needs to be setup only once anyway

Mathspy avatar Apr 06 '20 23:04 Mathspy

I've not had any success getting this working by storing the service in a struct... I can't find a proper way to declare the type of the Service. If I do:

struct ServiceStruct {
    service: dyn tower::Service<Response=HttpResponse,Error=HttpError,Future=Pin<Box<Result<HttpResponse,HttpError>>>>,
... more fields ...
}

let mut svc = tower::ServiceBuilder::new()
    .rate_limit(1, Duration::new(0, 1600)) // 1 request every 1600ms
    .service(tower::service_fn(move |req| client.execute(req)));

ServiceStruct { service: svc, ... more fields }    

But the type definition for tower::Service in the struct is wrong, and I can't find any usable documentation for declaring the type... has anyone had success with this?

UPDATE:

I had the wrong end of the stick with this one, but I got there in the end. For anyone looking to do something similar, here is the adapted code for storing a rate limit service in a struct:

use reqwest::{Body, Method, Request, Url};
use std::time::Duration;
use tower::Service;
use tower::ServiceExt;
use tower_limit::rate::RateLimit;
use tower_util::ServiceFn;

struct ServiceStruct<T> {
    service: RateLimit<ServiceFn<T>>,
}

#[tokio::main]
async fn main() {
    let client = reqwest::Client::new();
    let service = tower::ServiceBuilder::new()
        .rate_limit(100, Duration::new(10, 0)) // 100 requests every 10 seconds
        .service(tower::service_fn(move |req| client.execute(req)));

    let mut service_struct = ServiceStruct{service};

    let mut req = Request::new(Method::POST, Url::parse("http://httpbin.org/post").unwrap());
    *req.body_mut() = Some(Body::from("the exact body that is sent"));

    let res = service_struct.service.ready_and().await.unwrap().call(req).await.unwrap();
    println!("res: {:?}", res);
}

flexabyte avatar Aug 03 '20 11:08 flexabyte

We've been working on a general middleware stack, and have a form of rate-limiting here: https://github.com/tower-rs/tower/blob/master/tower-limit/src/rate/service.rs

It'd probably be useful to make adjustments there.

This link seems to be non-functional now. Has there been any progress on this feature request? It'd be useful to have this available as an out-of-box addition!

svc-93 avatar Jun 21 '21 21:06 svc-93

@svc-93 I think that would be https://github.com/tower-rs/tower/blob/master/tower/src/limit/rate/service.rs now

liamdawson avatar Jun 29 '21 04:06 liamdawson

A bit late but here's an example of the above using tower if someone looks at this issue in the future

use reqwest::{Body, Method, Request, Url};
use std::time::Duration;
use tower::Service;
use tower::ServiceExt;

let client = reqwest::Client::new();
let mut svc = tower::ServiceBuilder::new()
    .rate_limit(100, Duration::new(10, 0)) // 100 requests every 10 seconds
    .service(tower::service_fn(move |req| client.execute(req)));

let mut req = Request::new(Method::POST, Url::parse("http://httpbin.org/post")?);
*req.body_mut() = Some(Body::from("the exact body that is sent"));

let res = svc.ready_and().await?.call(req).await?;

Definitely a bit more verbose but it works like a charm and the client and service needs to be setup only once anyway

How to use this in a multithreaded context where each thread uses the same tower service and needs to be constrained by the same rate limit?

eute avatar Aug 07 '21 18:08 eute

How to use this in a multithreaded context where each thread uses the same tower service and needs to be constrained by the same rate limit?

@eute Probably by using the rt-multi-thread feature of tokio.

# Cargo.toml
tokio = { version = "1.7.1", features = ["rt-multi-thread", "macros"] }
// src.main.rs

#[tokio::main]
async fn main() {}

The default "RuntimeFlavor" of tokio::main macros is multi_threaded. Source.

ilyazub avatar Nov 17 '21 12:11 ilyazub

@ilyazub I am getting the same problem as @eute. Service has to be mutable, so i can't share exclusive reference and operate concurrently

sergeyshaykhullin avatar Dec 12 '21 15:12 sergeyshaykhullin

If you are still finding a solution for client-side rate limiting, I made a little crate raliguard with Semaphore implementing fixed window algorithm to control execution times per a period. Here is an example of asynchronous usage where the semaphore is shared between threads. It also supports any async/await backends

use std::{thread, sync, time};

use raliguard::Semaphore;


// Create a semaphore with restriction `5 tasks per 1 second`
let original_sem = Semaphore::new(5, time::Duration::from_secs(1));

// Make it sharable between threads (or you can share between tasks)
let shared_sem = sync::Arc::new(
    sync::Mutex::new(original_sem)
);

// Spawn 15 threads
for _ in 0..15 {
    let cloned_sem = shared_sem.clone();
    let thread = thread::spawn(move || {
        // Lock mutex for exclusive usage
        let mut local_sem = cloned_sem.lock().unwrap();

        // Get required delay
        let calculated_delay = local_sem.calc_delay();
        
        // Release mutex, make semaphore available to use in another threads
        drop(local_sem);

        // If delay exists, sleep it
        if let Some(delay) = calculated_delay {
            thread::sleep(delay);
        }
        
        // Here you can do your requests or another stuff
    });
}

// Sleep 1 second. Only 10 threads will be completed at this time
// (first 5 with no delay and another 5 after a second)
thread::sleep(time::Duration::from_secs(1));

deknowny avatar May 19 '22 19:05 deknowny

This is much much easier to do now after reqwest v0.11.11

Client now implements service so you can do this to return a service powered by reqwest from a function and store it into a struct

use std::time::Duration;

use reqwest::{Error, Request, Response};
use tower::Service;

fn example() -> impl Service<Request, Response = Response, Error = Error> {
    let client = reqwest::Client::new();

    tower::ServiceBuilder::new()
        .rate_limit(10, Duration::from_secs(5))
        .service(client)
}

struct Example<S>
where
    S: Service<Request, Response = Response, Error = Error>,
{
    service: S,
}

fn main() {
    let example = Example { service: example() };
}

If you need your service to be usable from multiple different contexts the easiest thing is to clone your service and pass the clones into those contexts. However, RateLimit prevents services from being cloned because then each service would have its own rate limits and that defeats the purpose of rate limits, so we can use a Buffer service (which happens to have a different error type) like so:

diff --git a/src/main.rs b/src/main.rs
index d39d616..8547d43 100644
--- a/src/main.rs
+++ b/src/main.rs
@@ -1,19 +1,20 @@
 use std::time::Duration;
 
 use reqwest::{Error, Request, Response};
-use tower::Service;
+use tower::{BoxError, Service};
 
-fn example() -> impl Service<Request, Response = Response, Error = Error> {
+fn example() -> impl Service<Request, Response = Response, Error = BoxError> {
     let client = reqwest::Client::new();
 
     tower::ServiceBuilder::new()
+        .buffer(100)
         .rate_limit(10, Duration::from_secs(5))
         .service(client)
 }
 
 struct Example<S>
 where
-    S: Service<Request, Response = Response, Error = Error>,
+    S: Service<Request, Response = Response, Error = BoxError>,
 {
     service: S,
 }

Mathspy avatar Aug 08 '22 18:08 Mathspy

Do I understand correctly, that RequestBuilder pattern is not compatible with using tower service? RequestBuilder is initialized with Client - which is fine when using reqwest on its own, but not when we need to use tower that wraps the client..

Setting body may be as easy as assigning to *req.body_mut(), however properly updating headers (as well as other things builder helps with) can be a lot more involved.

@seanmonstar is there a chance RequestBuilder could be decoupled from Client somehow? The only place it actually uses the client field is within send() method... building the request itself doesn't need the client at all. Obviously main concern here would be API backwards compatibility - otherwise what I want could be achieved e.g. by just wrapping client field with Option.

As a workaround I can actually use some dummy client when instantiating RequestBuilder, and then instead of using send() I can get request with build(), and then send using tower service. It's just slightly annoying that I need to pass in the dummy client which will never be used.

nirvana-msu avatar Oct 05 '22 08:10 nirvana-msu

RateLimit prevents services from being cloned because then each service would have its own rate limits and that defeats the purpose of rate limits, so we can use a Buffer service like so

Doesn't this introduce a race condition?

RateLimit does not reserve any capacity upon poll_ready, unlike e.g. ConcurrencyLimit. The only thing that prevents race condition when using RateLimit by itself is that poll_ready/call methods take &mut self, so you can't invoke those concurrently from different tasks without cloning (and this presumably is the main reason Clone wasn't implemented for them, unlike was done for ConcurrencyLimit).

Putting Buffer service in front with bound larger than 1 sidesteps that - it is now possible for multiple (up to bound) tasks to obtain permission from poll_ready, all without actually reserving any rate limit capacity - so when they proceed to call, there may not be enough capacity for them all (only capacity for one is guaranteed).

nirvana-msu avatar Nov 27 '22 03:11 nirvana-msu