
[FEATURE] Add GetOrSetAsyncEnumerable for Caching and Streaming IAsyncEnumerable<T>

Open mokarchi opened this issue 3 months ago • 6 comments

Description

I propose adding a new method to IFusionCache called GetOrSetAsyncEnumerable<T>. This method would accept a factory that returns an IAsyncEnumerable<T> (e.g., from a data source like a database query or API stream).

  • On the first execution (cache miss): It reads all items from the source, caches them in a list (or suitable collection), and then yields them back as a streaming IAsyncEnumerable<T>.
  • On subsequent executions (cache hit): It directly yields the cached list as a new IAsyncEnumerable<T> instance, allowing efficient streaming without reloading the data.

This would integrate with existing FusionCache features like timeouts, fail-safe, and distributed caching.

Use Cases and Benefits

  • Big Data Scenarios: Ideal for handling large datasets (e.g., real-time reporting, paginated queries) where loading everything into memory at once is inefficient. Streaming allows processing items as they come without blocking.
  • Real-Time Applications: Enhances FusionCache for modern .NET apps using async streams, like in ASP.NET Core endpoints or background services.
  • Potential Impact: High. Makes FusionCache more versatile and attractive for data-intensive apps, aligning with .NET's async enumerable support (introduced in C# 8.0). It could attract more users from big data or cloud-native communities.

Potential Implementation Sketch

Here's a rough API signature and pseudocode:

public interface IFusionCache
{
    // Existing methods...

    IAsyncEnumerable<T> GetOrSetAsyncEnumerable<T>(
        string key,
        Func<CancellationToken, IAsyncEnumerable<T>> factory,
        FusionCacheEntryOptions? options = null,
        CancellationToken token = default
    );
}

Implementation outline:

  • Check cache for the key.
  • If miss: Execute factory, collect items into a List<T> (or ImmutableList<T> for thread-safety), cache the list, then yield items one by one using await foreach in an async generator.
  • If hit: Retrieve the cached list and yield its items in a new async enumerable.
  • Handle concurrency (e.g., using a lock or cache stampede protection), cancellation, and memory efficiency to avoid OOM in large datasets.

Challenges:

  • Managing async/await in loops to prevent high memory usage.
  • Ensuring compatibility with backplane and multi-level caching.
  • Testing for performance in high-throughput scenarios.

Difficulty: Hard, due to async flow management, but feasible with .NET's built-in support.

Questions/Feedback

  • Does this align with the project's roadmap?
  • Any preferences on implementation details (e.g., collection type for caching)?
  • Should it support additional options, like partial streaming or lazy caching?

I'd be happy to implement this as a PR if the team is interested. Let me know your thoughts!

mokarchi avatar Oct 05 '25 06:10 mokarchi

@jodydonetti Could you review this, please?

mokarchi avatar Oct 06 '25 18:10 mokarchi

Hi @mokarchi , I'm currently off for my "summer" vacation, I'll be back next week.

jodydonetti avatar Oct 06 '25 19:10 jodydonetti

Hi @mokarchi , I'm currently off for my "summer" vacation, I'll be back next week.

Enjoy your summer vacation 😊

mokarchi avatar Oct 06 '25 19:10 mokarchi

Hi @mokarchi , I'm back and looking into this. Give me a moment to wrap my head around it and I'll get back to you with an answer later today. Thanks!

jodydonetti avatar Oct 18 '25 09:10 jodydonetti

Ok so, I thought about this a bit, and it feels quite complex.

I understand the desire for "streaming" the inner items of the list that would then be cached, but there is a lot to unpack here, so bear with me.

Description

I propose adding a new method to IFusionCache called GetOrSetAsyncEnumerable<T>. This method would accept a factory that returns an IAsyncEnumerable<T> (e.g., from a data source like a database query or API stream).

  • On the first execution (cache miss): It reads all items from the source, caches them in a list (or suitable collection), and then yields them back as a streaming IAsyncEnumerable<T>.
  • On subsequent executions (cache hit): It directly yields the cached list as a new IAsyncEnumerable<T> instance, allowing efficient streaming without reloading the data.

This would integrate with existing FusionCache features like timeouts, fail-safe, and distributed caching.

What would be the expected behavior with Eager Refresh, Timeouts and the other features?

Use Cases and Benefits

  • Big Data Scenarios: Ideal for handling large datasets (e.g., real-time reporting, paginated queries) where loading everything into memory at once is inefficient. Streaming allows processing items as they come without blocking.

But then they would be cached as a single entry in the cache. In general it's not a good approach to cache large datasets as a single cache entry: this is true for L1-only scenarios, but becomes prohibitive when dealing with L1+L2 scenarios.

  • Potential Impact: High.

Agree 😅, it's early now and I haven't wrapped my head around it all yet, but I'm already seeing a lot of really complex edge cases to handle.

Makes FusionCache more versatile and attractive for data-intensive apps, aligning with .NET's async enumerable support (introduced in C# 8.0). It could attract more users from big data or cloud-native communities.

This would be interesting, but it's important to either do it right (considering all the edge cases, features, etc) or not do it at all. I'm saying this because giving a feature means both promising that it will work, and well, and suggesting a sort of "golden path" to follow. This hypothetical new feature requires careful consideration, testing, and benchmarking.

Potential Implementation Sketch

Here's a rough API signature and pseudocode:

public interface IFusionCache
{
    // Existing methods...

    IAsyncEnumerable<T> GetOrSetAsyncEnumerable<T>(
        string key,
        Func<CancellationToken, IAsyncEnumerable<T>> factory,
        FusionCacheEntryOptions? options = null,
        CancellationToken token = default
    );
}

Implementation outline:

  • Check cache for the key.
  • If miss: Execute factory, collect items into a List<T> (or ImmutableList<T> for thread-safety)

ImmutableList<T> cannot be used to collect iteratively: since it's immutable, it can only be constructed at the end, when all the items are there. If thread-safety is a concern, maybe ConcurrentBag<T> would be better, but: by its own nature, that is unordered, so the order of the items in the list would not be maintained, which is problematic.

  • cache the list, then yield items one by one using await foreach in an async generator.

I'm not following: we would get all the items, cache the resulting list, and then await foreach at the end? In that case the list would already be materialized, and I don't see the value in then going IAsyncEnumerable<T>. Or did you mean doing an await foreach directly on the factory result, with a yield inside the loop?

  • Handle concurrency (e.g., using a lock or cache stampede protection), cancellation, and memory efficiency to avoid OOM in large datasets.

If the dataset is large and the promise is to cache it, what would you propose? Throwing something after a certain threshold? Something else?

Challenges:

  • Managing async/await in loops to prevent high memory usage.
  • Ensuring compatibility with backplane

If at the end of the day we are talking about a single cache key, this would not be a problem.

and multi-level caching.

This instead would be, as mentioned above: large datasets in cache entries are discouraged, even more so with L2 (distributed caching).

Also: with an L1 cache miss but an L2 cache hit, what should happen? The only thing I can see is FusionCache getting the entire cache key from L2 as a single (very big) binary blob, deserializing it, and simulating an IAsyncEnumerable<T> over the materialized list.
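The L2-hit path described above can be sketched roughly as follows. This is a minimal illustration, not FusionCache code: `BlobReplay`, `Replay` and `ToListAsync` are hypothetical names, and System.Text.Json stands in for whatever serializer is configured. The point it shows is that the whole payload is deserialized before the first item is yielded, so the async stream is only simulated.

```csharp
using System.Collections.Generic;
using System.Text.Json;
using System.Threading.Tasks;

public static class BlobReplay
{
    // Deserializes the entire payload up front, then exposes it as an async stream.
    public static IAsyncEnumerable<T> Replay<T>(byte[] blob)
    {
        var list = JsonSerializer.Deserialize<List<T>>(blob) ?? new List<T>();
        return Iterate(list);
    }

    private static async IAsyncEnumerable<T> Iterate<T>(List<T> list)
    {
        await Task.CompletedTask; // no real async work: the data is already in memory
        foreach (var item in list)
        {
            yield return item;
        }
    }

    // Small helper to drain a stream into a list (hypothetical, for the usage below).
    public static async Task<List<T>> ToListAsync<T>(IAsyncEnumerable<T> source)
    {
        var result = new List<T>();
        await foreach (var item in source)
        {
            result.Add(item);
        }
        return result;
    }
}
```

Usage would look like `await foreach (var item in BlobReplay.Replay<int>(blob)) { ... }`. Memory use is the same as returning the `List<T>` directly.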

  • Testing for performance in high-throughput scenarios.

Yup.

Difficulty: Hard, due to async flow management

Agree: due to that and supporting the other features and size limits, etc.

but feasible with .NET's built-in support.

Honestly I'm not so sure: it's probably "doable" in the sense that it would "technically work", but there would be a lot of things to look out for, and the gains would probably not be what I think you are hoping for.

Mind you, maybe I'm wrong here, but as of now (with the limited time I had to explore it) that's what I'm currently thinking, so I wanted to share it with you.

Questions/Feedback

  • Does this align with the project's roadmap?

Not really, but I just added #543 which has come from a community request I've never even thought of. What I'm saying is that not being in the roadmap is not necessarily a show stopper.

I'd be happy to implement this as a PR if the team is interested. Let me know your thoughts!

The team is just me 😄 so I can answer directly: as mentioned above for now I can't see it working reasonably well, but maybe it's just me or the limited time available to think deeper about it. If you want to explore the idea with a potential implementation: absolutely! And when you feel like it, I can take a look at it and see how it went.

Hope this helps, let me know what you think.

jodydonetti avatar Oct 18 '25 17:10 jodydonetti

Thanks for the detailed feedback, @jodydonetti! I really appreciate you taking the time to dive into this and share your thoughts. I completely understand the complexity and edge cases you’ve pointed out, and I’d love to address them to see if we can refine the idea into something that aligns well with FusionCache’s goals. Below, I’ll respond to each point you raised and propose some potential solutions to explore further. Let me know if I’m on the right track or if there’s anything specific you’d like me to focus on!

What would be the expected behavior with Eager Refresh, Timeouts and the other features?

Here’s how I envision this working:

  • Eager Refresh: On a cache hit, if Eager Refresh is enabled, the cached list could be returned immediately as an IAsyncEnumerable<T>, while the background refresh process triggers the factory to fetch a new set of items. Once the factory completes, the new list would replace the cached entry, just like the current GetOrSet behavior. To avoid disrupting the streaming, the refresh could happen asynchronously without blocking the yield.
  • Timeouts: We could apply the timeout specified in FusionCacheEntryOptions to the factory execution during a cache miss. If the factory takes too long (e.g., a slow database query), the timeout would trigger, and we could fall back to a fail-safe mechanism (if enabled) or throw a timeout exception.
  • Fail-Safe: If fail-safe is enabled, a stale cache entry could be returned as an IAsyncEnumerable<T> even if the factory fails or times out, ensuring the client still gets data.
  • Backplane: The cached list would be serialized and sent to the backplane (e.g., Redis) on a cache miss, similar to how other cache entries are handled. On a cache hit from L2, we’d deserialize the list and yield it as an IAsyncEnumerable<T>.

But then they would be cached as a single entry in the cache. In general it's not a good approach to cache large datasets as a single cache entry: this is true for L1-only scenarios, but becomes prohibitive when dealing with L1+L2 scenarios.

We could explore a few strategies:

  • Size Limits: Introduce an optional size limit (e.g., max items or max memory) in the FusionCacheEntryOptions. If the collected list exceeds this limit during a cache miss, we could either throw an exception or skip caching and just stream the factory output directly. This would give users control over memory usage.
  • Chunked Caching: Instead of caching the entire list as a single entry, we could split the data into smaller chunks (e.g., based on a fixed number of items or memory size) and cache them under derived keys (e.g., key:chunk1, key:chunk2). On a cache hit, the method could yield items from each chunk in sequence, preserving the streaming behavior. This would reduce memory pressure and improve scalability for L2 caching.
  • Lazy Caching Option: Add an option to cache items incrementally as they’re yielded from the factory. For example, cache every 1000 items as a chunk, allowing partial caching for very large datasets. This could work well for scenarios where the client might not consume the entire stream.

ImmutableList<T> cannot be used to collect iteratively: since it's immutable, it can only be constructed at the end, when all the items are there. If thread-safety is a concern, maybe ConcurrentBag<T> would be better, but: by its own nature, that is unordered, so the order of the items in the list would not be maintained, which is problematic.

Instead, I propose:

  • Using a plain List<T> for collecting items during a cache miss, with FusionCache’s existing stampede protection to handle concurrency (avoiding the need for thread-safe collections like ConcurrentBag<T>).
  • For thread-safety during caching, we could wrap the final List<T> in an ImmutableList<T> after collection is complete, before storing it in the cache. This ensures thread-safety for cache hits while preserving order.
  • Alternatively, if memory efficiency is a concern, we could explore a custom collection that supports appending items efficiently during collection but exposes a read-only view for streaming.
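As a point of reference for the collection-type discussion, System.Collections.Immutable ships a mutable builder that can be appended to item by item and then frozen. A minimal sketch, assuming the goal is an order-preserving immutable snapshot (`OrderedCollect` and `Sample` are hypothetical names used only for illustration):

```csharp
using System.Collections.Generic;
using System.Collections.Immutable;
using System.Threading.Tasks;

public static class OrderedCollect
{
    // Appends to a mutable builder, then freezes it once the stream ends.
    public static async Task<ImmutableList<T>> ToImmutableListAsync<T>(IAsyncEnumerable<T> source)
    {
        var builder = ImmutableList.CreateBuilder<T>();
        await foreach (var item in source)
        {
            builder.Add(item); // insertion order is preserved
        }
        return builder.ToImmutable();
    }

    // Hypothetical source with a deliberately unsorted order.
    public static async IAsyncEnumerable<int> Sample()
    {
        foreach (var n in new[] { 3, 1, 2 })
        {
            await Task.Yield();
            yield return n;
        }
    }
}
```

The immutable list still only exists once the stream has ended, so this does not change the memory picture. It only removes the extra copy from a `List<T>` into an `ImmutableList<T>`.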

I'm not following: we would get all the items, cache the resulting list, and then await foreach at the end? In that case the list would be already materialized, and I don't see the value in then going IAsyncEnumerable<T>. Or you meant doing an await foreach directly on the factory result, with a yield inside the loop?

Sorry for the confusion in my explanation! Let me clarify the intent. The goal of returning an IAsyncEnumerable<T> (even for a cached list) is to maintain a consistent API and enable streaming-style consumption for clients, which is valuable in scenarios like:

  • ASP.NET Core streaming responses: Clients can process items as they’re yielded (e.g., in a gRPC or SignalR stream) without loading the entire list into memory at once.
  • Partial consumption: Clients might only need the first few items and can cancel the enumeration early, avoiding unnecessary processing.

For the implementation:

  • Cache Miss: We’d use await foreach on the factory’s IAsyncEnumerable<T> to collect items into a List<T> for caching. Simultaneously, we’d yield return each item as it’s received, preserving the streaming behavior for the client. This avoids materializing the entire list in memory before yielding.
  • Cache Hit: We’d iterate over the cached List<T> and yield return each item to simulate an IAsyncEnumerable<T>. This ensures the API feels consistent whether it’s a hit or miss.

To illustrate, here’s a refined pseudocode:

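// Needs System.Collections.Generic, System.Threading and
// System.Runtime.CompilerServices (for [EnumeratorCancellation]).
// _cache is a placeholder for the underlying store (pseudocode).
public async IAsyncEnumerable<T> GetOrSetAsyncEnumerable<T>(
    string key,
    Func<CancellationToken, IAsyncEnumerable<T>> factory,
    FusionCacheEntryOptions? options = null,
    [EnumeratorCancellation] CancellationToken token = default)
{
    // Cache hit: replay the materialized list as an async stream.
    if (_cache.TryGetValue(key, out var cachedList) && cachedList is List<T> list)
    {
        foreach (var item in list)
            yield return item;
        yield break;
    }

    // Cache miss: yield each item while also buffering it for the cache.
    var newList = new List<T>();
    await foreach (var item in factory(token).WithCancellation(token))
    {
        newList.Add(item);
        yield return item;
    }

    // Reached only if the caller enumerates to the end.
    _cache.Set(key, newList, options);
}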
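One consequence of the stream-while-collecting shape in the pseudocode above is worth checking: in a C# async iterator, statements after the loop run only if the consumer enumerates to the end. A minimal self-contained sketch, using a plain Dictionary as a stand-in for the cache (not FusionCache's API; `EarlyExitDemo`, `Source`, `StreamAndCollect` and `RunAsync` are hypothetical names):

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class EarlyExitDemo
{
    // Stand-in for a cache; hypothetical, not FusionCache's API.
    public static readonly Dictionary<string, object> Cache = new();

    // Hypothetical source producing 1..count.
    public static async IAsyncEnumerable<int> Source(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Same shape as the pseudocode: yield each item while buffering it,
    // and store the buffer only after the source is exhausted.
    public static async IAsyncEnumerable<T> StreamAndCollect<T>(string key, IAsyncEnumerable<T> source)
    {
        var buffer = new List<T>();
        await foreach (var item in source)
        {
            buffer.Add(item);
            yield return item;
        }
        Cache[key] = buffer; // skipped when the consumer stops early
    }

    // Consumes at most maxItems from a five-item source and reports
    // whether the key ended up in the cache.
    public static async Task<bool> RunAsync(string key, int maxItems)
    {
        var seen = 0;
        await foreach (var item in StreamAndCollect(key, Source(5)))
        {
            if (++seen >= maxItems) break; // leaving the loop disposes the iterator
        }
        return Cache.ContainsKey(key);
    }
}
```

With this shape, a client that uses the "partial consumption" scenario never populates the cache, so every such call is a miss. A real implementation would have to decide whether to keep draining the factory in the background, cache a partial result, or accept the repeated misses.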

If the dataset is large and the promise is to cache it, what would you propose? Throwing something after a certain threshold? Something else?

Here are a few ideas to mitigate this:

Threshold-Based Limits: Add an option in FusionCacheEntryOptions for a max size (e.g., max items or max bytes). If the collected list exceeds this during a cache miss, we could:

  • Throw a FusionCacheSizeLimitException to alert the client.
  • Or skip caching entirely and just stream the factory output, letting the client decide how to handle it.
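The "skip caching and keep streaming" option could look roughly like the sketch below. It assumes an item-count limit and a caller-supplied `store` callback in place of a real cache write; `BoundedCollect`, `StreamWithLimit`, `Range` and `ToListAsync` are hypothetical names, not existing FusionCache members.

```csharp
using System;
using System.Collections.Generic;
using System.Threading.Tasks;

public static class BoundedCollect
{
    // Streams every item to the caller, buffering for the cache only while the
    // item count stays within maxItems. store is invoked at the end of a full
    // enumeration, and only if the limit was never exceeded.
    public static async IAsyncEnumerable<T> StreamWithLimit<T>(
        IAsyncEnumerable<T> source, int maxItems, Action<List<T>> store)
    {
        var buffer = new List<T>();
        var withinLimit = true;
        await foreach (var item in source)
        {
            if (withinLimit)
            {
                if (buffer.Count < maxItems)
                {
                    buffer.Add(item);
                }
                else
                {
                    withinLimit = false; // over the limit: stop buffering, keep streaming
                    buffer.Clear();
                    buffer.TrimExcess();
                }
            }
            yield return item;
        }
        if (withinLimit) store(buffer);
    }

    // Hypothetical source producing 1..count.
    public static async IAsyncEnumerable<int> Range(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Small helper to drain a stream into a list.
    public static async Task<List<T>> ToListAsync<T>(IAsyncEnumerable<T> source)
    {
        var result = new List<T>();
        await foreach (var item in source)
        {
            result.Add(item);
        }
        return result;
    }
}
```

The throwing variant would replace the `else` branch with a `throw`. Note that the exception would then surface in the middle of the consumer's `await foreach`, after some items have already been delivered.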

Chunked Streaming/Caching: As mentioned earlier, we could cache the data in smaller chunks (e.g., 1000 items per chunk). This would allow partial caching and reduce memory pressure. On a cache hit, we’d yield items from each chunk in sequence.

Also: with an L1 cache miss but an L2 cache hit, what should happen? The only thing I can see is FusionCache getting the entire cache key from L2 as a single (very big) binary blob, deserializing it, and simulating an IAsyncEnumerable<T> over the materialized list.

To address this:

  • Chunked Caching: As proposed earlier, we could store the data in smaller chunks in L2 (e.g., key:chunk1, key:chunk2). On an L1 miss but L2 hit, we’d fetch and deserialize each chunk incrementally, yielding items as each chunk is processed. This reduces the memory footprint and allows streaming to start sooner.
  • Metadata for Chunks: We could store metadata in L2 (e.g., number of chunks, total items) under a separate key (e.g., key:meta). This would help FusionCache reconstruct the IAsyncEnumerable<T> efficiently without needing to load everything at once.
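A rough sketch of the chunk-plus-metadata layout, assuming a plain Dictionary as a stand-in for L2 and the key:chunkN / key:meta naming from the text. `ChunkedStore` and its members are hypothetical, and the sketch ignores serialization, expiration and concurrent writers.

```csharp
using System.Collections.Generic;
using System.Threading.Tasks;

public static class ChunkedStore
{
    // Stand-in for an L2 store; hypothetical, not FusionCache's API.
    public static readonly Dictionary<string, object> Store = new();

    // Writes items under key:chunk1, key:chunk2, ... and the chunk count under key:meta.
    public static async Task WriteAsync<T>(string key, IAsyncEnumerable<T> source, int chunkSize)
    {
        var chunk = new List<T>(chunkSize);
        var chunkCount = 0;
        await foreach (var item in source)
        {
            chunk.Add(item);
            if (chunk.Count == chunkSize)
            {
                Store[$"{key}:chunk{++chunkCount}"] = chunk;
                chunk = new List<T>(chunkSize);
            }
        }
        if (chunk.Count > 0)
        {
            Store[$"{key}:chunk{++chunkCount}"] = chunk; // final, partially filled chunk
        }
        Store[$"{key}:meta"] = chunkCount; // written after every chunk is in place
    }

    // Yields items one chunk at a time, in chunk order.
    public static async IAsyncEnumerable<T> ReadAsync<T>(string key)
    {
        if (!Store.TryGetValue($"{key}:meta", out var meta))
        {
            yield break; // no metadata: treated as a cache miss
        }
        var chunkCount = (int)meta;
        for (var i = 1; i <= chunkCount; i++)
        {
            await Task.Yield(); // placeholder for an async fetch and deserialize
            foreach (var item in (List<T>)Store[$"{key}:chunk{i}"])
            {
                yield return item;
            }
        }
    }

    // Hypothetical source producing 1..count.
    public static async IAsyncEnumerable<int> Range(int count)
    {
        for (var i = 1; i <= count; i++)
        {
            await Task.Yield();
            yield return i;
        }
    }

    // Small helper to drain a stream into a list.
    public static async Task<List<T>> ToListAsync<T>(IAsyncEnumerable<T> source)
    {
        var result = new List<T>();
        await foreach (var item in source)
        {
            result.Add(item);
        }
        return result;
    }
}
```

In a real distributed cache the chunk entries and the metadata entry can expire or be evicted independently, so a reader may find key:meta but not every chunk. That case would need explicit handling and is left open here.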

I'd be happy to implement this as a PR if the team is interested. Let me know your thoughts!

I’m thrilled to hear that you’re open to exploring this idea, even if it’s not currently on the roadmap! Your encouragement means a lot, and I’d love to take a stab at implementing this as a PR to see how it plays out in practice.

mokarchi avatar Oct 18 '25 18:10 mokarchi