Proposal: add `ring_ahead` and `ring_guard` decorators for refresh-ahead & stampede protection

Open · wy-z opened this issue 3 months ago · 10 comments

Hi @youknowone,

I’ve implemented two decorators to complement ring.lru and similar caching decorators:

  • @ring_ahead(refresh_ratio=…): when a cached item’s age exceeds expire * refresh_ratio but the item has not yet expired, start a background refresh and immediately return the existing, still-valid value
  • @ring_guard(wait_timeout=…): on a cache miss, let only one thread rebuild the entry while the other threads wait (up to wait_timeout), avoiding a cache stampede

They use per-key locks, track refresh state, infer age (including via Redis TTL), and gracefully handle errors. I have tests, docs, and examples.
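To make the first bullet concrete, here is a small sketch of how an entry's age maps to the three possible behaviors, assuming the same comparison the ring_ahead code uses (`expire > age > expire * refresh_ratio`). `cache_phase` is a hypothetical helper for illustration only. With `expire=60` and the default `refresh_ratio=0.62`, an entry is served as-is for roughly its first 37 seconds, triggers a background refresh between about 37 and 60 seconds, and is recomputed once it reaches 60 seconds.

```python
def cache_phase(age: float, expire: float, refresh_ratio: float = 0.62) -> str:
    """Classify a cached entry by its age (seconds) under the refresh-ahead rule."""
    if age >= expire:
        return "expired"  # recomputed synchronously on the next call
    if age > expire * refresh_ratio:
        return "refresh-ahead"  # old value is returned; a background refresh starts
    return "fresh"  # returned as-is


for age in (10, 40, 60):
    print(age, cache_phase(age, expire=60))
```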

If you like the direction, I can prepare a PR for your review. Would you accept something like this?

```python
import concurrent.futures
import dataclasses
import logging
import threading
import time

import wrapt

log = logging.getLogger(__name__)


def _extract_wrapped_ring_func(wrapped, instance):
    """Extract the ring-decorated function from wrapped context.

    Deliberately not wrapped in functools.cache: a cache keyed on
    `instance` would keep every instance alive and raise TypeError
    for unhashable instances.

    Args:
        wrapped: The wrapped function
        instance: The instance (None for functions, object for methods)

    Returns:
        The ring-decorated function with storage attribute

    Raises:
        ValueError: If ring decorator is not found
    """
    if hasattr(wrapped, "storage"):
        return wrapped
    elif instance is not None:
        ring_func = getattr(instance, wrapped.__name__)
        if hasattr(ring_func, "storage"):
            return ring_func

    raise ValueError("Ring decorator must be applied first")


_ring_ahead_executor = concurrent.futures.ThreadPoolExecutor(max_workers=6)


def ring_ahead(func=None, *, refresh_ratio=0.62, executor=None):
    """Refresh cache in background when age exceeds threshold.

    Prevents blocking on cache expiry by proactively refreshing stale cache.

    Example:
        @ring_ahead
        @ring.lru(expire=60)
        def get_data(key): ...
    """

    @dataclasses.dataclass
    class RefreshState:
        timestamp: float = dataclasses.field(default_factory=time.time)
        refreshing: bool = False
        lock: threading.Lock = dataclasses.field(default_factory=threading.Lock)

    # One state per cache key; entries are never evicted.
    _states: dict[str, RefreshState] = {}
    _states_lock = threading.Lock()

    def _get_state(key: str) -> tuple[RefreshState, bool]:
        """Get or create state for key using double-checked locking."""
        if state := _states.get(key):
            return state, True

        with _states_lock:
            if state := _states.get(key):
                return state, True
            state = _states[key] = RefreshState()
            return state, False

    def _trigger_refresh(key: str, ring_func, args, kwargs, state: RefreshState):
        """Trigger background refresh if not already running."""
        with state.lock:
            if state.refreshing:
                return
            state.refreshing = True

        def _refresh():
            try:
                ring_func.update(*args, **kwargs)
                state.timestamp = time.time()
            except Exception:
                # log.exception already records the traceback
                log.exception(
                    "ring_ahead(%s): refresh failed for key=%s",
                    ring_func.__func__.__qualname__,
                    key,
                )
            finally:
                with state.lock:
                    state.refreshing = False

        (executor or _ring_ahead_executor).submit(_refresh)

    @wrapt.decorator
    def wrapper(wrapped, instance, args, kwargs):
        ring_func = _extract_wrapped_ring_func(wrapped, instance)
        if not (expire := ring_func.storage.rope.config.expire_default):
            raise ValueError("ring_ahead requires explicit expire time")

        key = ring_func.key(*args, **kwargs)
        state, existed = _get_state(key)
        cache_age = time.time() - state.timestamp if existed else 0

        # Trigger background refresh for stale cache (only if not yet expired)
        if expire > cache_age > expire * refresh_ratio and ring_func.has(
            *args, **kwargs
        ):
            _trigger_refresh(key, ring_func, args, kwargs, state)

        result = wrapped(*args, **kwargs)
        # An expired entry was recomputed by the call above; restart its clock
        if cache_age >= expire:
            state.timestamp = time.time()
        return result

    result = wrapper if func is None else wrapper(func)
    result._get_refresh_state = _get_state
    return result


def ring_guard(func=None, *, wait_timeout=6):
    """Prevent cache stampede: only one request updates cache, others wait.

    Uses double-checked locking for optimal performance (~60ns fast path).
    If wait_timeout is exceeded, continues execution instead of raising error.

    Example:
        @ring_guard
        @ring.lru(expire=60)
        def get_data(key): ...
    """
    # One lock per cache key; entries are never evicted.
    _locks: dict[str, threading.Lock] = {}
    _locks_lock = threading.Lock()

    def _get_lock(key: str) -> threading.Lock:
        """Get or create lock using DCL pattern."""
        if lock := _locks.get(key):
            return lock
        with _locks_lock:
            return _locks.setdefault(key, threading.Lock())

    @wrapt.decorator
    def wrapper(wrapped, instance, args, kwargs):
        ring_func = _extract_wrapped_ring_func(wrapped, instance)

        # Fast path: return cached result without locking
        if ring_func.has(*args, **kwargs):
            return wrapped(*args, **kwargs)

        # Slow path: acquire lock to prevent stampede
        key = ring_func.key(*args, **kwargs)
        lock = _get_lock(key)
        acquired = lock.acquire(timeout=wait_timeout)
        if not acquired:
            log.warning(
                "ring_guard(%s): failed to acquire lock within %ss for key=%s",
                ring_func.__func__.__qualname__,
                wait_timeout,
                key,
            )

        try:
            # With ring's default get_or_update action this call re-checks the
            # cache, so threads that waited receive the value the first one stored
            return wrapped(*args, **kwargs)
        finally:
            if acquired:
                lock.release()

    return wrapper if func is None else wrapper(func)
```
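For readers without ring or wrapt installed, here is a dependency-free sketch of the per-key locking idea behind ring_guard, with a plain dict standing in for ring's storage. `GuardedCache` and `slow_loader` are hypothetical names. One difference from the proposal: this sketch re-checks the cache explicitly after taking the lock, whereas ring_guard relies on the wrapped call (ring's default get_or_update action) to do that re-check.

```python
import threading
import time
from concurrent.futures import ThreadPoolExecutor


class GuardedCache:
    """Dict-backed cache where only one thread rebuilds a missing key."""

    def __init__(self, loader, wait_timeout=6.0):
        self.loader = loader
        self.wait_timeout = wait_timeout
        self._data = {}
        self._locks = {}  # one lock per key; never evicted, as in ring_guard
        self._locks_lock = threading.Lock()

    def _lock_for(self, key):
        with self._locks_lock:
            return self._locks.setdefault(key, threading.Lock())

    def get(self, key):
        if key in self._data:  # fast path: cache hit, no per-key lock
            return self._data[key]
        lock = self._lock_for(key)
        acquired = lock.acquire(timeout=self.wait_timeout)
        try:
            if key in self._data:  # another thread rebuilt it while we waited
                return self._data[key]
            value = self.loader(key)
            self._data[key] = value
            return value
        finally:
            if acquired:  # on timeout we proceed unlocked, like ring_guard
                lock.release()


calls = []


def slow_loader(key):
    calls.append(key)
    time.sleep(0.2)  # simulate an expensive rebuild
    return key.upper()


cache = GuardedCache(slow_loader)
with ThreadPoolExecutor(max_workers=8) as pool:
    results = list(pool.map(cache.get, ["a"] * 8))

print(results, len(calls))
```

Eight concurrent misses on the same key result in a single call to the loader; the other threads either wait on the key's lock and then find the value, or arrive after it is stored and take the fast path.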

wy-z, Oct 15 '25

Sorry for the late reply.

I haven’t contributed to this project for quite some time. At this point, I’m happy to help fix bugs that affect existing users. However, I’m already burdened with maintaining many other open-source projects, and I give higher priority to the ones I personally use. Since I currently don’t have any code that uses ring, I’m reluctant to spend much additional time maintaining new features for this project.

That said, this doesn’t mean I intend to let the project slowly die. If you like this project and are interested in maintaining it and adding new features, I’ll do my best to help you take on that role. But as long as I remain the sole maintainer, I’ll be hesitant to merge new features.

youknowone, Oct 28 '25

First of all, thank you for creating and maintaining ring. I know it's a lot of work to keep open-source projects alive, and I really appreciate what you've already done — the library has been very useful to me.

I like this project a lot, and I'm happy to help support it if needed. I currently have enough time to contribute, so I can try to be involved more actively.

I'd like to discuss this feature request. If you also feel it's worth adding, I'll prepare a PR. If you think it's not really necessary right now, that's totally fine — we can leave it out.

Please let me know what you think.

Thanks again for all your work.

wy-z, Oct 28 '25

These features don't look specific to lru; they seem applicable to any backend. For example, both dict and memcached could reuse them.

Could this be added to CacheUserInterface first? Then you could override default_action with the new action.

youknowone, Oct 28 '25

Yes, the design is meant to work for all backends — not just lru. Dict, memcached, etc. should all be able to reuse it. My current thinking is to expose it as decorators, for example:

```python
@ring.guard
@ring.ahead
@ring.lru(expire=60)
def get_data(key): ...
```

What do you think about this approach?

wy-z, Oct 28 '25

As long as it works well as an action, I lean toward the action approach.

A single decorator with many parameters is easy to reuse by wrapping it in a function or with functools.partial, but a stack of multiple decorators is hard to reuse without copying it.
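To illustrate the reuse point: with a single decorator, a whole caching policy can be bound once with `functools.partial` and applied by name. `cached`, `hot_cache`, `get_user`, and `get_report` are hypothetical; `cached` only records its options and does no caching, standing in for something like a single ring decorator that accepts all of these options.

```python
import functools


def cached(func=None, *, expire=None, refresh_ahead=False, stampede_lock=False):
    """Hypothetical single cache decorator; it only records its options."""

    def decorate(f):
        @functools.wraps(f)
        def wrapper(*args, **kwargs):
            return f(*args, **kwargs)  # a real decorator would consult the cache here

        wrapper.options = {
            "expire": expire,
            "refresh_ahead": refresh_ahead,
            "stampede_lock": stampede_lock,
        }
        return wrapper

    # Support both @cached and @cached(...)
    return decorate if func is None else decorate(func)


# The whole caching policy is captured once and reused by name.
hot_cache = functools.partial(cached, expire=60, refresh_ahead=True, stampede_lock=5.0)


@hot_cache
def get_user(user_id):
    return {"id": user_id}


@hot_cache(expire=300)  # individual options can still be overridden per function
def get_report(report_id):
    return {"report": report_id}
```

With a stack such as `@ring.guard @ring.ahead @ring.lru(...)`, the equivalent reuse needs a hand-written helper that applies each layer in order, and any change to one layer must be repeated wherever the stack was copied.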

youknowone, Oct 28 '25

Let me check if I understand your preference correctly.

Are you leaning toward an API like this:

```python
@ring.lru(expire=60, refresh_ahead=True, stampede_lock=True)  # stampede_lock: bool | float
def get_data(key): ...
```

(where refresh_ahead enables background refresh before expiry, and stampede_lock enables stampede/thundering-herd protection; passing a float instead of True sets the lock wait timeout)

Is this closer to what you had in mind?
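One detail of a `bool | float` option is worth pinning down: in Python, `bool` is a subclass of `int`, so `True` has to be recognized before any numeric check, or it could be misread as a 1-second timeout. A minimal sketch of how such an option might be normalized; `stampede_lock_timeout` and the 6-second default are assumptions (the default mirrors `wait_timeout=6` in the proposed ring_guard), not part of ring.

```python
from typing import Optional, Union

DEFAULT_WAIT_TIMEOUT = 6.0  # hypothetical default, matching ring_guard's wait_timeout=6


def stampede_lock_timeout(stampede_lock: Union[bool, float]) -> Optional[float]:
    """Map a stampede_lock option to a lock wait timeout, or None when disabled."""
    # bool is a subclass of int, so it has to be handled before the numeric case.
    if isinstance(stampede_lock, bool):
        return DEFAULT_WAIT_TIMEOUT if stampede_lock else None
    if isinstance(stampede_lock, (int, float)):
        if stampede_lock <= 0:
            raise ValueError("stampede_lock timeout must be positive")
        return float(stampede_lock)
    raise TypeError(
        f"stampede_lock must be bool or float, not {type(stampede_lock).__name__}"
    )
```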

wy-z, Oct 29 '25

Yes, something like that.

The default action can be overridden with the default_action parameter.

For example, if it is overridden to get:

```python
@ring.lru(..., default_action='get')
def get_data(key): ...
```

Then get_data() will always return None until get_data.update() is called.

So, after adding a new action get_and_refresh_ahead:

```python
@ring.lru(..., default_action='get_and_refresh_ahead')
def get_data(key): ...
```

Calling get_data() then runs get_and_refresh_ahead by default.

That is the basic idea; I don't have a concrete design for the rest of the user interface yet.

Once such an action exists, your @ring_guard could offer the same interface by configuring the inner function, if that is the interface you want.
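A toy model of the default_action mechanism described above may help. `ToyCachedFunction` is hypothetical and is not ring's implementation: each action is a method, and a plain call dispatches to whichever action `default_action` names. With `'get'` a call only reads the cache, so it returns `None` until `update()` runs; with `'get_and_refresh_ahead'` a stale-but-valid entry is returned immediately while a refresh runs on a background thread. The `clock` parameter is an added assumption so the example can be driven by a fake clock.

```python
import time
from concurrent.futures import ThreadPoolExecutor


class ToyCachedFunction:
    """Hypothetical ring-style cached function whose actions are methods.

    Concurrent background refreshes of the same key are not de-duplicated here.
    """

    def __init__(self, func, expire, *, refresh_ratio=0.62,
                 default_action="get_or_update", clock=time.monotonic):
        self.func = func
        self.expire = expire
        self.refresh_ratio = refresh_ratio
        self.default_action = default_action
        self.clock = clock
        self._data = {}  # key -> (value, stored_at)
        self._executor = ThreadPoolExecutor(max_workers=1)

    def _age(self, key):
        entry = self._data.get(key)
        return None if entry is None else self.clock() - entry[1]

    def _valid(self, key):
        age = self._age(key)
        return age is not None and age < self.expire

    def get(self, key):
        # Cache-only read: a miss or an expired entry returns None.
        return self._data[key][0] if self._valid(key) else None

    def update(self, key):
        value = self.func(key)
        self._data[key] = (value, self.clock())
        return value

    def get_or_update(self, key):
        return self._data[key][0] if self._valid(key) else self.update(key)

    def get_and_refresh_ahead(self, key):
        if not self._valid(key):
            return self.update(key)  # cold or expired: compute synchronously
        value = self._data[key][0]  # read first, so the caller gets the old value
        if self._age(key) > self.expire * self.refresh_ratio:
            self._executor.submit(self.update, key)  # stale but valid: refresh in background
        return value

    def __call__(self, key):
        # default_action decides which action a plain call performs.
        return getattr(self, self.default_action)(key)


now = [0.0]  # fake clock, advanced by hand
get_data = ToyCachedFunction(lambda key: f"{key}@{now[0]}", expire=10,
                             default_action="get", clock=lambda: now[0])
print(get_data("a"))  # prints None: cache-only read before any update
get_data.update("a")
get_data.default_action = "get_and_refresh_ahead"
now[0] = 7.0  # age 7 is past 10 * 0.62 but before expiry
print(get_data("a"))  # prints a@0.0: old value, refresh scheduled in background
get_data._executor.shutdown(wait=True)
print(get_data.get("a"))  # prints a@7.0
```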

youknowone, Oct 29 '25

I'm a bit confused here: are ring_ahead and ring_guard both applicable to the same function?

youknowone, Oct 29 '25

Yes. They target different phases, so they’re usually used separately: ring_ahead does proactive/background refresh before the value expires, and ring_guard prevents a thundering herd when the cache is cold. They can also be stacked on the same function if you want both behaviors, but that’s optional.

wy-z, Oct 29 '25

Thanks. In that case, an action doesn't make sense. You already have a patched version, right? Please open a PR and let's discuss it there.

youknowone, Nov 10 '25