Proposal: add `ring_ahead` and `ring_guard` decorators for refresh-ahead & stampede protection
Hi @youknowone,
I’ve implemented two decorators to complement ring.lru and similar caching:
- @ring_ahead(refresh_ratio=…): when a cached item’s age exceeds expire * refresh_ratio, launch a background refresh while returning the existing (still valid) value
- @ring_guard(wait_timeout=…): on a cache miss, allow only one thread to rebuild; other threads wait (up to timeout) — avoiding stampede
They use per-key locks, track refresh state, infer age (including via Redis TTL), and gracefully handle errors. I have tests, docs, and examples.
If you like the direction, I can prepare a PR for your review. Would you accept something like this?
```python
import concurrent.futures
import dataclasses
import functools
import logging
import threading
import time

import wrapt

log = logging.getLogger(__name__)


@functools.cache
def _extract_wrapped_ring_func(wrapped, instance):
    """Extract the ring-decorated function from wrapped context.

    Args:
        wrapped: The wrapped function
        instance: The instance (None for functions, object for methods)

    Returns:
        The ring-decorated function with storage attribute

    Raises:
        ValueError: If ring decorator is not found
    """
    if hasattr(wrapped, "storage"):
        return wrapped
    elif instance is not None:
        ring_func = getattr(instance, wrapped.__name__)
        if hasattr(ring_func, "storage"):
            return ring_func
    raise ValueError("Ring decorator must be applied first")


_ring_ahead_executor = concurrent.futures.ThreadPoolExecutor(max_workers=6)


def ring_ahead(func=None, *, refresh_ratio=0.62, executor=None):
    """Refresh cache in background when age exceeds threshold.

    Prevents blocking on cache expiry by proactively refreshing stale cache.

    Example:
        @ring_ahead
        @ring.lru(expire=60)
        def get_data(key): ...
    """

    @dataclasses.dataclass
    class RefreshState:
        timestamp: float = dataclasses.field(default_factory=time.time)
        refreshing: bool = False
        lock: threading.Lock = dataclasses.field(default_factory=threading.Lock)

    _states: dict[str, RefreshState] = {}
    _states_lock = threading.Lock()

    def _get_state(key: str) -> tuple[RefreshState, bool]:
        """Get or create state for key using double-checked locking."""
        if state := _states.get(key):
            return state, True
        with _states_lock:
            if state := _states.get(key):
                return state, True
            state = _states[key] = RefreshState()
            return state, False

    def _trigger_refresh(key: str, ring_func, args, kwargs, state: RefreshState):
        """Trigger background refresh if not already running."""
        with state.lock:
            if state.refreshing:
                return
            state.refreshing = True

        def _refresh():
            try:
                ring_func.update(*args, **kwargs)
                state.timestamp = time.time()
            except Exception as e:
                log.exception(
                    f"ring_ahead({ring_func.__func__.__qualname__}): refresh failed for {key}: {e}"
                )
            finally:
                state.refreshing = False

        (executor or _ring_ahead_executor).submit(_refresh)

    @wrapt.decorator
    def wrapper(wrapped, instance, args, kwargs):
        ring_func = _extract_wrapped_ring_func(wrapped, instance)
        if not (expire := ring_func.storage.rope.config.expire_default):
            raise ValueError("ring_ahead requires explicit expire time")
        key = ring_func.key(*args, **kwargs)
        state, existed = _get_state(key)
        cache_age = time.time() - state.timestamp if existed else 0
        # Trigger background refresh for stale cache (only if not yet expired)
        if expire > cache_age > expire * refresh_ratio and ring_func.has(*args, **kwargs):
            _trigger_refresh(key, ring_func, args, kwargs, state)
        result = wrapped(*args, **kwargs)
        # Update timestamp after cache refresh
        if cache_age >= expire:
            state.timestamp = time.time()
        return result

    result = wrapper if func is None else wrapper(func)
    result._get_refresh_state = _get_state
    return result


def ring_guard(func=None, *, wait_timeout=6):
    """Prevent cache stampede: only one request updates cache, others wait.

    Uses double-checked locking for optimal performance (~60ns fast path).
    If wait_timeout is exceeded, continues execution instead of raising an error.

    Example:
        @ring_guard
        @ring.lru(expire=60)
        def get_data(key): ...
    """
    _locks: dict[str, threading.Lock] = {}
    _locks_lock = threading.Lock()

    def _get_lock(key: str) -> threading.Lock:
        """Get or create lock using the DCL pattern."""
        if lock := _locks.get(key):
            return lock
        with _locks_lock:
            return _locks.setdefault(key, threading.Lock())

    @wrapt.decorator
    def wrapper(wrapped, instance, args, kwargs):
        ring_func = _extract_wrapped_ring_func(wrapped, instance)
        # Fast path: return cached result without locking
        if ring_func.has(*args, **kwargs):
            return wrapped(*args, **kwargs)
        # Slow path: acquire lock to prevent stampede
        key = ring_func.key(*args, **kwargs)
        lock = _get_lock(key)
        acquired = lock.acquire(timeout=wait_timeout)
        if not acquired:
            log.warning(
                f"ring_guard({ring_func.__func__.__qualname__}): "
                f"failed to acquire lock within {wait_timeout}s with key={key}"
            )
        try:
            return wrapped(*args, **kwargs)
        finally:
            if acquired:
                lock.release()

    return wrapper if func is None else wrapper(func)
```
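For readers skimming the thread, the stampede behavior `ring_guard` aims for can be reduced to a self-contained toy model. This is an illustrative sketch, not the PR code; `expensive`, `guarded_get`, and the module-level dicts are made up for the demo. Eight threads hit a cold key concurrently, but the rebuild runs exactly once:

```python
import threading
import time

call_count = 0
cache: dict[str, str] = {}
locks: dict[str, threading.Lock] = {}
locks_lock = threading.Lock()

def expensive(key: str) -> str:
    """Stand-in for a slow rebuild; counts how often it runs."""
    global call_count
    call_count += 1
    time.sleep(0.05)
    return key.upper()

def guarded_get(key: str, wait_timeout: float = 1.0) -> str:
    # Fast path: a cache hit needs no locking at all.
    if key in cache:
        return cache[key]
    # Slow path: one lock per key, so only one thread rebuilds.
    with locks_lock:
        lock = locks.setdefault(key, threading.Lock())
    acquired = lock.acquire(timeout=wait_timeout)
    try:
        if key not in cache:  # re-check after waiting on the lock
            cache[key] = expensive(key)
        return cache[key]
    finally:
        if acquired:
            lock.release()

threads = [threading.Thread(target=guarded_get, args=("k",)) for _ in range(8)]
for t in threads:
    t.start()
for t in threads:
    t.join()
# expensive() ran once despite 8 concurrent cold-cache calls
```

The PR version delegates the cache check and rebuild to ring's `has()` and the wrapped call, but the lock-per-key shape is the same.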
Sorry for the late reply.
I haven’t contributed to this project for quite some time. At this point, I’m happy to help fix bugs that affect existing users. However, I’m already burdened with maintaining many other open-source projects, and I give higher priority to the ones I personally use. Since I currently don’t have any code that uses ring, I’m reluctant to spend much additional time maintaining new features for this project.
That said, this doesn’t mean I intend to let the project slowly die. If you like this project and are interested in maintaining it and adding new features, I’ll do my best to help you take on that role. But as long as I remain the sole maintainer, I’ll be hesitant to merge new features.
First of all, thank you for creating and maintaining ring. I know it's a lot of work to keep open-source projects alive, and I really appreciate what you've already done — the library has been very useful to me.
I like this project a lot, and I'm happy to help support it if needed. I currently have enough time to contribute, so I can try to be involved more actively.
I'd like to discuss this feature request. If you also feel it's worth adding, I'll prepare a PR. If you think it's not really necessary right now, that's totally fine — we can leave it out.
Please let me know what you think.
Thanks again for all your work.
These features don't look specific to lru; they should be applicable to any backend, e.g. both dict and memcached could reuse them.
Can this be added to `CacheUserInterface` first? Then you can override `default_action` to the new action.
Yes, the design is meant to work for all backends — not just lru. Dict, memcached, etc. should all be able to reuse it. My current thinking is to expose it as decorators, for example:
```python
@ring.guard
@ring.ahead
@ring.lru(expire=60)
def get_data(key): ...
```
What do you think about this approach?
As long as it works well with action, I am inclined toward action.
A single decorator with many parameters can easily be reused by wrapping the function or using functools.partial, but multiple decorators are not easy to reuse without copying.
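To make the reuse argument concrete, here is a minimal sketch with a hypothetical `cached` decorator (a stand-in, since the combined single-decorator API is still under discussion) showing how `functools.partial` pre-configures it once for reuse:

```python
import functools

def cached(func=None, *, expire=60, refresh_ahead=False):
    """Hypothetical stand-in for a single parameterized cache decorator."""
    if func is None:
        return functools.partial(cached, expire=expire, refresh_ahead=refresh_ahead)
    # For the demo we just record the configuration on the function.
    func.cache_config = {"expire": expire, "refresh_ahead": refresh_ahead}
    return func

# A pre-configured variant is one functools.partial away:
team_cache = functools.partial(cached, expire=120, refresh_ahead=True)

@team_cache
def get_data(key):
    return key.upper()
```

With stacked decorators, reusing the same combination elsewhere means copying the whole stack, which is the maintainer's point above.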
Let me check if I understand your preference correctly.
Are you leaning toward an API like this:
```python
@ring.lru(expire=60, refresh_ahead=True, stampede_lock=True)  # stampede_lock: bool | float
def get_data(key): ...
```
(where refresh_ahead enables background refresh before expiry, and stampede_lock enables the stampede/thundering-herd protection, or can be a timeout value if it's a float)
Is this closer to what you had in mind?
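If that shape is adopted, the `bool | float` overloading could be normalized internally along these lines (a hypothetical helper; the name and default are assumptions, not existing ring API):

```python
DEFAULT_STAMPEDE_TIMEOUT = 6.0

def normalize_stampede_lock(value):
    """Map the proposed stampede_lock values to a wait timeout in seconds.

    False/None disables the guard, True selects the default timeout,
    and a number is taken as a custom timeout.
    """
    if value is True:
        return DEFAULT_STAMPEDE_TIMEOUT
    if not value:  # False or None: guard disabled
        return None
    return float(value)
```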
Yes, something like that.
When using `default_action`, the default action can be overridden.
If it is overridden to `get`,
```python
@ring.lru(..., default_action='get')
def get_data(key): ...
```
then `get_data()` will always return None until `get_data.update()` is called.
So by adding an action `get_and_refresh_ahead`,
```python
@ring.lru(..., default_action='get_and_refresh_ahead')
def get_data(key): ...
```
the default behavior of `get_data()` will be `get_and_refresh_ahead`.
This is the basic idea. I don't have a concrete idea about the further user interface yet.
After adding an action, your `@ring_guard` will also be able to offer the same interface by configuring the inner function, if that is the desired interface.
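For anyone unfamiliar with the `default_action` idea, here is a toy sketch of the dispatch it describes (not ring's real `CacheUserInterface`; the class and its methods are simplified stand-ins):

```python
class MiniRing:
    """Toy model: plain calls dispatch to whatever default_action names."""

    def __init__(self, func, default_action="get_or_update"):
        self.func = func
        self.cache = {}
        self.default_action = default_action

    def get(self, *args):
        # 'get' never computes: returns None until update() fills the cache.
        return self.cache.get(args)

    def update(self, *args):
        self.cache[args] = self.func(*args)
        return self.cache[args]

    def get_or_update(self, *args):
        if args not in self.cache:
            self.update(*args)
        return self.cache[args]

    def __call__(self, *args):
        return getattr(self, self.default_action)(*args)

get_data = MiniRing(lambda key: key.upper(), default_action="get")
```

A new action like `get_and_refresh_ahead` would be one more method on the interface, selectable the same way.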
I am a bit confused here... are ring_ahead and ring_guard both applicable to the same function?
Yes. They target different phases, so they’re usually used separately:
`ring_ahead` does proactive background refresh before the value expires, while `ring_guard` prevents a thundering herd when the cache is cold.
They can also be stacked on the same function if you want both behaviors, but that’s optional.
Thanks, then action doesn't make sense. You already have a patched version, right? Please open a PR; let's discuss more on it.