scylla-cdc-rust
About checkpoint_saver use
I followed the tutorial with checkpoint_saver turned on, then stopped the service. While it was stopped, the table changed. When I started the service again, no change notification was captured for those writes.
let user_checkpoint_saver = Arc::new(
    TableBackedCheckpointSaver::new_with_default_ttl(
        session.clone(),
        &args.keyspace,
        "checkpoints",
    )
    .await
    .unwrap(),
);
let (mut cdc_log_printer, handle) = CDCLogReaderBuilder::new()
    .session(session)
    .keyspace(&args.keyspace)
    .table_name(&args.table)
    .window_size(Duration::from_secs_f64(args.window_size))
    .safety_interval(Duration::from_secs_f64(args.safety_interval))
    .sleep_interval(Duration::from_secs_f64(args.sleep_interval))
    .consumer_factory(Arc::new(PrinterConsumerFactory))
    .should_save_progress(true) // Mark that we want to save progress.
    .should_load_progress(true) // Mark that we want to start consuming CDC logs from the last saved checkpoint.
    .pause_between_saves(time::Duration::from_millis(100)) // Save progress every 100 ms. If not specified, a default value of 10 seconds is used.
    .checkpoint_saver(user_checkpoint_saver) // Use `user_checkpoint_saver` to manage checkpoints.
    // .start_timestamp(start_timestamp)
    .build()
    .await?;
Is there a mistake in my setup?
I've also spent quite a while struggling with this. I set it up the same way you did. To debug, I ran dbg!(checkpoint_saver.load_last_checkpoint().await) and it does return Some. But the timestamp shown there always seems to be the same, so I'm not sure what's happening.
After playing around and diving into the source code, I think you need to set start_timestamp (the call that is commented out in your builder) to a timestamp earlier than the checkpoint. You could use Duration::ZERO.
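To illustrate why that might help, here is a small standalone sketch of my guess at the behavior. It is not code from scylla-cdc-rust: `resume_point` and `is_captured` are hypothetical names, timestamps are modeled as plain `std::time::Duration` values since the epoch, and the rule "a checkpoint older than start_timestamp is ignored" is only my assumption based on what I observed.

```rust
use std::time::Duration;

/// Hypothetical model, NOT the library's implementation: pick the point
/// from which reading resumes after a restart.
fn resume_point(start_timestamp: Duration, checkpoint: Option<Duration>) -> Duration {
    match checkpoint {
        // Assumed rule: the saved checkpoint is only honored when it is
        // not older than the configured start timestamp.
        Some(saved) if saved >= start_timestamp => saved,
        // No checkpoint, or a checkpoint older than start_timestamp:
        // fall back to start_timestamp.
        _ => start_timestamp,
    }
}

/// Returns true when a change written at `change` is at or after the
/// resume point, so the reader would still see it.
fn is_captured(change: Duration, resume: Duration) -> bool {
    change >= resume
}
```

Under this assumption, if the checkpoint was saved at t = 1000 s, a row changed at t = 1500 s while the service was down, and the service restarts at t = 2000 s with start_timestamp left at "now", the reader resumes at 2000 s and the change is skipped. With start_timestamp set to Duration::ZERO, the checkpoint at 1000 s wins and the change is read.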