[Feature][Pipeline] Flink CDC pipeline supports transform

aiwenmo opened this issue on Dec 26 '23 • 3 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

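Motivation

Extend the Flink CDC pipeline definition with a transform section, so that rows from matching source tables can be projected (including computed columns) and filtered in the pipeline. An example pipeline definition: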

source:
  type: mysql
  name: source-database
  host: localhost
  port: 3306
  username: admin
  password: pass
  tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
  chunk-column: app_order_.*:id,web_order:product_id
  capture-new-tables: true

sink:
  type: kafka
  name: sink-queue
  bootstrap-servers: localhost:9092
  auto-create-table: true

route:
  - source-table: mydb.default.app_order_.*
    sink-table: odsdb.default.app_order
    description: sync all sharding tables to one
  - source-table: mydb.default.web_order
    sink-table: odsdb.default.ods_web_order
    description: sync table to a table with the ods_ prefix

transform:
  - source-table: mydb.app_order_.*
    projection: id, order_id, TO_UPPER(product_name)
    filter: id > 10 AND order_id > 100
    description: project fields from source table
  - source-table: mydb.web_order_.*
    projection: CONCAT(id, order_id) as uniq_id, *
    filter: uniq_id > 10
    description: add new uniq_id for each row

pipeline:
  name: source-database-sync-pipe
  parallelism: 4
  enable-schema-evolution: false
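To make the intended semantics of the transform rules above concrete, here is a minimal Python sketch under stated assumptions; it is not Flink CDC's implementation or API. TransformRule, apply_transforms and RULES are hypothetical names. The sketch assumes that source-table is a regular expression matched against the full "db.table" identifier, that the first matching rule wins, that the filter is evaluated against the projected row (which is what would let the second rule reference uniq_id), and that rows from unmatched tables pass through unchanged. The SQL expressions are hand-translated into Python lambdas, including an explicit int() cast standing in for the implicit cast a SQL engine might apply.

```python
import re
from dataclasses import dataclass
from typing import Any, Callable, Dict, List, Optional

Row = Dict[str, Any]


@dataclass
class TransformRule:
    """Hypothetical in-memory form of one entry under transform:."""
    source_table: str                  # regex matched against the full "db.table" id
    projection: Callable[[Row], Row]   # builds the output row from the source row
    filter: Callable[[Row], bool]      # assumption: evaluated on the projected row

    def matches(self, table_id: str) -> bool:
        return re.fullmatch(self.source_table, table_id) is not None


def apply_transforms(rules: List[TransformRule], table_id: str, row: Row) -> Optional[Row]:
    """Return the transformed row, or None if the rule's filter drops it."""
    for rule in rules:
        if rule.matches(table_id):          # assumption: first matching rule wins
            projected = rule.projection(row)
            return projected if rule.filter(projected) else None
    return row                              # assumption: unmatched tables pass through


# The two rules from the transform: section, hand-translated into Python.
RULES = [
    TransformRule(
        source_table=r"mydb.app_order_.*",
        # projection: id, order_id, TO_UPPER(product_name)
        # (keeping the name product_name for the upper-cased column is an assumption)
        projection=lambda r: {"id": r["id"], "order_id": r["order_id"],
                              "product_name": r["product_name"].upper()},
        # filter: id > 10 AND order_id > 100
        filter=lambda r: r["id"] > 10 and r["order_id"] > 100,
    ),
    TransformRule(
        source_table=r"mydb.web_order_.*",
        # projection: CONCAT(id, order_id) as uniq_id, *
        projection=lambda r: {"uniq_id": f"{r['id']}{r['order_id']}", **r},
        # filter: uniq_id > 10 (explicit int() stands in for an implicit SQL cast)
        filter=lambda r: int(r["uniq_id"]) > 10,
    ),
]

print(apply_transforms(RULES, "mydb.app_order_01", {"id": 11, "order_id": 101, "product_name": "pen"}))
# prints {'id': 11, 'order_id': 101, 'product_name': 'PEN'}
print(apply_transforms(RULES, "mydb.web_order_2", {"id": 0, "order_id": 5}))
# prints None (uniq_id is "05", which is not greater than 10)
```

Evaluating the filter after projection is only one possible design; evaluating it on the source row instead would be simpler but would not allow a filter on a computed column such as uniq_id.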

Solution

I am currently implementing this feature and will add details in follow-up comments.

Alternatives

None.

Anything else?

No response

Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

aiwenmo, Dec 26 '23

  • [X] #2937

aiwenmo, Dec 28 '23

Completed features:

  1. Support projection.
  2. Support filter.
  3. Support automatic schema evolution of projected columns.
  4. Support SQL operators, functions, constants, and metadata columns.
  5. Support restoring jobs from savepoints.
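One plausible reading of item 3 is that, when the upstream schema changes, the output schema of a projection is re-derived rather than fixed at job start. The Python sketch below illustrates that reading under stated assumptions; derive_projected_schema and on_add_column are hypothetical names, not Flink CDC APIs, and computed columns carry an explicitly declared type instead of one inferred from their expression. A wildcard projection such as the web_order rule picks up a newly added column, while an explicit column list (here just the app_order rule's id and order_id) does not.

```python
from typing import List, Tuple, Union

Column = Tuple[str, str]             # (column name, SQL type name)
ProjectionItem = Union[str, Column]  # "*", a source column name, or a computed (alias, type)


def derive_projected_schema(source_schema: List[Column],
                            projection: List[ProjectionItem]) -> List[Column]:
    """Compute the output schema of a projection over the current source schema."""
    types = dict(source_schema)
    out: List[Column] = []
    for item in projection:
        if item == "*":
            out.extend(source_schema)         # wildcard: every current source column
        elif isinstance(item, str):
            out.append((item, types[item]))   # explicit column keeps its source type
        else:
            out.append(item)                  # computed column with a declared type
    return out


def on_add_column(source_schema: List[Column], projection: List[ProjectionItem],
                  new_column: Column) -> Tuple[List[Column], List[Column]]:
    """Hypothetical AddColumn handler: evolve the source schema, then re-derive the output."""
    evolved = source_schema + [new_column]
    return evolved, derive_projected_schema(evolved, projection)


source_v1 = [("id", "INT"), ("order_id", "INT"), ("product_name", "STRING")]
web_projection = [("uniq_id", "STRING"), "*"]   # CONCAT(id, order_id) as uniq_id, *
app_projection = ["id", "order_id"]             # explicit columns only

_, web_out = on_add_column(source_v1, web_projection, ("amount", "DECIMAL"))
_, app_out = on_add_column(source_v1, app_projection, ("amount", "DECIMAL"))
print(web_out[-1])   # prints ('amount', 'DECIMAL')
print(app_out)       # prints [('id', 'INT'), ('order_id', 'INT')]
```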

Todo:

  • [ ] #3081
  • [ ] #3077
  • [ ] #3078
  • [ ] #3079
  • [ ] #3080

aiwenmo, Feb 26 '24

Closing this issue as it has been migrated to Apache Jira.

PatrickRen, Apr 09 '24