
Async causes OOM when transferring from sources faster than the destination

Open sempervictus opened this issue 4 years ago • 9 comments

Describe the bug
In a use case where messages are streamed off a queue and dispatched via this library, async calls result in unbounded memory growth, file descriptor exhaustion, and absurdly long object commit times. A Tokio main calling the following:

async fn send_s3(msg: &Message<'_>, bucket: Bucket) -> Result<String, Box<dyn Error>> {
    let s3_timer = METRIC_INGESTOR_S3_UPLOAD
        .with_label_values(&["all"])
        .start_timer();
    let path = s3_msg_path(msg.clone());
    // println!("S3 Path: {}", &path);
    let (_, code) = bucket.put_object(&path, msg.source.as_bytes()).await.unwrap();
    let upload = "S3 upload result: ".to_string() + &code.to_string();
    s3_timer.observe_duration();
    Ok(upload)
}

results in an observed duration of ~9s, memory growing non-stop until OOM, and an 8-core system uploading to a single MinIO instance at ~25 msg/s with a lot of jitter on the graphs. Replacing async with a rayon::ThreadPool, with send_s3 rewritten as

fn send_s3(msg: &Message<'_>, bucket: Bucket) -> Result<String, Box<dyn Error>> {
    let s3_timer = METRIC_INGESTOR_S3_UPLOAD
        .with_label_values(&["all"])
        .start_timer();
    let path = s3_msg_path(msg.clone());
    // println!("S3 Path: {}", &path);
    let (_, code) = bucket.put_object_blocking(&path, msg.source.as_bytes()).unwrap();
    let upload = "S3 upload result: ".to_string() + &code.to_string();
    s3_timer.observe_duration();
    Ok(upload)
}

called synchronously from the pool threads, I get no memory growth, and the message rate is a flat ~27/s with an observed duration of ~0.9s (still awful).
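
For reference, the Rayon-side wiring is roughly the following - a minimal sketch, not the exact service code; receiver, the channel crate, and the worker count are illustrative:

// Sketch (names illustrative): a fixed pool of workers each pull from a
// shared channel and call the blocking sender, so a message's buffer is
// dropped as soon as its put_object_blocking call returns.
let pool = rayon::ThreadPoolBuilder::new()
    .num_threads(8)
    .build()
    .expect("failed to build thread pool");
pool.scope(|s| {
    for _ in 0..8 {
        let rx = receiver.clone(); // e.g. a crossbeam_channel::Receiver<Message>
        let bucket = bucket.clone();
        s.spawn(move |_| {
            while let Ok(msg) = rx.recv() {
                if let Err(e) = send_s3(&msg, bucket.clone()) {
                    eprintln!("S3 upload failed: {}", e);
                }
            }
        });
    }
});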

Expected behavior
Async calls should not take ~9s to return, and the consumer should not keep pulling from the queue while earlier dispatches are still outstanding. I think the ideal here would be to fire-and-forget the S3 message: once the PUT/POST is sent, the source data buffer should be freed unless referenced elsewhere. Otherwise, as the main thread eats off the MQ and dispatches async S3 puts, the memory used by those messages is never freed and any reasonably long-running binary crashes.
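
For what it's worth, the growth can be capped on the caller side by bounding the number of in-flight puts, e.g. with a Tokio semaphore. A rough sketch of that idea - receiver, send_s3, and the limit of 64 are placeholders, not anything provided by this library:

use std::sync::Arc;
use tokio::sync::Semaphore;

// Sketch: cap in-flight uploads so the consumer applies backpressure
// instead of buffering every dequeued message until OOM.
let limit = Arc::new(Semaphore::new(64));
while let Some(msg) = receiver.recv().await {
    let permit = limit.clone().acquire_owned().await.expect("semaphore closed");
    let bucket = bucket.clone();
    tokio::spawn(async move {
        if let Err(e) = send_s3(&msg, bucket).await {
            eprintln!("S3 upload failed: {}", e);
        }
        drop(permit); // releases a slot once this put has completed
    });
}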

Environment

  • Rust version: [e.g. 1.54]
  • lib version: [e.g. 0.26]

Additional context
The receiving MinIO server shows almost no CPU use, no IOWait, and no memory pressure - it looks like the S3 client is the culprit in the very slow object sends in both modes (~0.9s commit times are awful, and that's the synchronous path). It's the default official dockerized MinIO being used for lab testing. Grafana view of the Prometheus data collected in that call - the left part is async (crashed with OOM), the right part is sync with Rayon; the top graph is commit time, the bottom graph is messages/sec: Screenshot from 2021-08-13 15-54-38

sempervictus • Aug 13 '21 19:08

To expand on this a bit - based on the counter of messages dequeued and the counter of messages that complete the S3 write, it looks like the async model keeps dequeuing forever, spawning new tasks that are blocked on the S3 client's commit times. Rayon doesn't seem to allow spawn(async move { ... }) the way Tokio does... so I can't easily mix and match thread pools with async tasks (haven't found how yet). From what I understand, however, I shouldn't have to - an async dispatch of a write should send the request and drop the reference to the source buffer, allowing the memory to be reclaimed once nothing else holds it.
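
For completeness, one way to drive the async call from a Rayon worker without spawn(async ...) is to capture a Handle to the Tokio runtime and block the worker on it - a sketch I haven't actually wired up here, with pool, bucket, path, and data as placeholders:

use tokio::runtime::Handle;

// Sketch: capture a handle to the Tokio runtime from an async context,
// then block a Rayon worker thread on the async put.
let handle = Handle::current();
pool.spawn(move || {
    // block_on may not run on a runtime worker thread, but Rayon's
    // threads live outside the runtime, so this is allowed.
    match handle.block_on(bucket.put_object(&path, data.as_bytes())) {
        Ok(_) => {}
        Err(e) => eprintln!("S3 upload failed: {:?}", e),
    }
});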

sempervictus • Aug 13 '21 23:08

When it doesn't OOM, the async version dies with this:

source: hyper::Error(Connect, ConnectError("tcp connect error", Os { code: 24, kind: Uncategorized, message: "No file descriptors available" })) })) }', src/main.rs:530:44

and that's with limits.conf saying

nobody           soft    nofile          1048576
nobody           hard    nofile          1048576

sempervictus • Aug 14 '21 00:08

Confirmed that it's the client - using a single-threaded Rusoto S3 async implementation, I see the throughput in the graph below with negligible CPU utilization (as opposed to being CPU-walled with this client): image

The async version that works this well is:

async fn send_s3(msg: &Message<'_>, s3cli: S3Storage) -> Result<String, Box<dyn Error>> {
    let s3_timer = METRIC_INGESTOR_S3_UPLOAD
        .with_label_values(&["all"])
        .start_timer();
    let path = s3_msg_path(msg.clone());
    // println!("S3 Path: {}", &path);
    let req = rusoto_s3::PutObjectRequest {
        acl: None,
        body: Some(msg.source.as_bytes().to_vec().into()),
        bucket: s3cli.bucket,
        bucket_key_enabled: Some(true),
        cache_control: None,
        content_disposition: None,
        content_encoding: None,
        content_language: None,
        content_length: Some(msg.source.as_bytes().len() as i64),
        content_md5: None,
        content_type: None,
        expected_bucket_owner: None,
        expires: None,
        grant_full_control: None,
        grant_read: None,
        grant_read_acp: None,
        grant_write_acp: None,
        key: path,
        metadata: None,
        object_lock_legal_hold_status: None,
        object_lock_mode: None,
        object_lock_retain_until_date: None,
        request_payer: None,
        sse_customer_algorithm: None,
        sse_customer_key: None,
        sse_customer_key_md5: None,
        ssekms_encryption_context: None,
        ssekms_key_id: None,
        server_side_encryption: None,
        storage_class: None,
        tagging: None,
        website_redirect_location: None,
    };
    let result = s3cli.cli
        .put_object(req)
        .await
        .expect("Couldn't PUT object");
    s3_timer.observe_duration();
    let upload = match result.e_tag {
        Some(_) => "S3 upload result: 200".to_owned(),
        None => "Failed to upload message to S3".to_owned()
    };
    Ok(upload)
}

I'm guessing there's a blocking call somewhere in the bowels of this client, or possibly a very aggressive poll, since I see CPU usage with this client at 100%, roughly 55-60% of which shows up as kernel time.
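
If there really is a blocking call in there, one interim workaround would be to push the put onto Tokio's blocking pool so it can't stall the reactor threads - a sketch along the lines of the original send_s3 body, using the blocking API (assumes the blocking feature is enabled; not a fix for the client itself):

// Sketch: isolate a possibly-blocking put on Tokio's blocking thread pool,
// inside a function returning Result<String, Box<dyn Error>> as above.
let bucket = bucket.clone();
let path = path.clone();
let body = msg.source.as_bytes().to_vec();
let (_, code) = tokio::task::spawn_blocking(move || {
    bucket.put_object_blocking(&path, &body)
})
.await??;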

sempervictus • Aug 14 '21 18:08

Using tokio::spawn(async move { ... }) for every message now yields ~100 msg/sec with the Rusoto lib, bottlenecked at the small MinIO host, which is hitting 30% iowait. While switching S3 libraries has solved my individual problem, it seems there are issues with this one that have been surfaced by my experience and may need redress to avoid similar headaches for other developers.
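
The dispatch side of that is roughly the following sketch - receiver and the cloned client wrapper are illustrative names, not the exact service code:

// Sketch of the per-message dispatch described above: one Tokio task per
// message, reusing the Rusoto-backed send_s3 from the previous comment.
while let Some(msg) = receiver.recv().await {
    let s3cli = s3cli.clone();
    tokio::spawn(async move {
        if let Err(e) = send_s3(&msg, s3cli).await {
            eprintln!("S3 upload failed: {}", e);
        }
    });
}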

sempervictus • Aug 14 '21 22:08

@sempervictus This is a very interesting rundown, thank you! I apologise for missing it until now; I'll look into the put code, as this is definitely not expected behaviour...

durch • Jul 21 '22 21:07

I'm running into the same issue; it seems like this wasn't addressed in the last 2 years.

MiniaczQ • Oct 06 '23 09:10

Thanks for pinging this thread, I missed @durch's response. It's been a while since I dug around this code, but I should be able to revive the relevant brain cache contents if I can be of use. I think we might need a call graph generated inside the async dispatch to localize the problem and perform RCA.

sempervictus • Oct 06 '23 12:10

AWS's S3 lib is about to be production ready, but I've yet to check whether it suffers from the same problems.
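
For anyone comparing, a put with the official SDK looks roughly like this - sketched from memory against aws-sdk-s3, so the exact ByteStream/module paths may differ by version:

use aws_sdk_s3::{primitives::ByteStream, Client};

// Sketch: equivalent object put with the official AWS SDK for Rust.
async fn put_bytes(
    client: &Client,
    bucket: &str,
    key: &str,
    body: Vec<u8>,
) -> Result<(), aws_sdk_s3::Error> {
    client
        .put_object()
        .bucket(bucket)
        .key(key)
        .body(ByteStream::from(body))
        .send()
        .await?;
    Ok(())
}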

MiniaczQ • Oct 06 '23 13:10

I haven't used the AWS lib for this, but I do use it for some other work and it seems perfectly fine to me.

sempervictus • Oct 06 '23 13:10