kinesis-sql
How do I ensure checkpointing for Kafka streams?
Hi,
I have a stream, and the current code reprocesses the same data whenever it is restarted. I want to make sure a checkpoint is written so that processing resumes from where it last stopped.
Can anyone let me know how to ensure such checkpointing?
Hi, I am also curious about this. I haven't found any way to do it. I found that you specify an application name in the Spark Streaming Kinesis connector, but nothing like that is present here :-(
This seems to work for me:
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.streaming.Trigger

df.writeStream
  .trigger(Trigger.ProcessingTime(interval))
  .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
    // Transform and write batchDF
    batchDF.persist()
    // some transformation
    batchDF.unpersist()
    ()  // return Unit explicitly; needed on Scala 2.12 to resolve the foreachBatch overload
  }
  .option("checkpointLocation", "/path/to/checkpoint")
  .start()
  .awaitTermination()
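For reference, the checkpointLocation option is what makes a restart resume from the last committed position: Spark writes the source offsets and commit logs into that directory, so it should live on durable storage such as HDFS or S3 rather than local disk. Below is a minimal end-to-end sketch of a checkpointed query, assuming the kinesis-sql source options described in the connector's README (streamName, endpointUrl, startingposition); the stream name, region, sink, and paths are placeholders, not part of the thread above.

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.streaming.Trigger

object CheckpointedKinesisQuery {
  def main(args: Array[String]): Unit = {
    val spark = SparkSession.builder()
      .appName("checkpointed-kinesis-query")
      .getOrCreate()

    // Read from Kinesis via this connector; the option names are assumptions
    // taken from the README and should be adjusted to your stream, region,
    // and credential setup.
    val events = spark.readStream
      .format("kinesis")
      .option("streamName", "my-stream")                                 // placeholder
      .option("endpointUrl", "https://kinesis.us-east-1.amazonaws.com")  // placeholder
      .option("startingposition", "TRIM_HORIZON")
      .load()

    // The checkpoint directory is what lets a restarted query pick up from the
    // last committed batch instead of reprocessing everything. Keep it on
    // durable storage (HDFS/S3) and do not share it between different queries.
    events.writeStream
      .format("parquet")                        // any sink works; parquet is just an example
      .option("path", "/path/to/output")
      .option("checkpointLocation", "/path/to/checkpoint")
      .trigger(Trigger.ProcessingTime("1 minute"))
      .start()
      .awaitTermination()
  }
}

If the sink is not idempotent on its own, the foreachBatch pattern from the earlier snippet, keyed on batchId, is the usual way to deduplicate writes on top of the checkpoint.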