flink-cdc icon indicating copy to clipboard operation
flink-cdc copied to clipboard

postgres-cdc.md DataStream Source: the example is missing some code; it should be added so that users can understand and use the example more easily.

Open landhuang opened this issue 3 years ago • 0 comments

postgres-cdc.md DataStream Source

The example is missing the code shown below; it should be added so that users can understand and use the example more easily.

Environment :

  • Flink version : 1.13.3
  • Flink CDC version: 2.2.1
  • Database and version: PostgreSQL 11.5
import java.util.Properties;

     Properties properties = new Properties();
     properties.setProperty("snapshot.mode", "always");

      .decodingPluginName("pgoutput")
      .slotName("inventory_products_slot")

      .debeziumProperties(properties)

The complete example, with these additions, is as follows:

import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;

import java.util.Properties;

/**
 * Example DataStream job that wires a {@code PostgreSQLSource} into a Flink streaming
 * environment and prints every emitted record.
 */
public class PostgreSQLSourceExample {

  /**
   * Creates the Debezium properties, builds the PostgreSQL source from them, attaches the
   * source to a streaming environment, and executes the job.
   *
   * @param args command-line arguments; not read by this example
   * @throws Exception propagated from building or executing the job
   */
  public static void main(String[] args) throws Exception {
    SourceFunction<String> cdcSource = createSource(createDebeziumProperties());

    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    // The print sink runs with parallelism 1 to keep message ordering.
    env.addSource(cdcSource).print().setParallelism(1);

    env.execute();
  }

  /** Returns the extra properties handed to the source builder via {@code debeziumProperties}. */
  private static Properties createDebeziumProperties() {
    Properties debeziumProps = new Properties();
    debeziumProps.setProperty("snapshot.mode", "always");
    return debeziumProps;
  }

  /** Builds the source for the {@code inventory.products} table using the given properties. */
  private static SourceFunction<String> createSource(Properties debeziumProps) {
    return PostgreSQLSource.<String>builder()
        .hostname("localhost")
        .port(5432)
        // Monitored database, schema, and table.
        .database("postgres")
        .schemaList("inventory")
        .tableList("inventory.products")
        .username("flinkuser")
        .password("flinkpw")
        .decodingPluginName("pgoutput")
        .slotName("inventory_products_slot")
        // Converts each SourceRecord to a JSON String.
        .deserializer(new JsonDebeziumDeserializationSchema())
        .debeziumProperties(debeziumProps)
        .build();
  }
}

landhuang avatar Aug 19 '22 02:08 landhuang