[FLINK-35670][cdc-connector][postgres] Flink CDC pipeline support for Postgres source
Forked from: https://github.com/apache/flink-cdc/pull/3442
- Add a pipeline IT case
- Fix `db.sch*.table*` selector matching: since the connector reads from a single database, the database part must be the same and unique across all selectors (see the sketch below)
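A minimal sketch of the single-database check described above; the method name and splitting logic here are illustrative, not the PR's actual code:

```java
import java.util.List;
import java.util.Set;
import java.util.stream.Collectors;

public final class TableSelectorCheck {

    /**
     * The Postgres pipeline source connects to one database, so every
     * selector of the form db.schema.table must share the same db part.
     */
    static void validateSingleDatabase(List<String> tableSelectors) {
        Set<String> databases =
                tableSelectors.stream()
                        .map(selector -> selector.split("\\.", 3)[0])
                        .collect(Collectors.toSet());
        if (databases.size() != 1) {
            throw new IllegalArgumentException(
                    "All table selectors must reference the same database, got: " + databases);
        }
    }
}
```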
How can I help here to speed up the postgres-cdc-pipeline connector release?
Hi @Gunnnn. We plan to complete it in the next version (3.5.0).
Hi, @Mrart. I've left some comments, and I think we should continue to improve the testing to ensure the reliability of the functionality. Would you like to add an e2e test for this?
Documentation for this is also necessary.
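For reference, a minimal Testcontainers-based e2e skeleton could look like the following; the class name, image tag, and assertions are illustrative assumptions, not the PR's actual PostgresE2eITCase:

```java
import org.junit.jupiter.api.Test;
import org.testcontainers.containers.PostgreSQLContainer;
import org.testcontainers.junit.jupiter.Container;
import org.testcontainers.junit.jupiter.Testcontainers;

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.Statement;

@Testcontainers
class PostgresPipelineSmokeTest {

    // Spin up a disposable Postgres instance for the test run.
    @Container
    private static final PostgreSQLContainer<?> POSTGRES =
            new PostgreSQLContainer<>("postgres:14")
                    .withDatabaseName("pipeline_db")
                    .withUsername("flinkuser")
                    .withPassword("flinkpw");

    @Test
    void capturesInsertedRows() throws Exception {
        try (Connection conn =
                        DriverManager.getConnection(
                                POSTGRES.getJdbcUrl(),
                                POSTGRES.getUsername(),
                                POSTGRES.getPassword());
                Statement stmt = conn.createStatement()) {
            stmt.execute("CREATE TABLE products (id INT PRIMARY KEY, name TEXT)");
            stmt.execute("INSERT INTO products VALUES (1, 'widget')");
            // A full e2e test would now submit a pipeline job with a
            // Postgres source and assert the change events reach the sink.
        }
    }
}
```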
Thanks for the review, let me improve it.
@Mrart There is a checkstyle error:
Error: src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java:[137,16] (naming) LocalVariableName: Name 'JdbcUrl' must match pattern '^[a-z][a-zA-Z0-9]*$'.
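The fix is a straightforward rename to lowerCamelCase, e.g. (the surrounding variable is shown for illustration):

```java
// Before: flagged by checkstyle, local variables must match ^[a-z][a-zA-Z0-9]*$
String JdbcUrl = container.getJdbcUrl();

// After: lowerCamelCase satisfies the LocalVariableName rule
String jdbcUrl = container.getJdbcUrl();
```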
This version does not support the following options:

```java
@Experimental
public static final ConfigOption<Boolean> INCLUDE_COMMENTS_ENABLED =
        ConfigOptions.key("include-comments.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether enable include table and column comments, by default is false, if set to true, table and column comments will be sent. "
                                + "Note: Enable this option will bring the implications on memory usage.");

@Experimental
public static final ConfigOption<Boolean> TREAT_TINYINT1_AS_BOOLEAN_ENABLED =
        ConfigOptions.key("treat-tinyint1-as-boolean.enabled")
                .booleanType()
                .defaultValue(true)
                .withDescription("Whether treat TINYINT(1) as boolean, by default is true.");

@Experimental
public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
        ConfigOptions.key("use.legacy.json.format")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");

@Experimental
public static final ConfigOption<Boolean>
        SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
                ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
                        .booleanType()
                        .defaultValue(false)
                        .withDescription(
                                "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk. Defaults to false.");
```
@lvyanquan We currently support Postgres SQL types following the Flink Postgres CDC implementation. However, the set of data types documented at https://www.rockdata.net/zh-cn/docs/14/datatype.html differs significantly from what the current implementation covers. Could we merge this PR first? I will then submit a complete implementation plan to close the gap as soon as possible.
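To make the gap concrete, a type mapper along these lines is what would need to grow to cover the full PostgreSQL type catalog; this is an illustrative sketch assuming the flink-cdc-common `DataTypes` factory methods, not the connector's actual mapping code:

```java
import org.apache.flink.cdc.common.types.DataType;
import org.apache.flink.cdc.common.types.DataTypes;

public final class PostgresTypeSketch {

    /** Maps a PostgreSQL type name to a Flink CDC DataType (partial coverage). */
    static DataType fromPostgresType(String typeName) {
        switch (typeName.toLowerCase()) {
            case "smallint":
                return DataTypes.SMALLINT();
            case "integer":
                return DataTypes.INT();
            case "bigint":
                return DataTypes.BIGINT();
            case "real":
                return DataTypes.FLOAT();
            case "double precision":
                return DataTypes.DOUBLE();
            case "boolean":
                return DataTypes.BOOLEAN();
            case "text":
                return DataTypes.STRING();
            default:
                // Arrays, ranges, enums, geometric types, etc. are the
                // unsupported remainder discussed above.
                throw new UnsupportedOperationException("Unmapped type: " + typeName);
        }
    }
}
```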
Seems the newly added test cases are failing, could @Mrart take a look?
It seems to be a test environment problem; we can trigger the tests again.
@lvyanquan Once CI passes, can we proceed to the next step?
Thanks @Mrart for this contribution, merged.
Thanks @Mrart and everyone involved in this feature for the great work!