
Pause a consumer for X minutes

Open jwreiser opened this issue 1 year ago • 4 comments

Expected Behavior

@Component
@KafkaListener(topics = {"job.cache"}, groupId = "cache-consumer")
public class JobCacheConsumer {

    private static Cache cache = new MongoDbCache();

    @KafkaHandler
    @RetryableTopic(
            backoff = @Backoff(delay = 10_000, multiplier = 2, maxDelay = 36_000_000),
            attempts = "-1",
            autoCreateTopics = "true",
            include = SocketTimeoutException.class, exclude = CachRetryException.class)
    public void jobHandler(Job job,
                           @Headers Map<String, Object> headers,
                           @Header(KafkaHeaders.RECEIVED_KEY) String key) throws CachRetryException {
        try {
            callForeignResource();
        } catch (SocketTimeoutException e) {
            // PAUSE FOR 30 minutes
        }
    }
}

Current Behavior

As far as I can tell, a consumer can only be paused and resumed explicitly. I am not sure how to make a pause time-constrained.

Context

I call MongoDB (in the cloud) when I process certain topics. If Mongo is down, I don't want to keep processing events in the queue, since they are all likely to fail and consume resources unnecessarily. I'd like to pause the whole topic while I give Mongo some time to recover.
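
To make the request concrete, here is a rough sketch of the behavior I have in mind: pause now and have something resume automatically after a fixed delay. It uses plain JDK scheduling only. `TimedPause`, `Pausable`, and `pauseFor` are hypothetical names made up for illustration, not spring-kafka API. `Pausable` stands in for the `pause()`/`resume()` pair on `MessageListenerContainer`, which I assume would be looked up through `KafkaListenerEndpointRegistry.getListenerContainer(id)` (that requires setting an `id` on the `@KafkaListener`).

```java
import java.time.Duration;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

/** Pauses a target immediately and schedules its resume after a fixed delay. */
class TimedPause {

    /** Hypothetical stand-in for the pause()/resume() pair of a listener container. */
    interface Pausable {
        void pause();
        void resume();
    }

    // Single daemon thread that only fires the delayed resume calls.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "timed-pause");
                t.setDaemon(true); // do not keep the JVM alive just for the timer
                return t;
            });

    /** Pauses now; the returned future completes once resume() has been called. */
    ScheduledFuture<?> pauseFor(Pausable target, Duration duration) {
        target.pause();
        return scheduler.schedule(target::resume, duration.toMillis(), TimeUnit.MILLISECONDS);
    }

    /** Cancels any pending resume and stops the timer thread. */
    void shutdown() {
        scheduler.shutdownNow();
    }
}
```

In the handler above, the catch block would then call something like `timedPause.pauseFor(containerAdapter, Duration.ofMinutes(30))`, where `containerAdapter` is a hypothetical wrapper around the real container. As far as I understand, a container pause only takes effect before the next poll, so records already fetched could still be delivered, and this sketch does not cover the separate retry-topic containers created by `@RetryableTopic`.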

jwreiser avatar May 08 '24 15:05 jwreiser