Handling restarts with an Apache Beam Debezium application

Open sjay1728 opened this issue 2 years ago • 2 comments

I have an Apache Beam application that uses Debezium for change data capture. Unlike the typical setup, I am not using it together with Kafka. However, I have run into a problem when restarting the application: every time it restarts, it retrieves all the data from the very beginning. My goal is for the application to resume consuming data from where it left off before the restart. I would greatly appreciate any guidance or insights on how to achieve this. Here is the structure of my application:

import io.debezium.connector.postgresql.PostgresConnector;
import org.apache.beam.io.debezium.DebeziumIO;
import org.apache.beam.sdk.Pipeline;
import org.apache.beam.sdk.coders.StringUtf8Coder;
import org.apache.beam.sdk.options.PipelineOptions;
import org.apache.beam.sdk.options.PipelineOptionsFactory;
import org.apache.beam.sdk.transforms.DoFn;
import org.apache.beam.sdk.transforms.ParDo;
import org.apache.beam.sdk.values.PCollection;

public class PipelineTest {
    // Despite the variable name, this configures the PostgreSQL connector.
    private static final DebeziumIO.ConnectorConfiguration mySqlConnectorConfig =
            DebeziumIO.ConnectorConfiguration
                    .create()
                    .withUsername("xxxxxxxx")
                    .withPassword("xxxxxxxx")
                    .withHostName("localhost")
                    .withPort("5432")
                    .withConnectorClass(PostgresConnector.class)
                    .withConnectionProperty("database.dbname", "xxxxxxxx")
                    .withConnectionProperty("database.server.id", "xxxxxxxx")
                    .withConnectionProperty("database.server.name", "xxxxxxxx")
                    .withConnectionProperty("plugin.name", "pgoutput")
                    .withConnectionProperty("slot.name", "debezium5")
                    .withConnectionProperty("table.include.list", "public.table1");

    public static void main(String[] args) {
        PipelineOptions options = PipelineOptionsFactory.create();
        options.setRunner(org.apache.beam.runners.direct.DirectRunner.class); // Set the Direct Runner
        Pipeline pipeline = Pipeline.create(options);

        System.out.println("Pipeline starting...");

        // Read change events and convert each SourceRecord value to a String.
        PCollection<String> records = pipeline
                .apply(DebeziumIO.<String>read()
                        .withConnectorConfiguration(mySqlConnectorConfig)
                        .withFormatFunction(sourceRecord ->
                                sourceRecord.value().toString())
                        .withCoder(StringUtf8Coder.of())
                );

        // Print every change event as it arrives.
        records.apply(ParDo.of(new DoFn<String, String>() {
            @ProcessElement
            public void processElement(ProcessContext c) {
                System.out.println("Pipeline starting 3..." + c.element());
            }
        }));

        System.out.println("Pipeline ending...");

        pipeline.run().waitUntilFinish();
    }
}

It seems that the following replication slot values do not change at all when I run the query SELECT * FROM pg_replication_slots; : catalog_xmin, restart_lsn, confirmed_flush_lsn.

I would greatly appreciate any guidance or insights on how to achieve this.

sjay1728, Aug 31 '23 03:08

Hi @sjay1728,

Did you find a solution to this issue?

fernandocast, May 15 '25 18:05

There is also a report that after a worker crash, it retrieves all the data from the very beginning before it starts listening for streaming changes.

DebeziumIO implements offset tracking only in memory, as a static field:

https://github.com/apache/beam/blob/02743314acf81a390c7fb9305da585f5b0af0485/sdks/java/io/debezium/src/main/java/org/apache/beam/io/debezium/KafkaSourceConsumerFn.java#L101

Essentially, DebeziumIO is not robust against worker crashes or restarts, and should be considered not production-ready for its streaming use case.
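
To make the difference concrete, here is a minimal sketch of why an offset held only in memory is lost on restart while one written to durable storage survives. Everything in it is an assumption for illustration: FileOffsetStore is a hypothetical class, not part of DebeziumIO or Beam, and the key and offset value are made up. It is not a patch or a supported workaround for DebeziumIO.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Properties;

public class OffsetStoreSketch {

    /** Hypothetical file-backed offset store; NOT part of DebeziumIO or Beam. */
    static final class FileOffsetStore {
        private final Path file;

        FileOffsetStore(Path file) {
            this.file = file;
        }

        /** Persists the latest offset for a key so it outlives the process. */
        void save(String key, String offset) throws IOException {
            Properties props = loadAll();
            props.setProperty(key, offset);
            try (OutputStream out = Files.newOutputStream(file)) {
                props.store(out, "offsets");
            }
        }

        /** Returns the stored offset, or null if none was ever saved. */
        String load(String key) throws IOException {
            return loadAll().getProperty(key);
        }

        private Properties loadAll() throws IOException {
            Properties props = new Properties();
            if (Files.exists(file)) {
                try (InputStream in = Files.newInputStream(file)) {
                    props.load(in);
                }
            }
            return props;
        }
    }

    public static void main(String[] args) throws IOException {
        Path file = Files.createTempFile("offsets", ".properties");

        // "First run": record how far we got (made-up offset value).
        new FileOffsetStore(file).save("slot.debezium5", "0/16B3748");

        // "After restart": a brand-new instance has no in-memory state,
        // but it can still recover the offset from the file.
        String resumed = new FileOffsetStore(file).load("slot.debezium5");
        System.out.println("Resuming from: " + resumed);
    }
}
```

A static field behaves like the in-memory part only: a new JVM starts with it empty, so the connector has no recorded position and starts over from the beginning.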

Abacn, Dec 09 '25 21:12