kinesis-sql

How do I ensure checkpointing for kafka streams?

Vikas-kum opened this issue 3 years ago • 2 comments

Hi,

I have a streaming job, and when it is restarted the current code reprocesses data it has already handled. I want to make sure a checkpoint is written so that processing resumes from where it last stopped.

Can anyone let me know how we can ensure such checkpointing?

Vikas-kum avatar Jul 08 '21 22:07 Vikas-kum

Hi, I am also curious about this. I haven't found any way to do it. I have found that you can specify an application name in the Spark Streaming Kinesis connector, but nothing like that is present here :-(

snilard avatar Aug 05 '21 16:08 snilard

This seems to work for me:

df.writeStream
  .trigger(Trigger.ProcessingTime(interval))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Transform and write batchDF
    batchDF.persist()
    // some transformation
    batchDF.unpersist()
    () // for Scala 2.12 only
  }
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
  .awaitTermination()

success-m avatar Jan 28 '22 15:01 success-m
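
For context, here is a minimal end-to-end sketch of the same idea: reading from Kinesis with this connector and letting Structured Streaming's checkpointLocation handle resume-on-restart. The read option names (streamName, endpointUrl, startingposition) follow the qubole/kinesis-sql README but may differ between connector versions, and the stream name, region, trigger interval, and checkpoint path are placeholders, not values from this thread.

import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.streaming.Trigger

val spark = SparkSession.builder()
  .appName("kinesis-checkpoint-example") // hypothetical app name
  .getOrCreate()

// Read from Kinesis; option names are taken from the connector's README
// and may vary by version -- treat them as placeholders.
val df = spark.readStream
  .format("kinesis")
  .option("streamName", "my-stream")
  .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")
  .option("startingposition", "TRIM_HORIZON")
  .load()

df.writeStream
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    batchDF.persist()
    // transform and write batchDF here
    batchDF.unpersist()
    () // Unit return needed for Scala 2.12
  }
  // The checkpoint directory is what lets a restarted query resume from the
  // last committed batch instead of reprocessing data from the beginning.
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
  .awaitTermination()

Note that the checkpoint directory must be a reliable store (e.g. S3 or HDFS rather than local disk) and must stay the same across restarts, otherwise the query will start over from the configured starting position.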