Support custom timeout strategies in StreamMessage.timeout(), with built-in support for a token-bucket timeout-limiting strategy
Currently, StreamMessage.timeout() only supports fixed timeouts.
I’d like to propose adding support for custom timeout strategies via a StreamTimeoutStrategy interface.
Users would implement their own strategy and pass it to timeout(...), for example:
public interface StreamTimeoutStrategy {
    /**
     * Compares the current time (`currentTimeNanos`) with the last event time (`lastEventTimeNanos`)
     * and evaluates whether a timeout has occurred and when the next check should be scheduled.
     */
    StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos);
}

public final class StreamTimeoutDecision {
    private static final long NO_NEXT_TIMEOUT = -1L;
    private final boolean timedOut;
    private final long nextScheduleTimeNanos;
}
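To make the contract concrete, here is a minimal sketch of how the decision type and a fixed-timeout strategy might fit together. The factory methods (`timedOut()`, `scheduleNext(...)`), the accessors, and the `FixedTimeoutStrategy` class are hypothetical names that are not part of the proposal, and the sketch assumes `nextScheduleTimeNanos` is a delay until the next check rather than an absolute time. The types are package-private only so that the example compiles as a single file.

```java
import java.time.Duration;

interface StreamTimeoutStrategy {
    StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos);
}

final class StreamTimeoutDecision {
    private static final long NO_NEXT_TIMEOUT = -1L;
    private static final StreamTimeoutDecision TIMED_OUT =
            new StreamTimeoutDecision(true, NO_NEXT_TIMEOUT);

    private final boolean timedOut;
    // Assumption: a delay in nanoseconds until the next check, or NO_NEXT_TIMEOUT.
    private final long nextScheduleTimeNanos;

    private StreamTimeoutDecision(boolean timedOut, long nextScheduleTimeNanos) {
        this.timedOut = timedOut;
        this.nextScheduleTimeNanos = nextScheduleTimeNanos;
    }

    // Hypothetical factory: the stream has timed out, nothing more to schedule.
    static StreamTimeoutDecision timedOut() {
        return TIMED_OUT;
    }

    // Hypothetical factory: no timeout yet, check again after the given delay.
    static StreamTimeoutDecision scheduleNext(long delayNanos) {
        if (delayNanos <= 0) {
            throw new IllegalArgumentException("delayNanos: " + delayNanos + " (expected: > 0)");
        }
        return new StreamTimeoutDecision(false, delayNanos);
    }

    boolean isTimedOut() {
        return timedOut;
    }

    long nextScheduleTimeNanos() {
        return nextScheduleTimeNanos;
    }
}

// Hypothetical strategy: times out once no event has arrived for a fixed duration.
final class FixedTimeoutStrategy implements StreamTimeoutStrategy {
    private final long timeoutNanos;

    FixedTimeoutStrategy(Duration timeout) {
        this.timeoutNanos = timeout.toNanos();
    }

    @Override
    public StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos) {
        final long elapsedNanos = currentTimeNanos - lastEventTimeNanos;
        if (elapsedNanos >= timeoutNanos) {
            return StreamTimeoutDecision.timedOut();
        }
        // Re-check exactly when the remaining budget runs out.
        return StreamTimeoutDecision.scheduleNext(timeoutNanos - elapsedNanos);
    }
}
```

With a 10-second timeout, an evaluation 4 seconds after the last event would ask for another check in 6 seconds, and an evaluation at 10 seconds or later would report a timeout.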
I would like to suggest the following two built-in strategies:
- Default mode (StreamTimeoutMode + duration)
  StreamMessage.timeout(StreamTimeoutMode.UNTIL_NEXT, Duration.ofSeconds(10));
  // Internally: new DefaultTimeoutStrategy(StreamTimeoutMode.UNTIL_NEXT, timeoutNanos);
- A strategy based on the token bucket rate-limiting algorithm (token-bucket timeout-limiting strategy). It allows occasional network jitter without terminating the stream, but triggers a timeout when delays occur consecutively. This prevents unnecessary stream termination due to transient network hiccups.
  StreamMessage.timeout(new TokenBucketTimeoutStrategy(...));
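One possible reading of the token-bucket idea is sketched below; it is an illustration under stated assumptions, not the proposed implementation. Each time the per-event timeout elapses without an event, the strategy spends one token instead of timing out, and it only reports a timeout when the bucket is empty. Tokens are earned back at a fixed interval. The constructor parameters (`timeout`, `capacity`, `refillInterval`), the package-private stand-in types, and the assumption that `evaluateTimeout` is called from a single thread are all hypothetical.

```java
import java.time.Duration;

interface StreamTimeoutStrategy {
    StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos);
}

// Minimal stand-in for the proposed decision type (fields exposed for brevity).
final class StreamTimeoutDecision {
    final boolean timedOut;
    // Assumption: delay in nanoseconds until the next check, or -1 if none.
    final long nextScheduleTimeNanos;

    StreamTimeoutDecision(boolean timedOut, long nextScheduleTimeNanos) {
        this.timedOut = timedOut;
        this.nextScheduleTimeNanos = nextScheduleTimeNanos;
    }
}

// Hypothetical sketch; not thread-safe, assumes calls from one event loop.
final class TokenBucketTimeoutStrategy implements StreamTimeoutStrategy {
    private final long timeoutNanos;
    private final long capacity;
    private final long refillIntervalNanos;

    private long tokens;
    private boolean started;
    private long lastRefillNanos;
    private boolean tolerated;
    private long toleratedAtNanos;

    TokenBucketTimeoutStrategy(Duration timeout, int capacity, Duration refillInterval) {
        if (capacity <= 0 || timeout.toNanos() <= 0 || refillInterval.toNanos() <= 0) {
            throw new IllegalArgumentException("all parameters must be positive");
        }
        this.timeoutNanos = timeout.toNanos();
        this.capacity = capacity;
        this.refillIntervalNanos = refillInterval.toNanos();
        this.tokens = capacity; // start with a full bucket
    }

    @Override
    public StreamTimeoutDecision evaluateTimeout(long currentTimeNanos, long lastEventTimeNanos) {
        refill(currentTimeNanos);

        // Measure from the later of the last event and the last tolerated delay,
        // so one long stall costs one token per timeout period.
        long baselineNanos = lastEventTimeNanos;
        if (tolerated && toleratedAtNanos - lastEventTimeNanos > 0) {
            baselineNanos = toleratedAtNanos;
        }

        final long elapsedNanos = currentTimeNanos - baselineNanos;
        if (elapsedNanos < timeoutNanos) {
            return new StreamTimeoutDecision(false, timeoutNanos - elapsedNanos);
        }
        if (tokens == 0) {
            return new StreamTimeoutDecision(true, -1L); // bucket drained: time out
        }
        tokens--; // tolerate this delay
        tolerated = true;
        toleratedAtNanos = currentTimeNanos;
        return new StreamTimeoutDecision(false, timeoutNanos);
    }

    // Adds one token per full refill interval elapsed, capped at capacity.
    private void refill(long nowNanos) {
        if (!started) {
            started = true;
            lastRefillNanos = nowNanos;
            return;
        }
        final long earned = (nowNanos - lastRefillNanos) / refillIntervalNanos;
        if (earned > 0) {
            tokens = Math.min(capacity, tokens + earned);
            lastRefillNanos += earned * refillIntervalNanos;
        }
    }
}
```

In this sketch, a bucket of capacity 2 absorbs two consecutive missed deadlines and times out on the third, while delays spaced further apart than the refill interval never drain it. Whether tokens should be spent per missed deadline or per delayed event is a design choice the proposal leaves open.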