
[FLINK-35670][cdc-connector][postgres]Flink cdc pipeline support postgres source

Open Mrart opened this issue 9 months ago • 10 comments

Forked from: https://github.com/apache/flink-cdc/pull/3442

  1. Add pipeline IT Case
  2. Fixed handling of `db.sch*.table*` patterns: the database part must be the same in every pattern, i.e. a single, unique database.
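
The single-database constraint in item 2 can be illustrated with a small sketch. This is not the code from the PR: the class `TablePatternCheck` and the method `requireSingleDatabase` are hypothetical names, and the sketch assumes each pattern has the form `db.schema.table` with literal dots separating the three parts.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

// Hypothetical helper, for illustration only; not part of flink-cdc.
public class TablePatternCheck {

    /** Returns the one database name shared by all patterns, or throws if there is not exactly one. */
    public static String requireSingleDatabase(List<String> patterns) {
        Set<String> databases = new LinkedHashSet<>();
        for (String pattern : patterns) {
            // Assumption: the first two dots separate database, schema and table.
            String[] parts = pattern.trim().split("\\.", 3);
            if (parts.length != 3) {
                throw new IllegalArgumentException("Expected db.schema.table but got: " + pattern);
            }
            databases.add(parts[0]);
        }
        if (databases.size() != 1) {
            throw new IllegalArgumentException("Patterns must name exactly one database, found: " + databases);
        }
        return databases.iterator().next();
    }

    public static void main(String[] args) {
        // Both patterns share the database part, so the check passes.
        System.out.println(requireSingleDatabase(
                List.of("inventory.sch*.table*", "inventory.public.orders")));
    }
}
```

Patterns that name two different databases, such as `a.public.t1` and `b.public.t1`, make the sketch throw an `IllegalArgumentException`.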

Mrart avatar Mar 25 '25 06:03 Mrart

How can I help speed up the release of the postgres-cdc-pipeline connector?

Hi @Gunnnn. We plan to complete it in the next version (3.5.0).

Gunnnn avatar Apr 22 '25 13:04 Gunnnn

Hi, @Mrart. I've left some comments, and I think we should keep improving the tests to ensure the feature is reliable. Would you like to add an e2e test for this?

lvyanquan avatar May 20 '25 12:05 lvyanquan

Documentation for this is also necessary.

lvyanquan avatar May 20 '25 12:05 lvyanquan

Thanks for the review; let me improve it.

Mrart avatar May 29 '25 01:05 Mrart

@Mrart There is a checkstyle error.

Error:  src/test/java/org/apache/flink/cdc/pipeline/tests/PostgresE2eITCase.java:[137,16] (naming) LocalVariableName: Name 'JdbcUrl' must match pattern '^[a-z][a-zA-Z0-9]*$'.
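
The rule behind this error can be reproduced with plain `java.util.regex`. The following is a minimal sketch, not Checkstyle itself: the class `LocalVariableNameCheck` is a hypothetical name, and `jdbcUrl` is only an assumed replacement for the offending variable name.

```java
import java.util.regex.Pattern;

// Hypothetical demo class; it only mirrors the pattern quoted in the error message.
public class LocalVariableNameCheck {

    // Pattern copied from the checkstyle error: lower-case first letter, then letters or digits.
    private static final Pattern LOCAL_VARIABLE_NAME = Pattern.compile("^[a-z][a-zA-Z0-9]*$");

    public static boolean isValid(String name) {
        return LOCAL_VARIABLE_NAME.matcher(name).matches();
    }

    public static void main(String[] args) {
        System.out.println(isValid("JdbcUrl")); // false: starts with an upper-case letter
        System.out.println(isValid("jdbcUrl")); // true: assumed lowerCamelCase rename
    }
}
```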

lvyanquan avatar Jun 09 '25 07:06 lvyanquan

This version does not support the following options:

`
@Experimental
public static final ConfigOption<Boolean> INCLUDE_COMMENTS_ENABLED =
        ConfigOptions.key("include-comments.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether enable include table and column comments, by default is false, if set to true, table and column comments will be sent. "
                                + "Note: Enable this option will bring the implications on memory usage.");

@Experimental
public static final ConfigOption<Boolean> TREAT_TINYINT1_AS_BOOLEAN_ENABLED =
        ConfigOptions.key("treat-tinyint1-as-boolean.enabled")
                .booleanType()
                .defaultValue(true)
                .withDescription("Whether treat TINYINT(1) as boolean, by default is true. ");

@Experimental
public static final ConfigOption<Boolean> USE_LEGACY_JSON_FORMAT =
        ConfigOptions.key("use.legacy.json.format")
                .booleanType()
                .defaultValue(true)
                .withDescription(
                        "Whether to use legacy json format. The default value is true, which means there is no whitespace before value and after comma in json format.");

@Experimental
public static final ConfigOption<Boolean>
        SCAN_INCREMENTAL_SNAPSHOT_UNBOUNDED_CHUNK_FIRST_ENABLED =
        ConfigOptions.key("scan.incremental.snapshot.unbounded-chunk-first.enabled")
                .booleanType()
                .defaultValue(false)
                .withDescription(
                        "Whether to assign the unbounded chunks first during snapshot reading phase. This might help reduce the risk of the TaskManager experiencing an out-of-memory (OOM) error when taking a snapshot of the largest unbounded chunk.  Defaults to false.");

`

Mrart avatar Jun 10 '25 12:06 Mrart

@lvyanquan We currently support PostgreSQL types following the Flink Postgres CDC implementation. However, the field types listed at https://www.rockdata.net/zh-cn/docs/14/datatype.html differ considerably from what the current implementation supports. Could we merge this PR first? I will then submit a complete implementation plan to close the gap as soon as possible.

Mrart avatar Jun 13 '25 02:06 Mrart

Seems newly added test cases are failing, could @Mrart take a look?

yuxiqian avatar Jun 18 '25 01:06 yuxiqian

> Seems newly added test cases are failing, could @Mrart take a look?

It seems to be a test environment problem; we can trigger the tests again.

Mrart avatar Jun 18 '25 03:06 Mrart

@lvyanquan Once CI passes, can we move on to the next step?

Mrart avatar Jun 20 '25 00:06 Mrart

Thanks @Mrart for this contribution, merged.

lvyanquan avatar Jul 31 '25 05:07 lvyanquan

Thanks @Mrart and everyone involved in this feature for the great work!

leonardBang avatar Jul 31 '25 06:07 leonardBang