spring-kafka
spring-kafka copied to clipboard
Pause a consumer for X minutes
trafficstars
Expected Behavior
@Component @KafkaListener(topics = {"job.cache"}, groupId = "cache-consumer") public class JobCacheConsumer { private static Cache cache = new MongoDbCache();
@KafkaHandler
@RetryableTopic(
backoff = @Backoff(delay = 10_000, multiplier = 2, maxDelay = 36_000_000),
attempts = "-1",
autoCreateTopics = "true",
include = SocketTimeoutException.class, exclude = CachRetryException.class)
public void jobHandler(Job job, @Headers Map<String, Object> headers
, @Header(KafkaHeaders.RECEIVED_KEY) String key
) throws CachRetryException {
try{ callForeignResource() }catch(SocketTimeoutException){ //PAUSE FOR 30 minutes }
Current Behavior
As far as I can tell you can only pause and resume. I am not sure how to get pausing to be time constrained
Context
I am calling mongodb (in the cloud) when I process certain topics. If mongo is down I don't want to continue processing events in the queue as they are all likely to fail unnecessarily consuming resources. I'd like to pause the whole topic while I give mongo some time to fix itself.