
Bulk refresh

Open ben-manes opened this issue 9 years ago • 22 comments

This Guava issue identified an expected optimization not being implemented. A getAll where some of the entries should be refreshed due to refreshAfterWrite schedules each key as an independent asynchronous operation. Due to the provided CacheLoader supporting bulk loads, it is reasonable to expect that the refresh is performed as a single batch operation.

This optimization may be invasive and deals with complex interactions for both the synchronous and asynchronous cache implementations. Or, it could be as simple as using getAllPresent and enhancing it to support batch read post-processing.

ben-manes avatar Feb 27 '15 22:02 ben-manes

The API for this improvement is not very clear. Even if a batch refresh may predominantly delegate to loadAll(keys), the method should try to be symmetric with the reload(key, @Nonnull value). What would the batch version of that be?

A batch refresh using refreshAfterWrite and triggered by getAll would map directly to reloadAll(Map<K, V> entries). However, LoadingCache.refresh(key) will reload if the entry is present and load if it is not. How should refreshAll(keys) behave when some of the entries are present and some are not? Should it split the operation into two calls, reloadAll(present) and loadAll(missing), or should it delegate the decision to the user, e.g. reloadAll(Map<K, V> entries, Iterable<K> remainingKeys)? The latter seems wrong from an API consistency perspective, but correct from a performance one.

I'm favoring the reloadAll(Map) approach because it is more consistent and we can optimize the majority of the use-cases. Here's my rationale:

  • A partially present batch is not the common case (explicit refresh). Most usages will be triggered by using refreshAfterWrite and a batch getAll.
  • Most users do not use the old values when computing the new one, relying on the default implementation (delegate to load).
  • Multiple operations are only required if the user overrides the default implementation (delegate to loadAll). This can be detected using reflection (at cache construction), allowing us to perform a single call instead of two since that will have the same user-visible behavior.
  • A refresh is performed asynchronously, so 2 is a better worst case than N.
  • Like getAll, we cannot ensure that parallel batch operations do not overlap and may load the same entries. In the common case of refreshAfterWrite this can be optimized to not occur, though.

On reflection, I like CacheLoader.reloadAll(Map) as providing the best power-to-weight ratio.

@abatkin @cjohnson2

ben-manes avatar Jan 19 '16 06:01 ben-manes

To clarify: Would the solution reloadAll(Map) imply that refreshAll(keys) (where some are present and some are not) would end up with calls to both reloadAll(Map) and loadAll(keys) except where both are using the default implementation?

Or are you saying that the default implementation (in the interface) of reloadAll() will actually call loadAll() (even though the user may have provided their own implementation of loadAll()) and in that case you optimize to call loadAll() once? This sounds pretty slick and I'm imagining that it would be better than the extra Iterable - not only is it cleaner, but most people would probably end up pulling out all of the keys from the Map in reloadAll(Map,Iterable) and then adding all of the remainingKeys anyway, which is a ton of extra/unnecessary work.

abatkin avatar Jan 21 '16 01:01 abatkin

The default interface implementation of reloadAll(Map) would be to throw an UnsupportedOperationException. That would make it symmetric with loadAll where the "exception" is to indicate to fallback to the iterative loop. But similarly we don't actually have to throw and catch an exception, since by reflection we know ahead of time.
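A minimal sketch of the "know ahead of time by reflection" idea, assuming a hypothetical `BulkLoader` interface shaped like the proposal. The interface and the `overridesReloadAll` helper are illustrative names only and are not part of Caffeine's API.

```java
import java.lang.reflect.Method;
import java.util.Map;

public final class ReloadAllDetection {

  /** Hypothetical loader shape from the discussion; NOT Caffeine's CacheLoader. */
  public interface BulkLoader<K, V> {
    V load(K key);

    /** Default: ignore the old value and load again. */
    default V reload(K key, V oldValue) {
      return load(key);
    }

    /** Default: signals "not implemented", mirroring the proposed symmetry with loadAll. */
    default Map<K, V> reloadAll(Map<K, V> entries) {
      throw new UnsupportedOperationException();
    }
  }

  /** Returns true if the loader's class supplies its own reloadAll(Map). */
  public static boolean overridesReloadAll(BulkLoader<?, ?> loader) {
    try {
      Method actual = loader.getClass().getMethod("reloadAll", Map.class);
      Method inherited = BulkLoader.class.getMethod("reloadAll", Map.class);
      // If the lookup resolves to the interface's own method, nothing overrode it.
      return !actual.equals(inherited);
    } catch (NoSuchMethodException e) {
      throw new AssertionError("reloadAll(Map) is declared on BulkLoader", e);
    }
  }
}
```

Running the check once at construction would let a cache pick its dispatch strategy up front instead of throwing and catching the exception on every refresh.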

I think it's best to lay out each scenario separately based on which methods are implemented.

| load | loadAll | reload | reloadAll | description |
|------|---------|--------|-----------|-------------|
| X    |         | ?      |           | N calls (L + R) |
| X    | X       |        |           | 1 call directly to loadAll |
| X    | X       | X      |           | 1 call to loadAll, R calls to reload |
| X    | X       | ?      | X         | R = 1: 1 call to loadAll, 1 call to reload |
| X    | X       | ?      | X         | R > 1: 1 call to loadAll, 1 call to reloadAll |
| X    |         | ?      | X         | R = 1: L calls to load, 1 call to reload |
| X    |         | ?      | X         | R > 1: L calls to load, 1 call to reloadAll |

The assumption is that most users only ever implement load and maybe loadAll. Very few will ever want to do anything smart with the old value. Those who want to batch reloadAll should trust us to do it for them and not need to delegate it themselves.
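The dispatch rules in the table can be written as a small planner. This is only a sketch: `RefreshPlan` and its parameters are hypothetical names, the "?" column is read as "optional", `missing` and `present` stand in for L and R, and leaving out zero-count entries is an assumption the table does not address.

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical planner: which loader methods a refreshAll(keys) would invoke, and how often. */
public final class RefreshPlan {

  /**
   * @param missing number of requested keys not in the cache (L in the table)
   * @param present number of requested keys already cached (R in the table)
   * @return method name mapped to its number of invocations
   */
  public static Map<String, Integer> plan(boolean hasLoadAll, boolean hasReload,
      boolean hasReloadAll, int missing, int present) {
    Map<String, Integer> calls = new LinkedHashMap<>();

    // Only load and loadAll are implemented: the old values are never used,
    // so a single loadAll can cover the missing and the present keys together.
    if (hasLoadAll && !hasReload && !hasReloadAll) {
      if (missing + present > 0) {
        calls.put("loadAll", 1);
      }
      return calls;
    }

    // Missing keys: one bulk call if available, otherwise one load per key.
    if (missing > 0) {
      if (hasLoadAll) {
        calls.put("loadAll", 1);
      } else {
        calls.put("load", missing);
      }
    }

    // Present keys: reloadAll only pays off for more than one entry; otherwise
    // call reload per key (which by default delegates to load).
    if (present > 0) {
      if (hasReloadAll && present > 1) {
        calls.put("reloadAll", 1);
      } else {
        calls.put("reload", present);
      }
    }
    return calls;
  }
}
```

For example, `plan(true, false, false, 2, 3)` yields a single `loadAll`, while `plan(false, false, true, 2, 3)` yields two `load` calls and one `reloadAll`.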

Does that sound right?

ben-manes avatar Jan 21 '16 03:01 ben-manes

The table looks entirely reasonable.

The only thing of note is that existing code that has implemented loadAll() could suddenly experience a change in behavior where loadAll() is now called to reload items (instead of n calls to reload()). I'm guessing that in most cases this is fine (and may even be a pleasant surprise, since they wouldn't have implemented loadAll() if they didn't care).

abatkin avatar Jan 21 '16 03:01 abatkin

Only if reload was not implemented, which would have delegated to n calls to load. So they should be pleased because we make a single call to loadAll, which is what @cjohnson2 had expected in his Guava ticket.

ben-manes avatar Jan 21 '16 03:01 ben-manes

I have run into this issue too. Since the overhead for loading a single item is huge I just use an Async Loader and queue the reload requests until I have enough to run a batch. Not the cleanest solution but a workable one.
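A rough sketch of that queue-until-full workaround, using only the JDK. `QueueingReloader` and its method names are hypothetical, and the sketch makes simplifying assumptions: it flushes on a key count only, and it runs the bulk loader while holding the lock.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

/** Hypothetical helper: queues reload requests and runs them as one batch once enough accumulate. */
public final class QueueingReloader<K, V> {
  private final int batchSize;
  private final Function<Set<K>, Map<K, V>> bulkLoader;
  private final Map<K, CompletableFuture<V>> pending = new LinkedHashMap<>();

  public QueueingReloader(int batchSize, Function<Set<K>, Map<K, V>> bulkLoader) {
    this.batchSize = batchSize;
    this.bulkLoader = bulkLoader;
  }

  /** Queues the key; the returned future completes when its batch has been loaded. */
  public synchronized CompletableFuture<V> reload(K key) {
    // Duplicate requests for a queued key share the same future.
    CompletableFuture<V> future = pending.computeIfAbsent(key, k -> new CompletableFuture<>());
    if (pending.size() >= batchSize) {
      flush();
    }
    return future;
  }

  /** Loads everything queued so far in a single bulk call. */
  public synchronized void flush() {
    if (pending.isEmpty()) {
      return;
    }
    Map<K, CompletableFuture<V>> batch = new LinkedHashMap<>(pending);
    pending.clear();
    try {
      Map<K, V> values = bulkLoader.apply(batch.keySet());
      batch.forEach((key, future) -> future.complete(values.get(key)));
    } catch (RuntimeException e) {
      batch.values().forEach(future -> future.completeExceptionally(e));
    }
  }
}
```

Without a time-based trigger, a partially filled batch waits until someone calls `flush()`, so a real deployment would likely pair this with a maximum delay.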

Dirk-c-Walter avatar Apr 06 '16 20:04 Dirk-c-Walter

@ben-manes i'm not sure if i'm misunderstanding this feature or if there's a real issue. I'm using the loadAll() feature, and i'm using the scaffeine wrapper (which i'm sure you're familiar with) which doesn't muck around with the loadAll feature (as shown here)

here's my simple code to demonstrate this:

val cache = Scaffeine()
     .refreshAfterWrite(100 millis)
     .build[String, String](loader = (key: String) => {
       println(s"key: ${key}")
       key
     }, allLoader = Some((keys: Iterable[String]) => {
       println(s"keys: ${keys.mkString(",")}")
       keys.map(k => k -> k).toMap
     }))

 val values = cache.getAll(Seq("1", "2", "3")).toList

 Thread.sleep(200)

 cache.getAll(Seq("1", "2", "3")).toList    

normally, since i have refreshAfterWrite() and am not explicitly calling refresh, i would expect loadAll() to be called twice, but that's not the case...instead, load() is called 3 times, resulting in:

keys: 1,2,3
key: 1
key: 2
key: 3

replacing it with expireAfterWrite() behaves as expected, though...but with the obvious penalty of blocking.

thoughts?

dalegaspi avatar Mar 13 '18 12:03 dalegaspi

Currently refreshing is performed on individual keys and calls CacheLoader.reload(key, oldValue). Ideally it would take advantage of your bulk loader, but neither Caffeine nor Guava does this. We would need to add a reloadAll method to delegate to, plus the intelligent handling to support this scenario.

ben-manes avatar Mar 13 '18 14:03 ben-manes

Ah...thanks for the quick answer @ben-manes, i shouldn't have assumed that this is implemented since the ticket is still open 😊. I guess for now i'm gonna see what i can do with what we have. @Dirk-c-Walter's solution is interesting but it wouldn't work for me.

dalegaspi avatar Mar 13 '18 18:03 dalegaspi

Just posting this here for reference. The issue I described in https://github.com/ben-manes/caffeine/issues/323#issuecomment-924620934 and https://github.com/ben-manes/caffeine/issues/323#issuecomment-924664002 would most likely not have occurred at all with bulk refresh, because we implement loadAll for most of the caches and support bulk load all the way down to Redis. So in that sense this would be a very nice addition :+1:

slovdahl avatar Sep 24 '21 05:09 slovdahl

There is a CoalescingBulkLoader in the example section that joins multiple load requests into a bulk request by issuing a delay. When reviewing, I think I spotted at least one concurrency issue, so I would not recommend using it without prior vetting.

In cache2k I recently added bulk support and also provided a CoalescingBulkLoader. See the issue comment on how to configure it. I did some heavy concurrent testing on it, so I am quite confident it is production ready. The coalescing can happen for every load, which would always introduce a delay and therefore additional latency on user requests, or it can work within the refresh path only, so initial user requests are not delayed. The solution in Caffeine could be similar.

I was considering bulk support in the cache core, which would mean that timer events would be managed for a bunch of keys. However, timing is a cross-cutting concern through the whole cache implementation. OTOH "bulk" operations are very useful and widespread but not a common mode of operation. Adding bulk support in the timing code of the core would add a lot of complexity. Also, the majority of the cache API still works on individual keys, so entries initially loaded in a bulk request might not be refreshed together. The coalescing approach allows adding it as an extension without altering the core cache.

At the moment the implementation uses a separate thread scheduler, which is not optimal as a general solution. It should be enhanced to use a timer from the cache infrastructure without needing an additional thread.

Since coalescing introduces a tiny delay to bundle the requests, a more efficient solution could adjust the timer events by "rounding up" and the timer code could be extended to process bulks of events that happen at the same time.
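To illustrate the "rounding up" idea with plain arithmetic: if each key's refresh time is rounded up to the next multiple of a granularity, keys that would fire close together land on the same timer event and can be processed as one bulk. The sketch below is not taken from cache2k or Caffeine; `TimerRounding`, `roundUp` and `bucket` are hypothetical names.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

/** Hypothetical helper: aligns refresh times so that nearby events coincide. */
public final class TimerRounding {

  /** Rounds a time up to the next multiple of the granularity (same time unit for both). */
  public static long roundUp(long time, long granularity) {
    return Math.floorDiv(time + granularity - 1, granularity) * granularity;
  }

  /** Groups keys by their rounded refresh time; each group is one candidate bulk refresh. */
  public static <K> TreeMap<Long, List<K>> bucket(Map<K, Long> refreshTimes, long granularity) {
    TreeMap<Long, List<K>> buckets = new TreeMap<>();
    refreshTimes.forEach((key, time) ->
        buckets.computeIfAbsent(roundUp(time, granularity), t -> new ArrayList<>()).add(key));
    return buckets;
  }
}
```

With a granularity of 100, refresh times 1001 and 1050 both round to 1100 and share a bulk, while 1101 rounds to 1200. The cost is that a refresh may run up to one granularity later than scheduled.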

cruftex avatar Nov 01 '21 11:11 cruftex

@cruftex wow i did not even think this exists...but would you mind pointing out the concurrency issues (at least one or two of them)?

dalegaspi avatar Nov 01 '21 12:11 dalegaspi

...but would you mind pointing out the concurrency issues (at least one or two of them)?

@dalegaspi here you are:

asyncLoad is doing:

....
        if (size.incrementAndGet() >= maxLoadSize) {
            doLoad();
        } else if (schedule == null || schedule.isDone()) {
            startWaiting();
        }
....

startWaiting() is doing:

    synchronized private void startWaiting() {
        schedule = timer.schedule(this::doLoad, maxDelay, MILLISECONDS);
    }

This leaves a chance that multiple timers are scheduled: two threads can both see a null or completed schedule and each call startWaiting(). That probably causes no serious harm, only smaller bulk batches under contention. However, the overall design implies that only one timer is scheduled at a time, which is not guaranteed.

OTOH it will also start multiple doLoad once the threshold is reached.
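One way to guarantee a single pending timer is to guard the scheduling with a compare-and-set flag instead of a check-then-act on the `schedule` field. The sketch below is not the code from the examples section. `SingleTimerGuard` is a hypothetical class, and the `timer` hook is an assumption standing in for something like `task -> executor.schedule(task, maxDelay, MILLISECONDS)`.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

/** Hypothetical coalescer that arms at most one flush timer at a time. */
public final class SingleTimerGuard<K> {
  // final, not volatile: the reference never changes and the queue itself is thread-safe
  private final Queue<K> waitingKeys = new ConcurrentLinkedQueue<>();
  private final AtomicBoolean timerArmed = new AtomicBoolean();
  private final Consumer<Runnable> timer;       // assumed hook that runs the task after a delay
  private final Consumer<List<K>> bulkLoad;     // receives each drained batch

  public SingleTimerGuard(Consumer<Runnable> timer, Consumer<List<K>> bulkLoad) {
    this.timer = timer;
    this.bulkLoad = bulkLoad;
  }

  public void enqueue(K key) {
    waitingKeys.add(key);
    // Only the thread that flips the flag from false to true schedules a timer.
    if (timerArmed.compareAndSet(false, true)) {
      timer.accept(this::flush);
    }
  }

  void flush() {
    // Disarm before draining, so a key added during the drain arms a fresh timer
    // rather than being stranded in the queue.
    timerArmed.set(false);
    List<K> batch = new ArrayList<>();
    for (K key; (key = waitingKeys.poll()) != null; ) {
      batch.add(key);
    }
    if (!batch.isEmpty()) {
      bulkLoad.accept(batch);
    }
  }
}
```

A timer armed during a drain may later find the queue empty, which is harmless. The point is that concurrent callers can no longer schedule several timers for the same batch.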

Looking at the fields, for example:

 private volatile Queue<WaitingKey> waitingKeys = new ConcurrentLinkedQueue<>();

volatile seems odd, the field can be final.

Looking at doLoad it polls all queued elements and issues a bulk load, not respecting the max parameter.

That said I was happy for the idea, and I think the code is basically working. However, it needs a proper review. I am very suspicious when concurrent code is not using final and volatile in a sensible way.

cruftex avatar Nov 01 '21 14:11 cruftex

@cruftex thanks for pointing those out...now, I'm not going to pretend i fully understand this code...i'm sure @sheepdreamofandroids and/or @ben-manes can explain better since ~he~ @sheepdreamofandroids wrote it...but i think this code:

synchronized private void startWaiting() {
  schedule = timer.schedule(this::doLoad, maxDelay, MILLISECONDS);
}

will just replace the previous schedule in the case of a multiple access to the method (whoever is the last thread to access the method), which I believe should not cause any harm at all...just essentially resets the timer which i think should be ok.

as for the use of volatile...it's probably for safety to ensure that the queue is never stored in CPU cache...but to your point it's probably unnecessary and could be final.

as for the last point, i thought it did honor the max parameter in line 167 but maybe you are referring to an entirely different thing that I don't quite see.

kindly take these comments for what it's worth. i'm not starting a debate. i'm just a village idiot. 😊

Yes, I agree, this is a great idea and should be sufficient for most use cases. I will actually try this when i get the chance.

ninja edit: @Stephan202 pointed out that @sheepdreamofandroids wrote the class.

dalegaspi avatar Nov 01 '21 14:11 dalegaspi

I'm sure @ben-manes can explain better since he wrote it

I'm sure he can explain it (:smile:), but he's not the author; @sheepdreamofandroids is. See #336.

Stephan202 avatar Nov 01 '21 14:11 Stephan202

@Stephan202 thanks wow that was an oversight. i updated my previous comment.

dalegaspi avatar Nov 01 '21 14:11 dalegaspi

Wow, this is a while ago :-)

Indeed waitingKeys should be final, not volatile, that's just better.

I'm pretty sure that the schedule.cancel() on line 164 should instead be before line 160 to ensure that only one doLoad() is scheduled at a time, even though extra doLoad() invocations will simply not find anything to load and do no harm. I'm not sure what I was smoking when that happened.

doLoad() does respect the maxLoadSize though. If there are more keys to load, the bulkloader will be called more than once from doLoad() .

sheepdreamofandroids avatar Nov 01 '21 16:11 sheepdreamofandroids

@sheepdreamofandroids:

doLoad() does respect the maxLoadSize though. If there are more keys to load, the bulkloader will be called more than once from doLoad() .

Yes it does! Sorry. I missed the counting down logic.

cruftex avatar Nov 01 '21 16:11 cruftex

Hello. Is bulk refresh supported now?

VegetaPn avatar Nov 25 '21 05:11 VegetaPn

Not yet, it's been in the backlog, with contributed alternatives in the examples section.

ben-manes avatar Nov 25 '21 05:11 ben-manes

I'm sorry to ask a question here. I override the loadAll(keys) method, so getAll(keys) queries ES. My question is: given the bulkLoad(keysToLoad, result, mappingFunction) call in LocalManualCache.getAll(), is there a way in a high-concurrency situation to ensure that each key is queried only once, the way refresh does?

Hailprob avatar Mar 14 '22 14:03 Hailprob

@Hailprob yes, if you use an AsyncCache (see faq for details).
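The idea behind that answer, as far as it can be sketched without the library: an asynchronous cache stores futures rather than values, so a bulk load can register a placeholder future for each missing key before loading, and later callers find the in-flight future instead of starting a second load. The class below is a JDK-only illustration under that assumption. `InFlightLoads` is a hypothetical name and this is not Caffeine's implementation.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

/** Hypothetical future-based map where each key is bulk loaded by at most one caller. */
public final class InFlightLoads<K, V> {
  private final ConcurrentMap<K, CompletableFuture<V>> futures = new ConcurrentHashMap<>();

  public Map<K, CompletableFuture<V>> getAll(Set<K> keys,
      Function<Set<K>, Map<K, V>> bulkLoader) {
    Map<K, CompletableFuture<V>> result = new LinkedHashMap<>();
    Map<K, CompletableFuture<V>> toLoad = new LinkedHashMap<>();
    for (K key : keys) {
      CompletableFuture<V> placeholder = new CompletableFuture<>();
      CompletableFuture<V> existing = futures.putIfAbsent(key, placeholder);
      if (existing == null) {
        toLoad.put(key, placeholder);   // this caller owns the load for the key
        result.put(key, placeholder);
      } else {
        result.put(key, existing);      // present or already in flight elsewhere
      }
    }
    if (!toLoad.isEmpty()) {
      try {
        // One bulk call for exactly the keys this caller registered.
        Map<K, V> loaded = bulkLoader.apply(toLoad.keySet());
        toLoad.forEach((key, future) -> future.complete(loaded.get(key)));
      } catch (RuntimeException e) {
        // Remove failed placeholders so that a later call can retry.
        toLoad.forEach((key, future) -> {
          futures.remove(key, future);
          future.completeExceptionally(e);
        });
      }
    }
    return result;
  }
}
```

Two overlapping calls such as `getAll({1, 2})` and `getAll({2, 3})` then load key 2 only once; the second call's bulk load receives only key 3.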

ben-manes avatar Mar 14 '22 15:03 ben-manes

This issue will be closed as won't do after the documentation is updated to guide users.

The original ask was an observation that since getAll will perform the initial load using loadAll, it should likewise try to batch the reloads into a single request. The initial loads make sense to batch explicitly since the caller is waiting for the results. The reloads are optimistic, asynchronous, and hidden from the caller. If implemented as originally asked for then it would only reduce some of the parallel reloads and offer modest batching.

For bulk refreshes the coalescing examples show a better approach because they can capture reloads triggered by independent cache operations, buffer by a max delay or key count, and throttle the total number of reloads in flight. This can all be done by a more intelligent CacheLoader and no changes to the library are needed.

@sheepdreamofandroids' example is nice and straightforward. It can be further simplified by leveraging a reactive library, such as by using RxJava's buffer(timespan, timeUnit, count) or Reactor's bufferTimeout(maxSize, maxTime). It is then only ~30 LOC to implement and customize to fit the problem at hand.

Here is a short example using Reactor demonstrating this:

CoalescingBulkLoader
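import static java.util.Objects.requireNonNull;
import static java.util.stream.Collectors.toSet;

import java.time.Duration;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.Executor;
import java.util.function.Function;

import com.github.benmanes.caffeine.cache.AsyncCacheLoader;
import reactor.core.publisher.Sinks;
import reactor.core.scheduler.Schedulers;

public final class CoalescingBulkLoader<K, V> implements AsyncCacheLoader<K, V> {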
  private final Function<Set<K>, Map<K, V>> mappingFunction;
  private final Sinks.Many<Request<K, V>> sink;

  public CoalescingBulkLoader(int maxSize, Duration maxTime,
      int parallelism, Function<Set<K>, Map<K, V>> mappingFunction) {
    this.mappingFunction = requireNonNull(mappingFunction);
    sink = Sinks.many().unicast().onBackpressureBuffer();
    sink.asFlux()
        .bufferTimeout(maxSize, maxTime)
        .parallel(parallelism)
        .runOn(Schedulers.boundedElastic())
        .subscribe(this::handle);
  }

  @Override public CompletableFuture<V> asyncLoad(K key, Executor executor) {
    var result = new CompletableFuture<V>();
    sink.tryEmitNext(new Request<>(key, result)).orThrow();
    return result;
  }

  private void handle(List<Request<K, V>> requests) {
    try {
      var results = mappingFunction.apply(requests.stream().map(Request::key).collect(toSet()));
      requests.forEach(request -> request.result.complete(results.get(request.key())));
    } catch (Throwable t) {
      requests.forEach(request -> request.result.completeExceptionally(t));
    }
  }

  private record Request<K, V>(K key, CompletableFuture<V> result) {}
}
Sample test
@Test
public void coalesce() {
  AsyncLoadingCache<Integer, Integer> cache = Caffeine.newBuilder()
      .buildAsync(new CoalescingBulkLoader<>(
          /* maxSize */ 5, /* maxTime */ Duration.ofMillis(50), /* parallelism */ 5,
          keys -> keys.stream().collect(toMap(key -> key, key -> -key))));

  var results = new HashMap<Integer, CompletableFuture<Integer>>();
  for (int i = 0; i < 82; i++) {
    results.put(i, cache.get(i));
  }
  for (var entry : results.entrySet()) {
    assertThat(entry.getValue().join()).isEqualTo(-entry.getKey());
  }
}

ben-manes avatar Jul 17 '23 07:07 ben-manes