
Provide option for KafkaReceiver's graceful shutdown

Open cjlee38 opened this issue 1 year ago • 4 comments

Motivation

Over the past few days, I've been looking for a way to shut down a KafkaReceiver gracefully, but I couldn't find a proper way to do it. I read the related issues (https://github.com/reactor/reactor-kafka/issues/247, https://github.com/reactor/reactor-kafka/issues/51, https://github.com/reactor/reactor-kafka/issues/196) and some Stack Overflow questions, but they don't work as I expected. (This might be caused by my poor understanding of Reactor or Kafka, so please excuse me if so.)

An example code snippet:

val disposable = kafkaReceiver.receive()
    // process each record, keeping the original order
    .flatMapSequential { record -> process(record).thenReturn(record) }
    .concatMap { record ->
        val offset = record.receiverOffset()
        offset.acknowledge()  // mark the record as processed
        offset.commit()       // Mono<Void> that completes once the offset is committed
    }
    .subscribe()

This is the typical flow:

  1. receive a record
  2. process the record
  3. acknowledge and commit (the explicit commit can be omitted when auto-commit is enabled)

Desired solution

So I think this is a very common case: when I restart my application (which is based on the Spring framework), the consumer should stop fetching new records, the in-flight flux (i.e. records that have already been fetched) should keep processing and committing, and only then should the flux complete.

However, simply disposing the disposable does not work as expected, because a record that has already been processed may be cancelled before its offset is committed. A workaround sketch based on an external stop signal is shown below.
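For reference, this is a minimal sketch of the workaround I've been experimenting with, built only from plain Reactor operators; stopSignal, process, and the 30-second timeout are my own assumptions, not anything provided by reactor-kafka. The idea is to complete the flux from the outside with takeUntilOther, so that records already emitted downstream still get processed and committed before onComplete propagates:

import reactor.core.publisher.Sinks
import java.time.Duration

// completed by the shutdown hook to request a graceful stop
val stopSignal = Sinks.one<Unit>()

val completion = kafkaReceiver.receive()
    // stop accepting new records once the signal fires; records already
    // emitted downstream continue through the rest of the pipeline
    .takeUntilOther(stopSignal.asMono())
    .flatMapSequential { record -> process(record).thenReturn(record) }
    .concatMap { record ->
        val offset = record.receiverOffset()
        offset.acknowledge()
        offset.commit()
    }
    .then()   // Mono<Void> that completes when the pipeline has drained
    .cache()

val disposable = completion.subscribe()

// called on shutdown, e.g. from a Spring @PreDestroy method
fun shutdownGracefully() {
    stopSignal.tryEmitValue(Unit)
    completion.block(Duration.ofSeconds(30))
}

Records that were fetched by the receiver but not yet emitted downstream are dropped unacknowledged, so they are simply redelivered after the restart. The real limitation is upstream of this snippet, which is why a built-in option would still be valuable.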

Considered alternatives

I don't have a concrete idea of how to implement this, but here are the things I've considered:

  1. The Scheduler interface of Reactor provides a disposeGracefully method (a small sketch of how it behaves follows this list). The dispose calls in these two places (1 2) could be replaced with it (or selected via an option).
  2. add sink.emitComplete() in ConsumerEventLoop#stop
  3. It looks like ConsumerEventLoop keeps polling the broker and emitting records into the sink without pause. If that's right, producing a large number of records in an instant could cause problems. For example, say 10,000 records are produced and the consumer fetches them all within a few seconds. Besides the potential OOM issue, the flux would have to wait until all of those records are drained before a graceful shutdown could complete. I think the emission of records should be throttled or bounded in some way.
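Regarding the first alternative, here is a minimal sketch of what Reactor's graceful disposal looks like on a plain Scheduler (disposeGracefully is available since reactor-core 3.5); the scheduler name and timeout are assumptions for illustration:

import reactor.core.publisher.Mono
import reactor.core.scheduler.Schedulers
import java.time.Duration

val scheduler = Schedulers.newSingle("kafka-receiver")  // assumed name

// disposeGracefully() returns a Mono<Void> that completes once all tasks
// already submitted to the scheduler have finished running
scheduler.disposeGracefully()
    .timeout(Duration.ofSeconds(10))                                // assumed timeout
    .onErrorResume { Mono.fromRunnable<Void>(scheduler::dispose) }  // fall back to a hard dispose
    .block()

If KafkaReceiver applied the same pattern to its internal schedulers, it could drain in-flight work before closing the consumer. As for the third alternative, an operator such as limitRate(n) on the receive() flux can already bound how many records are requested at a time, though as far as I can tell it does not slow down the underlying poll loop itself.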

Additional context

If I've misunderstood something out of ignorance, please let me know. Any other opinions would be appreciated. Thanks!

cjlee38 · Jan 03 '24

upvote

KafkaProServerless · Feb 18 '24

upvote

ajax-semenov-y · Mar 06 '24