rust-s3
Async causes OOM when transferring from faster sources than destinations
Describe the bug
In a use case where messages are streamed off a queue and dispatched via this library, the async calls result in unbounded memory growth, file descriptor exhaustion, and absurdly long object commit times. A Tokio main calling the following:
async fn send_s3(msg: &Message<'_>, bucket: Bucket) -> Result<String, Box<dyn Error>> {
    let s3_timer = METRIC_INGESTOR_S3_UPLOAD
        .with_label_values(&["all"])
        .start_timer();
    let path = s3_msg_path(msg.clone());
    // println!("S3 Path: {}", &path);
    let (_, code) = bucket.put_object(&path, msg.source.as_bytes()).await.unwrap();
    let upload = "S3 upload result: ".to_string() + &code.to_string();
    s3_timer.observe_duration();
    Ok(upload)
}
results in the observed duration being ~9 s, memory growing non-stop until OOM, and an 8-core system uploading to a single MinIO instance at ~25 msg/s with a lot of jitter on the graphs.
Replacing async with a rayon::ThreadPool, with send_s3 rewritten as
fn send_s3(msg: &Message<'_>, bucket: Bucket) -> Result<String, Box<dyn Error>> {
    let s3_timer = METRIC_INGESTOR_S3_UPLOAD
        .with_label_values(&["all"])
        .start_timer();
    let path = s3_msg_path(msg.clone());
    // println!("S3 Path: {}", &path);
    let (_, code) = bucket.put_object_blocking(&path, msg.source.as_bytes()).unwrap();
    let upload = "S3 upload result: ".to_string() + &code.to_string();
    s3_timer.observe_duration();
    Ok(upload)
}
and called synchronously from the pool's threads, I get no memory growth, and the message rate is a flat ~27/s with an observed duration of ~0.9 s (still awful).
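For reference, the dispatch around that blocking call looks roughly like the sketch below. This is a minimal illustration rather than my actual ingester code; OwnedMessage is a placeholder for an owned copy of the queue message, and the pool size is arbitrary.

use rayon::ThreadPoolBuilder;
use s3::bucket::Bucket;

// Placeholder for an owned copy of the queue message
// (the real Message<'_> borrows its data).
struct OwnedMessage {
    path: String,
    body: Vec<u8>,
}

fn dispatch_blocking(messages: &[OwnedMessage], bucket: &Bucket) {
    // Fixed-size pool; the scope blocks until every spawned upload finishes,
    // so the dequeue side cannot outrun the S3 writes.
    let pool = ThreadPoolBuilder::new().num_threads(8).build().unwrap();
    pool.scope(|s| {
        for msg in messages {
            s.spawn(move |_| {
                let _ = bucket.put_object_blocking(&msg.path, &msg.body);
            });
        }
    });
}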
Expected behavior
Async calls should not wait ~9 s for a return, and should not keep consuming off the queue while earlier dispatches are still incomplete. I think the ideal here would be to fire-and-forget the S3 message, in that once the PUT/POST is sent, the source data buffer should be freed unless referenced elsewhere. Otherwise, as the main thread eats off the MQ and dispatches async S3 puts, the memory used by those messages never frees and any reasonably long-running binary crashes.
Environment
- Rust version: [e.g. 1.54]
- lib version: [e.g. 0.26]
Additional context
The receiving MinIO server shows almost no CPU use, no iowait, and no memory pressure - it looks like the S3 client is the culprit both for the very slow object sends (~0.9 s commit times are awful, and that's in the synchronous case) and for the memory growth. It's the default official dockerized MinIO being used for lab testing.
Grafana view of the Prometheus data collected in that call (screenshot): left part is async (crashed with OOM), right part is sync with Rayon; top graph is commit time, bottom graph is messages/sec.
To expand on this a bit: based on the counter of messages dequeued and the counter of messages that complete the S3 write, it looks like the async model keeps dequeuing forever, spawning new tasks which are blocked on the S3 client's commit times. Rayon doesn't seem to allow spawn(async move { ... }) like Tokio does, so I can't easily mix and match thread pools with async tasks (haven't found how yet). From what I understand, however, I shouldn't have to - async dispatch of a write should send the write request and drop its reference to the source buffer, allowing the memory to be reclaimed once nothing else references it.
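In the meantime, here is a rough sketch of what throttling the dequeue loop myself would look like, using a tokio::sync::Semaphore (reusing the OwnedMessage placeholder from the earlier sketch; MAX_IN_FLIGHT is an arbitrary cap, not something this crate exposes):

use std::sync::Arc;
use s3::bucket::Bucket;
use tokio::sync::Semaphore;

// Arbitrary cap on concurrent put_object calls.
const MAX_IN_FLIGHT: usize = 64;

async fn dispatch_async(messages: Vec<OwnedMessage>, bucket: Bucket) {
    let permits = Arc::new(Semaphore::new(MAX_IN_FLIGHT));
    for msg in messages {
        // The dequeue loop stalls here once MAX_IN_FLIGHT uploads are pending,
        // so the memory held by queued message bodies stays bounded.
        let permit = permits.clone().acquire_owned().await.unwrap();
        let bucket = bucket.clone();
        tokio::spawn(async move {
            let _ = bucket.put_object(&msg.path, &msg.body).await;
            drop(permit); // releases the slot when the upload completes
        });
    }
}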
When it doesn't OOM in async, it fails with source: hyper::Error(Connect, ConnectError("tcp connect error", Os { code: 24, kind: Uncategorized, message: "No file descriptors available" })) })) }', src/main.rs:530:44 - and that's with limits.conf saying
nobody soft nofile 1048576
nobody hard nofile 1048576
To confirm that it's the client: using a single-threaded Rusoto S3 async implementation, I see far better throughput with negligible CPU utilization (as opposed to the CPU being walled with this client). Their async version, which works this well, is:
async fn send_s3(msg: &Message<'_>, s3cli: S3Storage) -> Result<String, Box<dyn Error>> {
    let s3_timer = METRIC_INGESTOR_S3_UPLOAD
        .with_label_values(&["all"])
        .start_timer();
    let path = s3_msg_path(msg.clone());
    // println!("S3 Path: {}", &path);
    let req = rusoto_s3::PutObjectRequest {
        acl: None,
        body: Some(msg.source.as_bytes().to_vec().into()),
        bucket: s3cli.bucket,
        bucket_key_enabled: Some(true),
        cache_control: None,
        content_disposition: None,
        content_encoding: None,
        content_language: None,
        content_length: Some(msg.source.as_bytes().len() as i64),
        content_md5: None,
        content_type: None,
        expected_bucket_owner: None,
        expires: None,
        grant_full_control: None,
        grant_read: None,
        grant_read_acp: None,
        grant_write_acp: None,
        key: path,
        metadata: None,
        object_lock_legal_hold_status: None,
        object_lock_mode: None,
        object_lock_retain_until_date: None,
        request_payer: None,
        sse_customer_algorithm: None,
        sse_customer_key: None,
        sse_customer_key_md5: None,
        ssekms_encryption_context: None,
        ssekms_key_id: None,
        server_side_encryption: None,
        storage_class: None,
        tagging: None,
        website_redirect_location: None,
    };
    let result = s3cli.cli
        .put_object(req)
        .await
        .expect("Couldn't PUT object");
    s3_timer.observe_duration();
    let upload = match result.e_tag {
        Some(_) => "S3 upload result: 200".to_owned(),
        None => "Failed to upload message to S3".to_owned(),
    };
    Ok(upload)
}
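(Side note for anyone copying this: if I recall correctly, rusoto's request structs derive Default, so the literal above can be trimmed to just the fields that matter - something like the following, with the same behaviour:)

let req = rusoto_s3::PutObjectRequest {
    body: Some(msg.source.as_bytes().to_vec().into()),
    bucket: s3cli.bucket,
    bucket_key_enabled: Some(true),
    content_length: Some(msg.source.as_bytes().len() as i64),
    key: path,
    // every other field is None by default
    ..Default::default()
};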
I'm guessing there's a blocking call somewhere in the bowels of this client, or possibly a very aggressive poll, since I see CPU usage with this client at 100%, roughly 55-60% of which shows up as kernel time.
Using tokio::spawn(async move { ... }) for every message now yields ~100 msg/sec with the rusoto lib, bottlenecked at the small MinIO host hitting 30% iowait. While switching S3 libs has solved my individual problem, there seem to be issues in this crate which my experience has highlighted and which may need redress to avoid similar headaches for other developers.
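(For clarity, since a closure-style tokio::spawn(async move || ...) won't compile: the spawn takes an async block. A rough sketch of the per-message dispatch, assuming msg has already been converted to an owned/'static value accepted by send_s3 and that S3Storage is cheap to clone:)

// msg must be owned ('static) before it can move into the spawned task.
let s3cli = s3cli.clone();
tokio::spawn(async move {
    match send_s3(&msg, s3cli).await {
        Ok(r) => println!("{}", r),
        Err(e) => eprintln!("S3 upload failed: {}", e),
    }
});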
@sempervictus This is a very interesting rundown, thank you! I apologise for missing it until now; I'll look into the put code, as this is definitely not expected behaviour...
I'm running into the same issue; it seems like this wasn't addressed in the last 2 years.
Thanks for pinging this thread - I missed @durch's response. It's been a while since I dug around this code, but I should be able to revive the relevant brain-cache contents if I can be of use. I think we might need a call graph generated inside the async dispatch to localize the problem and perform RCA.
AWS's S3 lib is about to be production-ready, but I've yet to check whether it suffers from the same problems.
I haven't used the AWS lib for this, but I do use it for some other work and it seems perfectly fine to me.