scylla-cdc-rust icon indicating copy to clipboard operation
scylla-cdc-rust copied to clipboard

About checkpoint_saver use

Open dragon-D opened this issue 2 years ago • 2 comments

I installed the tutorial with checkpoint_saver turned on and then tried to turn off the service, during which the table changed and the service ran again No change notification was captured

    let user_checkpoint_saver = Arc::new(
        TableBackedCheckpointSaver::new_with_default_ttl(
            session.clone(),
            &args.keyspace,
            "checkpoints",
        )
            .await
            .unwrap(),
    );
    
    let (mut cdc_log_printer, handle) = CDCLogReaderBuilder::new()
        .session(session)
        .keyspace(&args.keyspace)
        .table_name(&args.table)
        .window_size(Duration::from_secs_f64(args.window_size))
        .safety_interval(Duration::from_secs_f64(args.safety_interval))
        .sleep_interval(Duration::from_secs_f64(args.sleep_interval))
        .consumer_factory(Arc::new(PrinterConsumerFactory))
        .should_save_progress(true) // Mark that we want to save progress.
        .should_load_progress(true) // Mark that we want to start consuming CDC logs from the last saved checkpoint.
        .pause_between_saves(time::Duration::from_millis(100)) // Save progress each 100 ms. If not specified, a default value of 10 seconds is used.
        .checkpoint_saver(user_checkpoint_saver) // Use `user_checkpoint_saver to manage checkpoints.
        // .start_timestamp(start_timestamp)
        .build()
        .await?;

Is there a mistake

dragon-D avatar Jun 29 '23 03:06 dragon-D

I've also spent quite a while struggling with this. I set it up the same as you. To debug, I ran dbg!(checkpoint_saver.load_last_checkpoint().await) and it is indeed returning Some. But the timestamp shown there seems to always be the same, so I'm not sure whats happening there.

tqwewe avatar Dec 27 '23 14:12 tqwewe

After playing around and diving into the source code, I think you need to set the start_timestamp to an earlier timestamp than the checkpoint. You could use Duration::ZERO.

tqwewe avatar Jan 03 '24 07:01 tqwewe