
Memoize for a duration based on the memoized item

Open scrocquesel opened this issue 2 years ago • 1 comments

Context

Following discussion at https://github.com/smallrye/smallrye-mutiny/discussions/909.

Right now, memoize can be configured to expire based on a BooleanSupplier. To invalidate a memoized item based on its value, one solution is to use a shared AtomicBoolean with a throw/retry mechanism that kicks in when the boolean becomes true. See https://github.com/smallrye/smallrye-mutiny/discussions/909#discussioncomment-2720237.

Another drawback is that failures cannot be skipped or memoized differently from items.

Description

It would be more convenient to be able to pass to until() a BiFunction that takes the item and the failure and returns a boolean.

Uni.createFrom().item(new AccessTokenResponse())
    .memoize().until((item, failure) -> failure != null || item.isExpired())
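To make the requested semantics concrete, here is a minimal JDK-only sketch of the idea, outside of Mutiny. The class name OutcomeMemo and its get() method are hypothetical and exist only for illustration; nothing here is Mutiny API. It caches one outcome (item or failure) of a supplier and asks a predicate, which receives both, whether that outcome should be dropped before the next read.

```java
import java.util.function.BiPredicate;
import java.util.function.Supplier;

/** Hypothetical helper, not part of Mutiny: memoizes one outcome of a supplier. */
final class OutcomeMemo<T> {

    private final Supplier<T> source;
    private final BiPredicate<T, Throwable> invalidate;

    private boolean cached;
    private T item;
    private RuntimeException failure;

    OutcomeMemo(Supplier<T> source, BiPredicate<T, Throwable> invalidate) {
        this.source = source;
        this.invalidate = invalidate;
    }

    synchronized T get() {
        // Ask the predicate whether the cached outcome (item or failure) is stale.
        if (cached && invalidate.test(item, failure)) {
            cached = false;
            item = null;
            failure = null;
        }
        // Nothing cached: call the source once and keep whatever it produced.
        if (!cached) {
            try {
                item = source.get();
            } catch (RuntimeException e) {
                failure = e;
            }
            cached = true;
        }
        if (failure != null) {
            throw failure;
        }
        return item;
    }
}
```

With a predicate such as (item, failure) -> failure != null || item.isExpired(), a failure is never served twice and an item is served until it reports itself expired, which is the behaviour the until() overload above asks for.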

Additional details

Some shortcuts could help, such as:

Uni.createFrom().item(new AccessTokenResponse())
    .memoize().notFailure().until(item -> item.isExpired())

Uni.createFrom().item(new AccessTokenResponse())
    .memoize().notFailure().atLeast(item -> item.getExpiresIn())

scrocquesel • May 10 '22 21:05

Just yesterday I wrote this memoizing operator which is used like: uni.plug(UniMemoizeItemOp.untilItemExpires(getItemExpiry))

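import java.time.Duration;
import java.time.temporal.ChronoUnit;
import java.util.function.Function;

import io.smallrye.mutiny.Uni;
import io.smallrye.mutiny.operators.AbstractUni;
import io.smallrye.mutiny.operators.uni.UniMemoizeOp;
import io.smallrye.mutiny.operators.uni.UniOperatorProcessor;
import io.smallrye.mutiny.subscription.UniSubscriber;

public class UniMemoizeItemOp<T> extends AbstractUni<T> {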

	private final Uni<T> upstream;
	private final Function<T, Duration> transform;

	/* Written in onItem and read in checkExpiry, possibly on different threads */
	private volatile Duration duration;
	private volatile long startTime = -1;

	private UniMemoizeItemOp(Uni<T> upstream, Function<T, Duration> transform) {
		this.upstream = new UniMemoizeOp<>(upstream, this::checkExpiry);
		this.transform = transform;
	}

	/**
	 * Memoize the Uni item until the item's expiry
	 *
	 * @param transform determine the expiry duration from the item
	 */
	public static <T> Function<Uni<T>, Uni<T>> untilItemExpires(Function<T, Duration> transform) {
		return upstream -> new UniMemoizeItemOp<>(upstream, transform);
	}

	@Override
	public void subscribe(UniSubscriber<? super T> subscriber) {
		AbstractUni.subscribe(upstream, new UniOperatorProcessor<T, T>(subscriber) {

			@Override
			public void onItem(T item) {
				duration = transform.apply(item);
				super.onItem(item);
			}

		});
	}

	/**
	 * This is an almost-clone of the anonymous class in {@link io.smallrye.mutiny.groups.UniMemoize#atLeast(Duration)}
	 * because it's not exposed in a reusable way
	 */
	private boolean checkExpiry() {
		long now = System.nanoTime();
		if (startTime == -1) {
			startTime = now;
		}

		/* This is the only difference to the "base" class's method */
		if (duration == null) {
			/* Wait until there's a duration */
			return false;
		}

		// Avoid arithmetic overflow when retrieving the nanos of the FOREVER duration.
		// Use equals(): the transform may return an equal Duration that is a different instance.
		if (ChronoUnit.FOREVER.getDuration().equals(duration)) {
			startTime = now;
			return false;
		}

		boolean invalidates = (now - startTime) > duration.toNanos();
		if (invalidates) {
			startTime = now;
		}
		return invalidates;
	}

}

I've not yet verified whether there are any data races between onItem and checkExpiry. A duration getter for failure handling also needs to be added.
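One way to sidestep the question of races between onItem and checkExpiry is to publish the start time and the duration together as one immutable value. The following is a JDK-only sketch under that assumption, not a drop-in replacement: ItemExpiryCheck, Window and the injected nanoClock are hypothetical names, and unlike the operator above it measures the duration from the moment the item arrives and reports each expiry only once.

```java
import java.time.Duration;
import java.util.concurrent.atomic.AtomicReference;
import java.util.function.LongSupplier;

/** Hypothetical helper (not a Mutiny class): an expiry check callable from several threads. */
final class ItemExpiryCheck {

    /** Immutable snapshot: when the item arrived and how long it stays valid. */
    private static final class Window {
        final long startNanos;
        final long ttlNanos;

        Window(long startNanos, long ttlNanos) {
            this.startNanos = startNanos;
            this.ttlNanos = ttlNanos;
        }
    }

    private static final Duration MAX_TTL = Duration.ofNanos(Long.MAX_VALUE);

    private final LongSupplier nanoClock;
    private final AtomicReference<Window> window = new AtomicReference<>();

    ItemExpiryCheck(LongSupplier nanoClock) {
        this.nanoClock = nanoClock;
    }

    /** Record a fresh item; start time and duration are published in one atomic write. */
    void onItem(Duration ttl) {
        // Clamp oversized durations (e.g. ChronoUnit.FOREVER) so toNanos() cannot overflow.
        long ttlNanos = ttl.compareTo(MAX_TTL) >= 0 ? Long.MAX_VALUE : ttl.toNanos();
        window.set(new Window(nanoClock.getAsLong(), ttlNanos));
    }

    /** Returns true once per expired item, and false while no item has been seen. */
    boolean shouldInvalidate() {
        Window current = window.get();
        if (current == null) {
            return false;
        }
        boolean expired = (nanoClock.getAsLong() - current.startNanos) > current.ttlNanos;
        // The CAS lets exactly one caller report the expiry; the next onItem() opens a new window.
        return expired && window.compareAndSet(current, null);
    }
}
```

In production the clock would be System::nanoTime; injecting it only makes the check testable. Since shouldInvalidate() has the shape of a BooleanSupplier, it could presumably be handed to memoize().until(check::shouldInvalidate), with onItem called from a step upstream of the memoization, though that wiring is untested here.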

RohanHart • May 11 '22 04:05