
Reactive types support for @Cacheable methods [SPR-14235]

Open spring-projects-issues opened this issue 8 years ago • 27 comments

Pablo Díaz-López opened SPR-14235 and commented

Currently, when using cache annotations on beans, an Observable is cached like any other return type, so the value it emits is not cached.

I tried to use the following pattern to handle it:

@Cacheable("items")
public Observable<Item> getItem(Long id) {
    return Observable.just(id)
        .map(myrepo::getById)
        .cache();
}

On the happy path this works pretty well, since we cache the Observable's values. But if getById throws an exception, the Observable is cached together with the exception, which isn't how it should work.
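The failure-caching pitfall described above can be sketched with stdlib types only. Here CompletableFuture stands in for the cached Observable, and the repository and cache map are hypothetical stand-ins, not the reporter's actual code:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;

// CompletableFuture stands in for Observable.cache(): the async result is
// memoized per key -- including, in the naive variant, a failed result.
class FailureCachingDemo {
    static final ConcurrentHashMap<Long, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
    static final AtomicInteger calls = new AtomicInteger();

    // Hypothetical repository: fails on the first call, succeeds afterwards.
    static String getById(Long id) {
        if (calls.incrementAndGet() == 1) throw new IllegalStateException("transient failure");
        return "item-" + id;
    }

    // Naive memoization: a future that completed exceptionally stays cached,
    // so every later call replays the failure -- the problem described above.
    static CompletableFuture<String> getItemNaive(Long id) {
        return cache.computeIfAbsent(id, k -> CompletableFuture.supplyAsync(() -> getById(k)));
    }

    // One remediation: evict the entry as soon as the future fails, so the
    // next call retries the repository instead of replaying the exception.
    static CompletableFuture<String> getItemEvicting(Long id) {
        return cache.computeIfAbsent(id, k ->
            CompletableFuture.supplyAsync(() -> getById(k))
                .whenComplete((v, ex) -> { if (ex != null) cache.remove(k); }));
    }
}
```

With the evicting variant, the first call fails but the second succeeds, because the failed future is removed before it can be served again.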

It would also be very nice to have support for Single.

If you give me some advice, I can try to submit a PR to solve this.


Affects: 4.2.5

Sub-tasks:

  • #22152 a

Referenced from: pull request https://github.com/spring-projects/spring-framework/pull/1066

1 vote, 8 watchers

spring-projects-issues avatar Apr 29 '16 16:04 spring-projects-issues

Hi, what is the status of this? If I understand correctly, there was discussion about a ReactiveCache, since JSR-107 is a blocking API. Has anything evolved since this issue was opened?

cbornet avatar Jul 01 '19 15:07 cbornet

Nothing has evolved in that area, I am afraid, and given the lack of proprietary implementations, I think cache vendors themselves aren't keen to explore that route either. This leaves us with the blocking model, which isn't a great fit with what @Cacheable has to offer.

There is a cache operator in Reactor, though, and I remember some initial work trying to harmonize this here. Perhaps @simonbasle can refresh my memory?

snicoll avatar Jul 09 '19 11:07 snicoll

I made an attempt at an opinionated caching API in reactor-extra (see the reactor-addons repo), but it has not seen much focus or feedback since then, so I wouldn't claim it is perfect.

simonbasle avatar Jul 10 '19 08:07 simonbasle

Are there any updates on this?

tomasulo avatar May 08 '20 14:05 tomasulo

No, the latest update on this is here. Most cache libraries (including JCache) are still on a blocking model, so my comment above still stands.

snicoll avatar May 10 '20 08:05 snicoll

Are there any updates on this?

k631583871 avatar Mar 15 '21 02:03 k631583871

@k631583871 I already replied to that question just above your comment.

snicoll avatar Mar 15 '21 07:03 snicoll

Redis has a reactive driver, so this could be implemented for Redis.

ankurpathak avatar Jul 09 '21 05:07 ankurpathak

I made a new project for reactive caching with proper annotation usage, Spring Boot auto-configuration, and tests. It looks to be working well and will soon be deployed to our production.

Bryksin avatar Nov 05 '21 17:11 Bryksin

Folks, is there a technical reason @Cacheable is not implemented with Reactor? I've implemented a cache with Caffeine's AsyncCache as an AOP MethodInterceptor, but one of my colleagues noted that we have to be careful with the interceptor approach, since this is how the normal @Cacheable is implemented; if it were so trivial, why didn't Spring add this support for Reactor? Is there a performance penalty somehow?

We can do something similar with a custom operator for the flow, but I really like the AOP interceptor approach - very clean and gives me access to the method's attributes in bulk.

bezrukavyy avatar Dec 21 '21 20:12 bezrukavyy

Developers, myself included, have been waiting for Spring to support reactive types in its cache annotation implementation, also known as the Cache Abstraction. What is interesting in this story is that Micronaut, a relatively new framework compared to Spring, has supported reactive types in its cache annotation implementation since its 1.0.0 GA release back in October 2018. You can check it for yourself:

Not related to this topic, but another area where Micronaut has been more successful than Spring is supporting reactive types through its declarative HTTP client. Spring Cloud OpenFeign doesn't support reactive types, and the documentation explicitly says it won't until OpenFeign supports Project Reactor. Interestingly, Micronaut's declarative HTTP client supports any type that implements the org.reactivestreams.Publisher interface.

BTW, I'm a huge Spring Framework fan. I've been coding Spring-based applications since 2005, and for the last 5 years doing Spring Cloud microservices, but my dear Spring folks, in my honest opinion it is time to catch up!

howardem avatar Dec 23 '21 17:12 howardem

@howardem yeah, Micronaut does have an AsyncCache API, which might in some cases use the backing cache provider's own async methods, or put some async lipstick on a blocking-only API (by running things on dedicated threads)...

One thing to consider is that Micronaut's Cacheable interceptor needs to work with the lowest common denominator of the cache providers it claims to support. As a result, caching the result of a reactive-returning method boils down to these steps:

  1. get from cache as Mono<Optional<T>>
  2. flatMap on the optional
  3. if valued, just unwrap the Optional to a T. DONE
  4. if empty, convert the original method return Publisher to a Mono
  5. flatMap the result of that Publisher and call asyncCache.put with it
  6. (if said publisher instead completes empty, continue the sequence with asyncCache.invalidate instead)

One glaring limitation is that this is not atomic. It is comparable to calling containsKey(k) ? get(k) : put(k, v) on a ConcurrentMap instead of the atomic putIfAbsent(k, v).
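The difference can be demonstrated with a plain ConcurrentHashMap: the check-then-act variant lets two concurrent callers both run the loader, while the atomic computeIfAbsent runs it at most once. The loader and its 50 ms sleep are illustrative stand-ins for the expensive upstream call, not any framework's actual code:

```java
import java.util.concurrent.Callable;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.concurrent.atomic.AtomicInteger;

// Counts how many times the "expensive" loader runs when two threads race.
class AtomicityDemo {
    static int loaderRuns(boolean atomic) throws Exception {
        ConcurrentHashMap<String, String> cache = new ConcurrentHashMap<>();
        AtomicInteger runs = new AtomicInteger();
        CountDownLatch start = new CountDownLatch(1);

        Callable<String> task = () -> {
            start.await();
            if (atomic) {
                // putIfAbsent-style atomicity: the second caller blocks and reuses the result
                return cache.computeIfAbsent("k", k -> load(runs));
            }
            // check-then-act: both callers can observe the miss and both load
            String v = cache.get("k");
            if (v == null) {
                v = load(runs);
                cache.put("k", v);
            }
            return v;
        };

        ExecutorService pool = Executors.newFixedThreadPool(2);
        Future<String> f1 = pool.submit(task);
        Future<String> f2 = pool.submit(task);
        start.countDown();
        f1.get();
        f2.get();
        pool.shutdown();
        return runs.get();
    }

    // Simulated expensive load; the sleep widens the race window.
    static String load(AtomicInteger runs) {
        runs.incrementAndGet();
        try { Thread.sleep(50); } catch (InterruptedException e) { Thread.currentThread().interrupt(); }
        return "v";
    }
}
```

The atomic path always loads exactly once; the non-atomic path usually loads twice when both threads start together.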

The thing is, caching atomically is more important in the case of an asynchronous method:

  • The fact that the original method is asynchronous brings the possibility of additional latency when "creating" the value to be put in the cache in case of Cache Miss.
  • This opens up a larger window for problems like Cache Stampede to occur: if multiple clients call the endpoint at roughly the same time, it is likely they will all see a Cache Miss and the interceptor will trigger the original method multiple times.

Micronaut's AsyncCache interface might have tried to introduce some remediation in the form of get(K, Supplier<V>), but that Supplier is still a blocking thing. Again, this seems to be for lowest common denominator reasons, as only Caffeine has a truly async API including a get(K, Function<K, CompletableFuture<V>>) (it is actually a BiFunction, but you get the idea).

Note that Micronaut's own Cacheable interceptor doesn't even use that get(K, Supplier<V>) method from AsyncCache... For caching reactive types, I don't think it actually even tries to support the atomic parameter to Cacheable at all.

So yeah, it has an async cache abstraction. It is best effort and might hide subtle issues like cache stampedes, so I would take it with a grain of salt.

simonbasle avatar Jan 06 '22 11:01 simonbasle

fwiw, I believe Quarkus implements its caches by requiring that they are asynchronous. Their interceptor checks if the return type is reactive and, if not, simply blocks indefinitely (unless a timeout is specified). Internally they use a Caffeine AsyncCache, saving a thread by performing the work on the caller, manually inserting and completing the future value. I'm not sure if they support any provider other than Caffeine; requiring asynchronous implementations and emulating a synchronous API by blocking makes things much easier. Maybe Spring Cache could iterate towards something similar (assuming some caveats for other providers can be made acceptable)?
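A minimal stdlib sketch of that "async-first, block for synchronous callers" shape (hypothetical names, not the Quarkus API): the cache stores shared CompletableFutures, and a synchronous view simply joins them:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;

// Async-first cache: the stored value is a shared future, so concurrent
// callers attach to one in-flight computation instead of racing.
class SyncOverAsyncSketch {
    static final ConcurrentHashMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();

    static CompletableFuture<String> getAsync(String key) {
        return cache.computeIfAbsent(key,
            k -> CompletableFuture.supplyAsync(() -> "value-" + k));
    }

    // Synchronous view emulated by blocking on the shared future; a real
    // implementation would support a configurable timeout here.
    static String getBlocking(String key) {
        return getAsync(key).join();
    }
}
```

Reactive callers get the future (trivially adaptable to Mono via Mono.fromFuture); imperative callers pay only the cost of the join.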

ben-manes avatar Jan 10 '22 02:01 ben-manes

yeah, that's interesting. They initially (very early on) thought about supporting multiple caching providers, but the reduction in scope (annotations only at first) and the focus on the 80% case led them to use only Caffeine as the underlying implementation 👍 they don't seem to use your get(K key, BiFunction<K, Executor, CompletableFuture<V>> loader) variant @ben-manes, but instead rely on putIfAbsent and computeIfPresent.

but yeah, they focused on an async API returning Uni reactive types (which can very easily be presented as CompletableFutures to Caffeine).

simonbasle avatar Jan 10 '22 09:01 simonbasle

I think that I suggested the putIfAbsent approach so that they could reuse the caller thread. There were many PRs as they iterated, and I advised from a point of ignorance (as I use Guice at work, no annotated caching, and it's all a weekend hobby). I think going async was an evolution of their internal APIs, but my knowledge is very fuzzy. I hope that other caching providers become async-friendly to enable similar integrations, but I don't know the state of that support.

ben-manes avatar Jan 10 '22 10:01 ben-manes

Hi @Bryksin, how has it been working for you? We also currently use a Redis-backed cache (via Redisson), but we have to implement it at the code level, along the lines of

getFromCache(key)
    .switchIfEmpty(getFromUpstream().delayUntil(value -> putToCache(key, value)))

which I don't like too much, as caching should be treated as a cross-cutting concern that is easy to disable if needed.

I like the API of your library (https://github.com/Bryksin/redis-reactive-cache) but my coworkers are hesitant given apparent lack of community adoption :(

also would be interesting to learn of @simonbasle take on this approach 🙏

62mkv avatar Mar 15 '22 10:03 62mkv

Hi @62mkv, unfortunately usage of the lib is very low and there is no time to focus on it properly. It was just an MVP version and definitely has room for improvement, at least:

  • A config property to enable/disable it
  • Exception and error handling (for example, if the connection to Redis fails, it should continue with method execution)

So yes, there is definitely room for improvement, but no time, and the community didn't pick it up. Though I would prefer an official Spring Boot solution rather than custom stuff.

Time goes on, but such an important aspect as caching continues to be ignored by Spring for reactive workloads for some reason...

Bryksin avatar Mar 15 '22 11:03 Bryksin

@Bryksin with regards to "ignoring for some reason", this comment above yours should help nuance that hopefully.

snicoll avatar Mar 15 '22 13:03 snicoll

This seems like it might be why the following doesn't work for me in a WebFlux app:

@Cacheable(cacheNames="users", key="#token")
public Mono<Jwt> enrich(String token, Jwt jwt) { } 

It still seems like this method is hit every time, even though the token value is the same.

Code and comment from https://github.com/jhipster/generator-jhipster/pull/18241#issuecomment-1127083403.

Is the recommendation that we don't use Cacheable in WebFlux apps for things like this?

mraible avatar Jun 07 '22 17:06 mraible

I am not sure what you're asking. Yes, it is expected that this doesn't work on reactive types.

snicoll avatar Jun 07 '22 17:06 snicoll

Is there any plan to fix the above issue?

Note: Caffeine's AsyncCache can help, but we need to write code manually instead of using @Cacheable.

ManikantaGembali avatar Jun 28 '22 14:06 ManikantaGembali

As far as I understand, here's the status of this issue: @Cacheable is meant as an abstraction model for many caching libraries and APIs, letting you use a common programming model on top of many implementations. In the reactive case, few vendors support the async model right now, so any short-term solution we adopt might not reflect the approach and API semantics chosen by the majority in the future.

In short, if you'd like this issue to make progress, you should reach out to your favorite cache library vendor, ask for async/reactive support, and work with them on the use case.

bclozel avatar Jun 28 '22 16:06 bclozel

I agree with bclozel's comment. I'm currently in a situation where I need caching within a reactive application built with WebFlux. So I tried to solve this problem with Caffeine's future-based asynchronous API and Mono.toFuture/Mono.fromFuture (reactor/reactor-addons#237), and implemented it using Spring AOP and a custom cache manager and annotation, like this:

/**
I created AsyncCacheManager and @AsyncCacheable to use instead of the CacheManager and @Cacheable features of spring-cache.
An AsyncCache is created for each method annotated with @AsyncCacheable in a BeanPostProcessor, and is then obtained from the AsyncCacheManager.
AsyncCacheManager is implemented as a ConcurrentHashMap<String, AsyncCache>.
When calling asyncCache.get(), Mono.defer(method).toFuture() is used as the mapping function, and the result is returned via Mono.fromFuture.
*/

@Around("pointcut()")
public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
	...
	AsyncCache asyncCache = asyncCacheManager.get(cacheName);
	if (Objects.isNull(asyncCache)) {
		return joinPoint.proceed();
	}

	//Return type : Mono
	Mono retVal = Mono.defer(() -> {
		try {
			return (Mono) joinPoint.proceed();
		} catch (Throwable th) {
			return Mono.error(th); // error handling
		}
	});

	CompletableFuture completableFuture = asyncCache.get(generateKey(args), (key, exec) -> (retVal).toFuture());
	return Objects.nonNull(completableFuture) ? Mono.fromFuture(completableFuture) : retVal;
}

Before visiting this issue, I got help from ben-manes of the Caffeine Project. (https://github.com/ben-manes/caffeine/discussions/500) Finally, I checked the blocked section using Reactor/BlockHound (https://github.com/reactor/BlockHound) and made sure it worked as intended.

Are there any problems to be expected with using it this way? I am writing this to share it with people who are experiencing the same problem. If you have a better way, please share.

jaesuk-kim0808 avatar Sep 27 '22 12:09 jaesuk-kim0808

Hi @jaesuk-kim0808, your solution is very interesting. I have the same problem. Could you please share your solution code?

smilejh avatar Oct 07 '22 07:10 smilejh

Hi. Here is the code I use. Although it has not yet been used in a real product, performance and functionality will be verified through load tests under various scenarios in the near future. Hope this helps. Please let me know if there is a problem.

@RequiredArgsConstructor
@Aspect
@Component
public class AsyncCacheAspect {

    private final AsyncCacheManager asyncCacheManager;

    @Pointcut("@annotation(AsyncCacheable)")
    public void pointcut() {
    }

    @Around("pointcut()")
    public Object around(ProceedingJoinPoint joinPoint) throws Throwable {
        MethodSignature signature = (MethodSignature) joinPoint.getSignature();
        Method method = signature.getMethod();

        ParameterizedType parameterizedType = (ParameterizedType) method.getGenericReturnType();
        Type rawType = parameterizedType.getRawType();

        if (!rawType.equals(Mono.class) && !rawType.equals(Flux.class)) {
            throw new IllegalArgumentException("The return type is not Mono/Flux. Use Mono/Flux for return type. method: " + method.getName());
        }

        AsyncCacheable asyncCacheable = method.getAnnotation(AsyncCacheable.class);
        String cacheName = asyncCacheable.name();
        Object[] args = joinPoint.getArgs();

        AsyncCache asyncCache = asyncCacheManager.get(cacheName);
        if (Objects.isNull(asyncCache)) {
            return joinPoint.proceed();
        }

        //Return type : Mono
        if (rawType.equals(Mono.class)) {
            Mono retVal = Mono.defer(() -> {
                try {
                    return (Mono) joinPoint.proceed();
                } catch (Throwable th) {
                    throw new BusinessException(ResultCode.UNKNOWN_ERROR, th.getMessage());
                }
            });

            CompletableFuture completableFuture = asyncCache.get(generateKey(args), (key, exec) -> retVal.toFuture());
            return Mono.fromFuture(completableFuture);
        }

        //Return type : Flux
        Mono retVal = Mono.defer(() -> {
            try {
                return ((Flux) joinPoint.proceed()).collectList();
            } catch (Throwable th) {
                throw new BusinessException(ResultCode.UNKNOWN_ERROR, th.getMessage());
            }
        });

        CompletableFuture<List> completableFuture = asyncCache.get(generateKey(args), (key, exec) -> retVal.toFuture());
        return Mono.fromFuture(completableFuture).flatMapMany(Flux::fromIterable);
    }

    private String generateKey(Object... objects) {
        return Arrays.stream(objects)
            .map(obj -> obj == null ? "" : obj.toString())
            .collect(Collectors.joining("#"));
    }
}
@RequiredArgsConstructor
@Component
public class AsyncCacheableMethodProcessor implements BeanPostProcessor {

    private final AsyncCacheManager asyncCacheManager;

    @Override
    public Object postProcessBeforeInitialization(Object bean, String beanName) throws BeansException {

        Arrays.stream(bean.getClass().getDeclaredMethods())
            .filter(m -> m.isAnnotationPresent(AsyncCacheable.class))
            .forEach(m -> {
                AsyncCacheable asyncCacheable = m.getAnnotation(AsyncCacheable.class);
                String cacheName = asyncCacheable.name();
                CacheType cacheType = CacheType.nameOf(cacheName);
                if (Objects.nonNull(cacheType)) {
                    asyncCacheManager.computeIfAbsent(cacheName, (key) ->  {
                        return Caffeine.newBuilder()
                            .maximumSize(cacheType.getMaximumSize())
                            .expireAfterWrite(cacheType.getExpiredAfterWrite(), TimeUnit.SECONDS)
                            .buildAsync();
                    });
                }
            });

        return bean;
    }

}
//This code is in Data access layer.
@AsyncCacheable(name = "getBySvcId")
@Override
public Mono<Domain> getBySvcId(String svcId) {}

AsyncCacheManager is a Spring bean wrapping a ConcurrentHashMap<String, AsyncCache>. CacheType is an enum that defines the key stored in AsyncCacheManager, along with the maximumSize and expireAfterWrite settings required to create Caffeine's AsyncCache.

jaesuk-kim0808 avatar Oct 11 '22 12:10 jaesuk-kim0808

Thank you. This helped me a lot.

smilejh avatar Oct 17 '22 06:10 smilejh

Hi @jaesuk-kim0808, I'm integrating your code, but the AOP is not invoked...

//This code is in Data access layer.
@AsyncCacheable(name = "getBySvcId")
@Override
public Mono<Domain> getBySvcId(String svcId) {}

Can I put the method to be cached in a service implementation as a private method? For example:

@AsyncCacheable(name = "getCurrencies")
private Mono<List<RateDto>> getCurrencies(String source, String target) {
  return currencyRatesApi.getCurrencyRates()
      .collectList();
}

The parameters aren't used in the code because I need to cache all elements. I need to change this code, but the pointcut is not triggered. I tried changing the method to public, but the AOP is still not invoked...

Thanks for your great work

kidhack83 avatar Oct 26 '22 10:10 kidhack83

Hi @kidhack83, as far as I know, Spring AOP advice relies on proxies, which means it does not fire when a method is called from within the same instance. Try adjusting your code accordingly:

@Component
public class ServiceA {
  ...
  @AsyncCacheable(name = "getCurrencies")
  public Mono<List<RateDto>> getCurrencies(String source, String target) {
    return currencyRatesApi.getCurrencyRates()
        .collectList();
  }
  ...
}

@Component
@RequiredArgsConstructor
public class ServiceB {
  private final ServiceA serviceA;
  ...
  private Mono<List<RateDto>> someMethod(String source, String target) {
     return serviceA.getCurrencies(source, target);
  }
  ...
}

I hope this helps.

vgaborabs avatar Nov 20 '22 19:11 vgaborabs

I took the solutions in this thread and made a more fleshed out version that includes the annotation, the imports, and the dependencies: https://github.com/shaikezr/async-cacheable

shaikezr avatar Mar 28 '23 14:03 shaikezr

While we still do not see @Cacheable and co. as a perfect companion for reactive service methods, we are coming back to this now from a pragmatic perspective: it is quite straightforward to support @Cacheable for CompletableFuture-returning methods, and with reasonable adaptation effort, the same approach can work for single-value reactive types such as Mono. With Flux, annotation-driven caching is never going to be an ideal fit, but when asked for it through a corresponding declaration, the best we can do is collect the Flux outcome into a List and cache that, then restore a Flux from the List in case of a cache hit.
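The Flux handling sketched above — collect to a List on a miss, replay the List on a hit — can be illustrated with stdlib types, using Stream in place of Flux and a CompletableFuture-valued map as the async cache. All names here are hypothetical illustrations, not the Spring implementation:

```java
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Collectors;
import java.util.stream.Stream;

// Stream stands in for Flux: cache the collected List, rebuild a stream on a hit.
class FluxListCachingSketch {
    static final ConcurrentHashMap<String, CompletableFuture<List<Integer>>> cache = new ConcurrentHashMap<>();
    static final AtomicInteger upstreamCalls = new AtomicInteger();

    // Stand-in for the Flux-returning service method.
    static Stream<Integer> upstream() {
        upstreamCalls.incrementAndGet();
        return Stream.of(1, 2, 3);
    }

    static Stream<Integer> cached(String key) {
        return cache.computeIfAbsent(key,
                k -> CompletableFuture.supplyAsync(() -> upstream().collect(Collectors.toList())))
            .join()        // in the reactive version this stays non-blocking (Mono.fromFuture)
            .stream();     // "restore a Flux" from the cached List
    }
}
```

Repeated hits replay the cached List without re-invoking the upstream method, which is the semantics described for the Flux case.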

On the provider side, there is Caffeine's CompletableFuture-based AsyncCache, which is a fine fit for the approach above. This allows for individual cache annotation processing as well as an @Cacheable(sync=true) style synchronized approach, with very similar semantics to cache annotation usage on imperative methods. All it takes SPI-wise for this to be possible is two new methods on Spring's Cache interface: CompletableFuture retrieve(key) for plain retrieval and CompletableFuture retrieve(key, Supplier<CompletableFuture>) for sync-style retrieval. These are designed for an efficient binding in CaffeineCache and can also be implemented in ConcurrentMapCache with some CompletableFuture.supplyAsync usage.
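A hedged, much-simplified sketch of what such retrieve methods could look like over a plain ConcurrentMap. The method names mirror the description above, but this is not the actual Spring SPI; the real signatures, null handling, and CaffeineCache binding differ:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Supplier;

// Simplified sketch of a value-storing cache exposing future-based retrieval.
class RetrieveCacheSketch {
    private final ConcurrentHashMap<Object, Object> store = new ConcurrentHashMap<>();

    // Plain retrieval: completes with the cached value, or null on a miss.
    CompletableFuture<Object> retrieve(Object key) {
        return CompletableFuture.completedFuture(store.get(key));
    }

    // Sync-style retrieval: at most one loader runs per key, mirroring
    // @Cacheable(sync=true) semantics; supplyAsync keeps the caller thread
    // free, at the cost of a pool thread waiting on the loader's future.
    @SuppressWarnings("unchecked")
    <T> CompletableFuture<T> retrieve(Object key, Supplier<CompletableFuture<T>> loader) {
        return CompletableFuture.supplyAsync(
            () -> (T) store.computeIfAbsent(key, k -> loader.get().join()));
    }
}
```

A provider with a genuinely asynchronous API (like Caffeine's AsyncCache) can bind these methods directly instead of shifting the blocking onto a pool thread.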

There are a few caveats: in order to address the risk of a cache stampede for expensive operations, @Cacheable(sync=true) needs to be declared (just like with our imperative caching arrangement), with only one such caching annotation allowed on a given method then. Also, we expect Cache.evict, Cache.clear and Cache.put to be effectively non-blocking for individual usage underneath @CacheEvict and @CachePut. This is the case with our common cache providers unless configured for immediate write-through, which none of our retrieve-supporting providers offer as a feature anyway. So in summary, for providers supporting our retrieve operations, evict/clear/put need to be effectively non-blocking in their implementation. This matches the existing semantic definition of those methods, which already allows for asynchronous or deferred backend execution, e.g. in transactional scenarios, with callers not expecting to see an immediate effect when querying the cache right afterwards. The javadoc of those methods now hints at the non-blocking requirement for reactive interactions as well.

Thanks everyone for your comments in this thread! This served as very valuable inspiration.

jhoeller avatar Jul 21 '23 17:07 jhoeller