expiringmap
ExpiringMap may return an already expired entry (ExpirationPolicy.ACCESSED)
We experienced a situation where ExpiringMap returned an expired entry after the ExpirationListener had already been notified that this specific entry had expired.
The map is configured with the policy ExpirationPolicy.ACCESSED (version 0.5.10).
The expired entry is returned by subsequent calls to get(Object), not only once, until it has again gone unaccessed for the expiration time. Then the expired entry expires a second time and is correctly removed from the map.
I don't know whether anybody else has experienced this issue, or whether this behaviour is a problem in other applications at all. A little (simplified) background on why this was an issue, at least in our application ;-) : In our case we used the map to "cache" some "sequentializers" per "destination" (the key), each owning some kind of thread. When a destination is no longer "used", we want to remove its sequentializer and stop the backing thread (to minimize resource usage). As soon as there is a task for that destination again, we create and cache a new sequentializer (which is reused until it has not been needed for a specified time). For this use case we used the handy ExpiringMap (great work, by the way :-) ).
This issue occurs only under very rare conditions. It seems to be a timing issue between the application thread and the "ExpiringMap-Expirer" thread, where the application tries to get the object at the exact moment the Expirer wants to remove it.
I have created a simplified test case which tries to reproduce this issue (see TestExpiringMap.java below).
In those cases where this issue occurs, one would see the following sequence of log events:
09:12:35.115[ExpiringMap-Expirer]: TestExpiringMap$Worker@3f44d4b - stop...
09:12:35.115[ExpiringMap-Expirer]: TestExpiringMap$Worker@3f44d4b - ...stopped
09:12:35.116[main]: ERROR: TestExpiringMap$Worker@3f44d4b already stopped!
09:12:35.118[main]: ERROR: TestExpiringMap$Worker@3f44d4b already stopped!
09:12:35.119[main]: ERROR: TestExpiringMap$Worker@3f44d4b already stopped!
09:12:35.625[ExpiringMap-Expirer]: TestExpiringMap$Worker@3f44d4b - stop...
09:12:35.625[ExpiringMap-Expirer]: ERROR: TestExpiringMap$Worker@3f44d4b - ... was already stopped!
It is easier to reproduce if one modifies the actual ExpiringMap and adds a Thread.sleep(1); to the get(Object) method.
This does not change any consistency locking; it just "simulates" unfortunate thread scheduling.
    @Override
    @SuppressWarnings("unchecked")
    public V get(Object key) {
        ExpiringEntry<K, V> entry = getEntry(key);
        try {
            // simulate unfortunate thread scheduling
            Thread.sleep(1);
        } catch (InterruptedException e) {
        }
        if (entry == null) {
            return load((K) key);
        } else if (ExpirationPolicy.ACCESSED.equals(entry.expirationPolicy.get())) {
            resetEntry(entry, false);
        }
        return entry.getValue();
    }
The sequence seems to be:
- main acquires the read-lock; getEntry returns the entry for k1=v1; main releases the read-lock.
- Expirer acquires the write-lock; removes k1=v1; notifies the expiration listener; releases the write-lock.
- main now checks whether ExpirationPolicy.ACCESSED is used, and resets the entry.
- main acquires the write-lock; k1=v1 is "reordered" by removing k1 (which has no effect, since it was already removed by Expirer) and re-putting the already expired v1.
- main gets the already expired value v1 returned from the get(Object) method.
- All subsequent calls to get(Object) for k1 will return the expired v1 until it is expired again.
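The interleaving described above can be replayed deterministically on a single thread. The following is only a minimal sketch under stated assumptions: TinyMap, Entry, resetUnchecked and resetChecked are hypothetical names invented for illustration and are not ExpiringMap's actual code. The sketch only mimics the locking pattern (read under the read-lock, reset later under the write-lock) and contrasts it with a reset that first re-checks, under the write-lock, whether the map still holds the very same entry.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.locks.ReentrantReadWriteLock;

final class StaleResetDemo {
    /** Hypothetical entry type; identity matters, not equality. */
    static final class Entry {
        final String key;
        final String value;
        Entry(String key, String value) { this.key = key; this.value = value; }
    }

    /** Hypothetical stand-in for the map; NOT ExpiringMap's real implementation. */
    static final class TinyMap {
        private final Map<String, Entry> entries = new HashMap<>();
        private final ReentrantReadWriteLock lock = new ReentrantReadWriteLock();

        void put(String key, String value) {
            lock.writeLock().lock();
            try { entries.put(key, new Entry(key, value)); }
            finally { lock.writeLock().unlock(); }
        }

        Entry getEntry(String key) {
            lock.readLock().lock();
            try { return entries.get(key); }
            finally { lock.readLock().unlock(); }
        }

        /** What the expirer thread would do: remove under the write-lock. */
        void expire(String key) {
            lock.writeLock().lock();
            try { entries.remove(key); }
            finally { lock.writeLock().unlock(); }
        }

        /** Remove and re-put without checking whether the entry is still present. */
        void resetUnchecked(Entry e) {
            lock.writeLock().lock();
            try {
                entries.remove(e.key);
                entries.put(e.key, e);
            } finally { lock.writeLock().unlock(); }
        }

        /** Re-put only if the map still holds this exact entry instance. */
        boolean resetChecked(Entry e) {
            lock.writeLock().lock();
            try {
                if (entries.get(e.key) != e) {
                    return false; // entry was removed (or replaced) in the meantime
                }
                entries.remove(e.key);
                entries.put(e.key, e);
                return true;
            } finally { lock.writeLock().unlock(); }
        }
    }

    /** Replays the interleaving; returns the value a later lookup of k1 sees. */
    static String replay(boolean checked) {
        TinyMap map = new TinyMap();
        map.put("k1", "v1");
        Entry stale = map.getEntry("k1"); // main: read entry, read-lock released
        map.expire("k1");                 // Expirer: removes k1 under the write-lock
        if (checked) {
            map.resetChecked(stale);      // main: reset with re-check
        } else {
            map.resetUnchecked(stale);    // main: reset without re-check
        }
        Entry later = map.getEntry("k1");
        return later == null ? null : later.value;
    }

    public static void main(String[] args) {
        System.out.println("unchecked reset: " + replay(false));
        System.out.println("checked reset:   " + replay(true));
    }
}
```

In this sketch the unchecked reset puts the removed entry back, so a later lookup sees v1 again, while the checked reset leaves k1 absent. Whether a similar re-check would be the right fix inside ExpiringMap itself is an assumption that would need to be verified against the library's source.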
TestExpiringMap.java
import java.time.LocalTime;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;

import net.jodah.expiringmap.ExpirationListener;
import net.jodah.expiringmap.ExpirationPolicy;
import net.jodah.expiringmap.ExpiringMap;

public class TestExpiringMap {
    private static boolean issueOccured;

    public static void main(String[] args) {
        ExpirationListener<String, Worker> l = (k, worker) -> {
            worker.stop();
        };
        Map<String, Worker> workerCache = ExpiringMap.builder()
                .expiration(500, TimeUnit.MILLISECONDS)
                .expirationPolicy(ExpirationPolicy.ACCESSED)
                .expirationListener(l)
                .build();
        while (!issueOccured) {
            log("########### NEXT ############");
            sendMessageLoop(workerCache);
            sleepExact(750);
        }
    }

    private static void sendMessageLoop(Map<String, Worker> workerCache) {
        // create a cached worker
        doWork(workerCache);
        // try to hit the exact time when the entry would expire
        for (int sleep = 499; sleep <= 520; sleep++) {
            if (issueOccured) {
                break;
            }
            sleepExact(sleep); // wait for it
            // verify that each invocation (in this case three) gets the expired entry,
            // not just once, but until this expired entry expires "again"
            doWork(workerCache);
            doWork(workerCache);
            doWork(workerCache);
        }
    }

    private static void doWork(Map<String, Worker> workerCache) {
        // no need to synchronize: accessed by a single thread (besides the Expirer)
        Worker worker = workerCache.get("key1");
        if (worker == null) {
            worker = new Worker();
            workerCache.put("key1", worker);
        }
        worker.doTheWork();
    }

    // busy-wait instead of Thread.sleep to hit the target time as precisely as possible
    private static void sleepExact(long millis) {
        long start = System.nanoTime();
        while (TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start) < millis) {
            Thread.yield();
        }
    }

    private static class Worker {
        AtomicBoolean stopped = new AtomicBoolean();

        private void doTheWork() {
            if (stopped.get()) {
                log("ERROR: " + this + " already stopped!");
                issueOccured = true;
            } else {
                // DO THE WORK
            }
        }

        private void stop() {
            log(this + " - stop...");
            boolean success = stopped.compareAndSet(false, true);
            if (success) {
                log(this + " - ...stopped");
            } else {
                log("ERROR: " + this + " - ... was already stopped!");
            }
        }
    }

    private static void log(String msg) {
        System.out.println(LocalTime.now() + "[" + Thread.currentThread().getName() + "]: " + msg);
    }
}
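Until the race is addressed in the map itself, an application like the one described could defend itself by treating a stopped worker like a missing one. The following is only a sketch of that idea, not a fix for ExpiringMap: liveWorker and isStopped are hypothetical names, and a plain HashMap stands in for the ExpiringMap so the example runs without the library. It assumes, as in the test case, that a single application thread accesses the cache.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.atomic.AtomicBoolean;

final class StoppedWorkerGuard {
    /** Simplified worker, modelled on the Worker in the test case. */
    static final class Worker {
        private final AtomicBoolean stopped = new AtomicBoolean();

        void stop() { stopped.set(true); }

        boolean isStopped() { return stopped.get(); }
    }

    /**
     * Hypothetical helper: returns a usable worker for the key, replacing a
     * cached worker that has already been stopped by the expiration listener.
     */
    static Worker liveWorker(Map<String, Worker> cache, String key) {
        Worker worker = cache.get(key);
        if (worker == null || worker.isStopped()) {
            worker = new Worker();
            cache.put(key, worker); // overwrite the stale (resurrected) entry
        }
        return worker;
    }

    public static void main(String[] args) {
        // Assumption: a HashMap stands in for the ExpiringMap here.
        Map<String, Worker> cache = new HashMap<>();
        Worker stale = new Worker();
        stale.stop();             // simulates the listener having stopped it
        cache.put("key1", stale); // simulates the expired entry being re-put

        Worker fresh = liveWorker(cache, "key1");
        System.out.println("replaced stale worker: " + (fresh != stale && !fresh.isStopped()));
    }
}
```

This only narrows the window: the Expirer can still stop a worker between the check and its use, so the guard reduces the symptom rather than closing the race inside the map.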