TTL does not really work
If I insert 10000 items with a 60-second TTL, my on_evict callback sees only ~1000 evicted items after 60 seconds.
Checking len() on the cache at that point also shows ~9000 items still present.
I suspect the cleanup process is not working correctly and that something is wrong with the TTL map.
Hi, could you provide some code to help me reproduce your situation?
Yeah, sorry for filing the issue without a repro. I will try to put together a small example.
@al8n Okay, here is the repro:
use tokio::time::{sleep, Duration};
use stretto::AsyncCache;

#[tokio::main]
async fn main() {
    let cache: AsyncCache<String, String> = AsyncCache::new(12960, 1e6 as i64, tokio::spawn).unwrap();

    for i in 0..10000 {
        cache
            .insert_with_ttl(
                format!("key{}", i),
                format!("value{}", i),
                1,
                Duration::from_secs(60),
            )
            .await;
        sleep(Duration::from_millis(1)).await;
    }
    cache.wait().await.unwrap();
    println!("Current size: {}", cache.len());

    sleep(Duration::from_secs(100)).await;
    println!("New size: {}", cache.len());
}
On my machine, I got something like this:
Current size: 10000
New size: 4824
If I remove the sleep(Duration::from_millis(1)).await; between inserts, it works as expected. I use this sleep to simulate natural load behavior.
Regards,
I hope you can reproduce my issue.
@al8n, maybe I can look into it next week, but I suspect it's a bug.
Okay, I found why: in https://github.com/al8n/stretto/blob/main/src/store.rs#L285C29-L285C40 there is no guarantee that try_cleanup_async will be invoked strictly once every second.
I tested it by printing the current bucket and the bucket each cleanup attempt targets: from time to time the cleaner skips a second, and since try_cleanup removes only the single bucket matching the current second, the skipped buckets and their entries are never removed.
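To make the failure mode concrete, here is a minimal standalone sketch (the bucket math and constants are my assumptions for illustration, not the actual stretto source) of a one-bucket-per-tick cleaner missing a second:

use std::collections::HashMap;

// Assumed bucket granularity; stretto's real constant may differ.
const BUCKET_DURATION_SECS: i64 = 1;

// Bucket that becomes eligible for cleanup at `now` (simplified).
fn cleanup_bucket(now_secs: i64) -> i64 {
    now_secs / BUCKET_DURATION_SECS
}

fn main() {
    let mut buckets: HashMap<i64, Vec<&str>> = HashMap::new();
    buckets.insert(10, vec!["a"]); // entries expiring in second 10
    buckets.insert(11, vec!["b"]); // entries expiring in second 11

    // The cleaner fires at t = 10, then the next tick is late and lands at t = 12.
    for now in [10, 12] {
        // Original logic: remove ONLY the bucket whose number matches `now`.
        let evicted = buckets.remove(&cleanup_bucket(now));
        println!("t = {now}: evicted {evicted:?}");
    }
    // Bucket 11 was skipped, so its entries are never evicted.
    println!("leaked buckets: {:?}", buckets.keys().collect::<Vec<_>>());
}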
Okay, this one works, but I am not sure how efficient it is:
diff --git a/src/ttl.rs b/src/ttl.rs
index 1c24bf2..1b7c36a 100644
--- a/src/ttl.rs
+++ b/src/ttl.rs
@@ -3,6 +3,7 @@ use std::collections::{hash_map::RandomState, HashMap};
 use std::hash::BuildHasher;
 use std::ops::{Deref, DerefMut};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
+use itertools::Itertools;
 
 use crate::CacheError;
 
@@ -200,11 +201,24 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
     pub fn try_cleanup(&self, now: Time) -> Result<Option<HashMap<u64, u64, S>>, CacheError> {
         let bucket_num = cleanup_bucket(now);
-        Ok(self
-            .buckets
-            .write()
-            .remove(&bucket_num)
-            .map(|bucket| bucket.map))
+        let bucket_keys: Vec<i64> = self.buckets.read().keys().sorted().cloned().collect();
+        // println!("try_cleanup bucket_num: {} buckets:{:#?}", bucket_num, bucket_keys);
+        let mut ret_map: HashMap<u64, u64, S> = HashMap::with_hasher(self.hasher.clone());
+        for map in bucket_keys
+            .iter()
+            .filter(|key| **key < bucket_num)
+            .map(|key| {
+                self
+                    .buckets
+                    .write()
+                    .remove(key)
+                    .map(|bucket| bucket.map)
+            }) {
+            if map.is_some() {
+                ret_map.extend(map.unwrap().iter());
+            }
+        }
+        Ok(Some(ret_map))
     }
 
     pub fn hasher(&self) -> S {
It would be better to use a BTreeMap and range() for fast filtering over the index.
Okay, here is the option with BTreeMap and range:
diff --git a/src/ttl.rs b/src/ttl.rs
index 1c24bf2..e72ed4d 100644
--- a/src/ttl.rs
+++ b/src/ttl.rs
@@ -1,5 +1,5 @@
 use parking_lot::RwLock;
-use std::collections::{hash_map::RandomState, HashMap};
+use std::collections::{hash_map::RandomState, HashMap, BTreeMap};
 use std::hash::BuildHasher;
 use std::ops::{Deref, DerefMut};
 use std::time::{Duration, SystemTime, UNIX_EPOCH};
@@ -100,7 +100,7 @@ impl<S: BuildHasher> DerefMut for Bucket<S> {
 #[derive(Debug)]
 pub(crate) struct ExpirationMap<S = RandomState> {
-    buckets: RwLock<HashMap<i64, Bucket<S>, S>>,
+    buckets: RwLock<BTreeMap<i64, Bucket<S>>>,
     hasher: S,
 }
 
@@ -108,7 +108,7 @@ impl Default for ExpirationMap {
     fn default() -> Self {
         let hasher = RandomState::default();
         Self {
-            buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
+            buckets: RwLock::new(BTreeMap::new()),
             hasher,
         }
     }
@@ -123,7 +123,7 @@ impl ExpirationMap {
 impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
     pub(crate) fn with_hasher(hasher: S) -> ExpirationMap<S> {
         ExpirationMap {
-            buckets: RwLock::new(HashMap::with_hasher(hasher.clone())),
+            buckets: RwLock::new(BTreeMap::new()),
             hasher,
         }
     }
@@ -200,11 +200,22 @@ impl<S: BuildHasher + Clone + 'static> ExpirationMap<S> {
     pub fn try_cleanup(&self, now: Time) -> Result<Option<HashMap<u64, u64, S>>, CacheError> {
         let bucket_num = cleanup_bucket(now);
-        Ok(self
-            .buckets
-            .write()
-            .remove(&bucket_num)
-            .map(|bucket| bucket.map))
+        let bucket_keys: Vec<i64> = self.buckets.read().range(..bucket_num).map(|(key, _)| *key).collect();
+        let mut ret_map: HashMap<u64, u64, S> = HashMap::with_hasher(self.hasher.clone());
+        for map in bucket_keys
+            .iter()
+            .map(|key| {
+                self
+                    .buckets
+                    .write()
+                    .remove(key)
+                    .map(|bucket| bucket.map)
+            }) {
+            if map.is_some() {
+                ret_map.extend(map.unwrap().iter());
+            }
+        }
+        Ok(Some(ret_map))
     }
 
     pub fn hasher(&self) -> S {
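To illustrate the design choice (a standalone sketch, not the stretto code): BTreeMap::range yields exactly the buckets below the cleanup bucket in O(log n + k), instead of sorting the whole key set on every tick:

use std::collections::BTreeMap;

fn main() {
    let mut buckets: BTreeMap<i64, &str> = BTreeMap::new();
    buckets.extend([(8, "a"), (9, "b"), (11, "c")]);

    // Bucket keys strictly below the current cleanup bucket (10 here),
    // collected first so the read borrow ends before we mutate the map.
    let expired: Vec<i64> = buckets.range(..10).map(|(k, _)| *k).collect();
    for k in &expired {
        buckets.remove(k);
    }
    println!("removed {expired:?}, remaining {buckets:?}"); // removed [8, 9]
}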
Sorry for the late response, thanks! Would you mind opening a PR so we can see if we can merge it?
I don't think I will have time in the next few weeks.