flink-cdc
flink-cdc copied to clipboard
postgres-cdc.md DataStream Source: the example needs additional code so that users can understand and use it more easily.
postgres-cdc.md DataStream Source
The DataStream Source example in postgres-cdc.md is missing some code; it should be added so that users can understand and use the example more easily.
Environment:
- Flink version: 1.13.3
- Flink CDC version: 2.2.1
- Database and version: PostgreSQL 11.5
import java.util.Properties;
// Properties object that is handed to the source builder through debeziumProperties(...) below.
Properties properties = new Properties();
properties.setProperty("snapshot.mode", "always");
// Builder calls to add to the PostgreSQLSource builder chain of the example:
.decodingPluginName("pgoutput")
.slotName("inventory_products_slot")
.debeziumProperties(properties)
The complete example, with these additions, is as follows:
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import com.ververica.cdc.debezium.JsonDebeziumDeserializationSchema;
import com.ververica.cdc.connectors.postgres.PostgreSQLSource;
import java.util.Properties;
/**
 * Minimal DataStream job that reads the {@code inventory.products} table of a PostgreSQL
 * database through {@link PostgreSQLSource} and prints every record as a JSON string.
 */
public class PostgreSQLSourceExample {

    /**
     * Builds the source, attaches it to a stream execution environment, prints the stream
     * and runs the job.
     *
     * @param args command-line arguments (not used)
     * @throws Exception if the job execution fails
     */
    public static void main(String[] args) throws Exception {
        SourceFunction<String> productsSource = buildProductsSource(snapshotAlwaysProperties());

        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.addSource(productsSource)
                .print()
                .setParallelism(1); // use parallelism 1 for sink to keep message ordering

        env.execute();
    }

    /**
     * Creates the properties passed to the source via {@code debeziumProperties(...)}.
     *
     * @return properties with {@code snapshot.mode} set to {@code always}
     */
    private static Properties snapshotAlwaysProperties() {
        Properties debeziumProps = new Properties();
        debeziumProps.setProperty("snapshot.mode", "always");
        return debeziumProps;
    }

    /**
     * Configures the PostgreSQL source for the {@code inventory.products} table.
     *
     * @param debeziumProps properties forwarded to the builder's {@code debeziumProperties(...)}
     * @return a source function emitting records as JSON strings
     */
    private static SourceFunction<String> buildProductsSource(Properties debeziumProps) {
        return PostgreSQLSource.<String>builder()
                // connection endpoint
                .hostname("localhost")
                .port(5432)
                // what to monitor: postgres database -> inventory schema -> products table
                .database("postgres")
                .schemaList("inventory")
                .tableList("inventory.products")
                // credentials
                .username("flinkuser")
                .password("flinkpw")
                // decoding plugin and slot name used by this source
                .decodingPluginName("pgoutput")
                .slotName("inventory_products_slot")
                // converts SourceRecord to JSON String
                .deserializer(new JsonDebeziumDeserializationSchema())
                .debeziumProperties(debeziumProps)
                .build();
    }
}