HFE backlog - improve active expiration job

Open ranshid opened this issue 4 months ago • 3 comments

Problem

Valkey’s current active expiration works by scanning volatile keys (and, separately, keys with volatile items) using the hashtable's consistent scanning algorithm. While this simple design eventually converges to deleting all expired keys and/or items, it has several drawbacks:

  1. Since the existing volatile key tracking is not kept ordered in any way, many CPU cycles are wasted iterating over entries that have not expired.
  2. To make sure enough CPU resources are allocated to the active expiration job, the current algorithm keeps a heuristic based on previous sampling to "guess" the percentage of stale expired keys, and raises the rate of the FAST expiration cron job when that percentage is estimated to be high. Because this is a statistical guess, it might allocate extra CPU resources when there is no real need.

We could do better. In https://github.com/valkey-io/valkey/pull/2089 we introduced the new vset, which provides an efficient way to track volatile items. The vset keeps items in time-range buckets, which gives good semi-sorted tracking. While the coarse bucket approach for tracking volatile fields works well, it comes with tradeoffs:

  1. Bucket counts can grow unbounded when expirations are highly fragmented, leading to higher memory usage.

  2. There is no efficient way to answer: “How many items are currently expired?” — which makes it harder to dynamically tune the expiration job rate.

Proposal

Introduce a Hierarchical Timer Wheel as a new optional bucket type within the vset. Hierarchical Timer Wheels are a subset of the coarse bucket algorithm, which makes implementing them with a RAX (to hold the expiration buckets) a great fit for extending the existing vset implementation. The algorithm is very similar to the coarse bucket algorithm, but differs in that it logically keeps a "fixed" number of time buckets matching the different time frames (different for each wheel level). One of the main reasons we did not use the timer wheel algorithm in our existing implementation for tracking volatile hash fields was the need to "cascade" elements when the timer wheel completes a full wheel cycle. This might be less efficient for managing items in multiple objects, but managing cascading for a per-database index might come with very low impact.

Key benefits:

Bounded number of buckets: Unlike naive coarse bucketing, a timer wheel has a fixed number of slots per level, keeping memory overhead predictable. This will also allow placing a secondary BIT (Binary Indexed Tree) matching the bucket sizes. Using a BIT will allow us to get better results when querying the number of elements with expiration time higher than a specific time (Since the

Efficient cascading: Expired elements can be incrementally cascaded to finer-grained levels as time advances.

Binary Indexed Tree (BIT) support: Maintain a parallel BIT with per-bucket counts to efficiently compute the number of expired elements. This enables better control over the expiration rate (e.g., speeding up or slowing down the job based on backlog).

Natural vset extension: The timer wheel can be implemented as another bucket type, similar to the existing radix-tree bucket type, requiring minimal changes to the vset API.

Design Sketch

TimerWheel Structure

  • Backed by a rax (Radix Tree) keyed by time bucket.

  • Each bucket contains a hashtable/single/vector encoded container of elements expiring in that bucket.

  • Levels allow progressively finer-grained time windows.

  • TimerWheelStep() will need to be introduced as part of the existing scan for expired elements. It will need to advance the wheel, expire elements, and re-insert those that still have time remaining into future slots. This way cascading is done incrementally, spreading work across iterations instead of spiking CPU usage.
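
A radix tree orders its keys byte by byte, so "keyed by time bucket" only yields time order if the bucket timestamp is written most significant byte first (the PoC later in this thread does this with htonu64()). The following is a minimal sketch of that idea, not the vset implementation: the helper names are hypothetical, and memcmp() stands in for the byte-wise ordering of the tree.

```c
#include <stdint.h>
#include <string.h>

/* Hypothetical helper: write a slot timestamp as 8 big-endian bytes. */
static void encode_slot_key(uint64_t slot_time, unsigned char out[8]) {
    for (int i = 7; i >= 0; i--) {
        out[i] = (unsigned char)(slot_time & 0xFF);
        slot_time >>= 8;
    }
}

/* Hypothetical helper: read the timestamp back from the 8 key bytes. */
static uint64_t decode_slot_key(const unsigned char in[8]) {
    uint64_t value = 0;
    for (int i = 0; i < 8; i++) value = (value << 8) | in[i];
    return value;
}

/* Byte-wise comparison of two encoded keys, standing in for radix tree order.
 * Returns <0, 0 or >0, like memcmp(). */
static int compare_slot_keys(uint64_t a, uint64_t b) {
    unsigned char ka[8], kb[8];
    encode_slot_key(a, ka);
    encode_slot_key(b, kb);
    return memcmp(ka, kb, sizeof(ka));
}
```

With this encoding compare_slot_keys(255, 256) is negative, so the earliest bucket is always the first key in the tree. A little-endian encoding would sort 256 before 255, because its first byte is 0x00.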

BIT Integration

Maintain a Binary Indexed Tree to store counts of elements per slot.

Allows O(log n) queries like “how many elements have expired?”, which can guide the active expiration job’s rate limiter. However, since 'n' is bounded by the number of buckets (a few hundred), it can be treated as constant complexity.
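
A Binary Indexed Tree over per-slot counts could look roughly like the sketch below. It is illustrative only: SlotCounts, slot_counts_add(), slot_counts_prefix() and the NUM_SLOTS value are hypothetical, and the sketch assumes slots are indexed in expiry order (index 0 expires first), which a real integration would have to arrange.

```c
#include <stddef.h>

#define NUM_SLOTS 256 /* assumed bucket count, "a few hundred" at most */

typedef struct SlotCounts {
    long long tree[NUM_SLOTS + 1]; /* 1-based Fenwick array, zero-initialized */
} SlotCounts;

/* Add delta (+1 on insert, -1 on removal) to the count of slot idx.
 * idx is 0-based and must be smaller than NUM_SLOTS. */
static void slot_counts_add(SlotCounts *sc, size_t idx, long long delta) {
    /* i & (~i + 1) isolates the lowest set bit of i. */
    for (size_t i = idx + 1; i <= NUM_SLOTS; i += i & (~i + 1)) {
        sc->tree[i] += delta;
    }
}

/* Total number of elements stored in slots 0..idx (inclusive). */
static long long slot_counts_prefix(const SlotCounts *sc, size_t idx) {
    long long sum = 0;
    for (size_t i = idx + 1; i > 0; i -= i & (~i + 1)) {
        sum += sc->tree[i];
    }
    return sum;
}
```

Both operations touch at most log2(NUM_SLOTS) + 1 entries. If idx is the last slot whose time range has fully passed, slot_counts_prefix(&sc, idx) gives the current backlog of expired elements, which the expiration job could use to pick its effort for the next cycle.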

Benefits

  • More predictable memory footprint (bounded buckets).

  • Reduced wasted CPU cycles scanning unexpired data.

  • Ability to make active expiration rate adaptive based on real backlog size.

  • Minimal disruption to the existing vset interface.

ranshid avatar Sep 16 '25 10:09 ranshid

Awesome proposal to improve memory usage. I would suggest adding the "help wanted" tag so anyone in the community looking to pick up a new problem to solve can review this proposal and take it forward.

Quick questions: With hierarchical levels, is there effort involved in moving items from a lower level to a higher level before they get handled for expiration? If yes, can the moving cost cause churn when there are a large number of items in the next level? Is there some sort of dynamic rebalancing happening, based on the number of items in a given level?

cherukum-Amazon avatar Sep 18 '25 22:09 cherukum-Amazon

I would suggest adding the "help wanted" tag so anyone in the community looking to pick up a new problem to solve can review this proposal and take it forward.

Yeah, I placed the label on the top issue. I can also add it to these sub-tasks.

Quick questions: With hierarchical levels, is there effort involved in moving items from a lower level to a higher level before they get handled for expiration? If yes, can the moving cost cause churn when there are a large number of items in the next level? Is there some sort of dynamic rebalancing happening, based on the number of items in a given level?

True. What you refer to as "moving items to higher levels" is the concept of "cascading" in HTW, only in my proposal it will indeed be done from the low level to the top level.

For example, I used this (very simple) PoC and tested it in a unit test:

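/* Standard headers used by the PoC. rax, hashtable, htonu64()/ntohu64() and
 * mstime() are assumed to come from the Valkey source tree. */
#include <assert.h>
#include <limits.h>
#include <stdint.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#define LEVELS 5               /* Max levels */
#define MIN_INTERVAL 16        /* Minimal interval in milliseconds */
#define LEVEL_MULTIPLIER_EXP 6 /* Increase in the interval every level */
#define MAX_BUCKETS ((LEVELS - 1) * (1LL << LEVEL_MULTIPLIER_EXP) + 1)

/* Function pointer types */
typedef long long (*GetExpiryFunc)(void *);
typedef void (*ExpireFunc)(void *);

/* Counters updated by the PoC and reported by print_stats(). */
static long long stats_removed_elements, stats_avg_retries, stats_expired_elements,
    stats_cascaded_elements, stats_remove_retry;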

typedef struct TimerWheel {
    rax *slots;
    long long last_iterated_time;
    GetExpiryFunc get_expiry;
    ExpireFunc expire;
} TimerWheel;

/* --------------------- Helpers --------------------- */

long long get_slot_width(int level) {
    return MIN_INTERVAL << (LEVEL_MULTIPLIER_EXP * level); // Level 0: 2^4 = 16ms, then *64 each level
}

uint8_t choose_level(long long delta) {
    for (uint8_t level = 0; level < LEVELS - 1; ++level) {
        if (delta < get_slot_width(level + 1)) {
            return level;
        }
    }
    return LEVELS - 1;
}


long long align_slot_time(long long time, uint8_t level) {
    long long width = get_slot_width(level);
    return (width == LLONG_MAX) ? 0 : (time / width) * width;
}

void build_rax_key(long long slot_time, char *buf) {
    slot_time = htonu64(slot_time);
    memcpy(buf, &slot_time, sizeof(slot_time));
}

long long read_rax_key(unsigned char *buf) {
    uint64_t slot_time = 0;
    memcpy(&slot_time, buf, sizeof(slot_time));
    slot_time = ntohu64(slot_time);
    return slot_time;
}

void *raxFirst(rax *rax) {
    raxIterator iter;
    void *data = NULL;
    raxStart(&iter, rax);
    raxSeek(&iter, "^", NULL, 0);         /* "^" seeks to the smallest key */
    if (raxNext(&iter)) data = iter.data; /* stays NULL when the tree is empty */
    raxStop(&iter);
    return data;
}

void *tag_element(void *ptr, uint8_t level) {
    /* Assume pointer is aligned to at least 16 bytes (4 low bits are zero),
     * so only the low 4 bits are available for the level tag. */
    uintptr_t p = (uintptr_t)ptr;
    uintptr_t tagged = (p & ~(uintptr_t)0xF) | ((uintptr_t)level & 0xF);
    return (void *)tagged;
}

void *untag_pointer(void *tagged_ptr) {
    uintptr_t p = (uintptr_t)tagged_ptr;
    return (void *)(p & ~(uintptr_t)0xF);
}

uint8_t get_tag(void *tagged_ptr) {
    uintptr_t p = (uintptr_t)tagged_ptr;
    return (uint8_t)(p & 0xF);
}

void reset_stats(void) {
    stats_removed_elements = 0;
    stats_avg_retries = 0;
    stats_expired_elements = 0;
    stats_cascaded_elements = 0;
    stats_remove_retry = 0;
}

void print_stats(void) {
    printf("stats_removed_elements: %lld\n"
           "stats_expired_elements: %lld\n"
           "stats_cascaded_elements: %lld\n"
           "stats_remove_retry: %lld\n"
           "stats_avg_retries: %lld\n",
           stats_removed_elements,
           stats_expired_elements,
           stats_cascaded_elements,
           stats_remove_retry,
           stats_avg_retries);
}

/* --------------------- TimerWheel API --------------------- */

TimerWheel *CreateTimerWheel(GetExpiryFunc get_expiry, ExpireFunc expire) {
    TimerWheel *tw = malloc(sizeof(TimerWheel));
    memset(tw, 0, sizeof(*tw));
    tw->get_expiry = get_expiry;
    tw->expire = expire;
    return tw;
}

void FreeTimerWheel(TimerWheel *tw) {
    /* slots is created lazily in AddElement(), so it may still be NULL here. */
    if (tw->slots) raxFreeWithCallback(tw->slots, (void (*)(void *))hashtableRelease);
    free(tw);
}

int AddElement(TimerWheel *tw, void *element) {
    if (!tw->slots || raxSize(tw->slots) == 0) {
        /* Empty container: lazily initialize it and restart the wheel clock.
         * An existing (empty) rax is reused rather than replaced and leaked. */
        tw->last_iterated_time = align_slot_time(mstime_real(), 0);
        if (!tw->slots) tw->slots = raxNew();
        // printf("AddElement: init timer wheel. time: %llu now: %llu\n", tw->last_iterated_time, now);
    }

    long long expiry = tw->get_expiry(element);
    assert(expiry >= tw->last_iterated_time);

    long long delta = expiry - tw->last_iterated_time;
    uint8_t level = choose_level(delta);
    long long slot_time = align_slot_time(expiry, level);

    char key[8];
    build_rax_key(slot_time, key);

    hashtable *ht;
    if (!raxFind(tw->slots, (unsigned char *)key, sizeof(key), (void **)&ht)) {
        ht = hashtableCreate(&pointerHashtableType);
        raxInsert(tw->slots, (unsigned char *)key, sizeof(key), ht, NULL);
    }
    assert(!tw->slots || raxSize(tw->slots) <= MAX_BUCKETS);
    return hashtableAdd(ht, element);
}

int RemoveElement(TimerWheel *tw, void *element) {
    long long expiry = tw->get_expiry(element);
    stats_removed_elements++;
    uint8_t level = choose_level(expiry - tw->last_iterated_time);
    int retries = 0;
    for (; level < LEVELS; level++, retries++) {
        long long slot_time = 0;
        slot_time = align_slot_time(expiry, level);

        char key[8];
        build_rax_key(slot_time, key);

        hashtable *ht = NULL;
        if (raxFind(tw->slots, (unsigned char *)key, sizeof(key), (void **)&ht)) {
            if (hashtableDelete(ht, element)) {
                if (hashtableSize(ht) == 0) {
                    raxRemove(tw->slots, (unsigned char *)key, sizeof(key), (void **)&ht);
                    hashtableRelease(ht);
                    // printf("RemoveElement: removing slot with time: %llu at level: %d\n", slot_time, level);
                }
                return 1;
            }
        }
        stats_avg_retries = stats_avg_retries + (retries - stats_avg_retries) / stats_removed_elements;
        stats_remove_retry += retries;
        // printf("RemoveElement: Failed to remove entry with expiry: %llu reached slot %llu tw time: %llu level: %d\n", expiry, slot_time, tw->last_iterated_time, level);
    }

    return 0;
}

/* --------------------- Iteration & Cascading --------------------- */

int TimerWheelStep(TimerWheel *tw, int max_steps) {
    if (!tw->slots) {
        return 0;
    }

    int steps = 0;
    long long now = mstime();

    printf("TimerWheelStep: before processing %d steps, now: %lld, buckets: %llu \n", max_steps, now, raxSize(tw->slots));

    raxIterator iter;
    raxStart(&iter, tw->slots);
    while (steps < max_steps) {
        if (raxSize(tw->slots) == 0)
            break;
        long long slot_time;
        raxSeek(&iter, "^", NULL, 0);
        raxNext(&iter);
        slot_time = read_rax_key(iter.key);

        // printf("TimerWheelStep: processing slot: %lld, buckets: %llu \n", slot_time, raxSize(tw->slots));

        /* in case we only have expired items located in the future lets bail out. */
        if (slot_time + get_slot_width(0) > now) {
            tw->last_iterated_time = align_slot_time(now, 0);
            // printf("TimerWheelStep: reached out of bound slot time: %lld left slots: %lld tw time: %llu\n", slot_time, raxSize(tw->slots), tw->last_iterated_time);
            break;
        } else {
            // printf("TimerWheelStep: tick timer wheel is currently at: %llu\n", tw->last_iterated_time);
            tw->last_iterated_time = slot_time;
        }
        hashtable *ht = iter.data;
        assert(ht);
        hashtableIterator it;
        hashtableInitIterator(&it, ht, HASHTABLE_ITER_SAFE);
        void *element;
        while (steps < max_steps && hashtableNext(&it, &element)) {
            steps++; /* count only elements that were actually processed */
            long long expiry = tw->get_expiry(element);
            hashtablePop(ht, element, NULL);
            if (expiry <= now) {
                tw->expire(element);
                stats_expired_elements++;
            } else {
                assert(expiry > slot_time + get_slot_width(0));
                AddElement(tw, element);
                stats_cascaded_elements++;
            }
        }
        hashtableResetIterator(&it);

        // printf("TimerWheelStep: After processing slot: %lld, total: %zu expired: %llu \n", slot_time, hashtableSize(ht), expired);

        if (hashtableSize(ht) == 0) {
            // printf("TimerWheelStep: removing slot with time: %lld left slots: %lld\n", slot_time, raxSize(tw->slots));

            /* Use the iterator's key length; sizeof(iter.key) is only the size of a pointer. */
            raxRemove(tw->slots, (unsigned char *)iter.key, iter.key_len, (void **)&ht);
            hashtableRelease(ht);
            tw->last_iterated_time = slot_time;
        }
        // printf("TimerWheelStep: after processing %d steps of slot with time: %s left slots: %lld\n", steps, slot_time_str, raxSize(tw->slots));
    }
    raxStop(&iter);
    return steps;
}
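
To make the constants above concrete, the slot widths and the level chosen for a given time-to-expiry can be worked out with plain arithmetic. This is a standalone sketch that repeats the PoC formulas under hypothetical names (slot_width, level_for_delta); it does not depend on rax or hashtable.

```c
#define LEVELS 5               /* same constants as the PoC */
#define MIN_INTERVAL 16
#define LEVEL_MULTIPLIER_EXP 6
#define MAX_BUCKETS ((LEVELS - 1) * (1LL << LEVEL_MULTIPLIER_EXP) + 1)

/* Width of one slot at a level, in milliseconds:
 *   level 0: 16          (16 ms)
 *   level 1: 1024        (about 1 second)
 *   level 2: 65536       (about 65.5 seconds)
 *   level 3: 4194304     (about 70 minutes)
 *   level 4: 268435456   (about 3.1 days) */
static long long slot_width(int level) {
    return (long long)MIN_INTERVAL << (LEVEL_MULTIPLIER_EXP * level);
}

/* Same rule as choose_level(): the lowest level whose next-level slot width
 * is still larger than the remaining time. */
static int level_for_delta(long long delta) {
    for (int level = 0; level < LEVELS - 1; level++) {
        if (delta < slot_width(level + 1)) return level;
    }
    return LEVELS - 1;
}
```

With these values an element expiring in 500 ms lands in a 16 ms slot (level 0), one expiring in an hour lands in a 65.5 second slot (level 2), and MAX_BUCKETS evaluates to 4 * 64 + 1 = 257.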

When I ran a PoC of HTW against the coarse bucket approach, I actually got nice results, even when the cascading rate was high.

Some points though:

  1. The cascading will usually happen at a specific cyclic point in time (like every full timer cycle). Since we bound the iteration count, the impact on tail latency is low, and we also do not expect this to happen at a constant rate.
  2. Another "downside" of this algorithm is that when we delete an item we might have to look it up in all levels (we first try the level matching the current key, but if it is not found there we have to start looking it up in the other levels). When I tested my PoC, it rarely happened though.
  3. Generic keys use lazy expiration, which might also REDUCE the cascading impact, as they might be deleted before cascading happens.
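
Point 2 can be illustrated with arithmetic alone. An element's slot is chosen relative to the wheel time at insertion, so once the wheel has advanced, the level computed from the current delta can be lower than the level the element was actually stored at. The sketch below is a simplified model under that assumption (no rax, hypothetical names), counting how many slot keys a removal probes before it reaches the stored slot.

```c
#define LEVELS 5
#define MIN_INTERVAL 16
#define LEVEL_MULTIPLIER_EXP 6

static long long slot_width(int level) {
    return (long long)MIN_INTERVAL << (LEVEL_MULTIPLIER_EXP * level);
}

static int level_for_delta(long long delta) {
    for (int level = 0; level < LEVELS - 1; level++) {
        if (delta < slot_width(level + 1)) return level;
    }
    return LEVELS - 1;
}

static long long align_slot(long long time, int level) {
    long long width = slot_width(level);
    return (time / width) * width;
}

/* Number of slot keys probed before hitting the slot the element was stored in.
 * wheel_at_insert / wheel_at_remove model the wheel's last iterated time at
 * each moment. Returns -1 if no probed level matches. */
static int probes_until_found(long long expiry, long long wheel_at_insert, long long wheel_at_remove) {
    long long stored_key = align_slot(expiry, level_for_delta(expiry - wheel_at_insert));
    int probes = 0;
    for (int level = level_for_delta(expiry - wheel_at_remove); level < LEVELS; level++) {
        probes++;
        if (align_slot(expiry, level) == stored_key) return probes;
    }
    return -1;
}
```

For an element with expiry 70000 inserted at wheel time 0, the stored slot is 65536 (level 2). Removing it right away finds it on the first probe. Removing it once the wheel has reached 10000 starts at level 1 (slot 69632), misses, and succeeds on the second probe.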

ranshid avatar Sep 19 '25 05:09 ranshid

@sarthakaggarwal97 @ranshid

Hello, I would like to work on this.

My plan is to add it as a new bucket type following the same pattern as the existing bucket types in vset, using the multi-level timer wheel approach with RAX storage as suggested. I'll implement the core functionality, including a function for incremental processing and cascading, along with the Binary Indexed Tree for efficient expired element counting. I will make sure the implementation integrates cleanly with the existing vset API without breaking changes, allowing the new bucket type to coexist with the current bucket types.

I'd like to start implementation after getting approval. I'll make sure to follow Valkey's coding standards and add appropriate tests.

SarthakRawat-1 avatar Nov 05 '25 17:11 SarthakRawat-1