flink-cdc
flink-cdc copied to clipboard
[Feature][Pipeline] Flink CDC pipeline supports transform
Search before asking
- [X] I searched in the issues and found nothing similar.
Motivation
source:
type: mysql
name: source-database
host: localhost
port: 3306
username: admin
password: pass
tables: adb.*, bdb.user_table_[0-9]+, [app|web]_order_.*
chunk-column: app_order_.*:id,web_order:product_id
capture-new-tables: true
sink:
type: kafka
name: sink-queue
bootstrap-servers: localhost:9092
auto-create-table: true
route:
- source-table: mydb.default.app_order_.*
sink-table: odsdb.default.app_order
description: sync all sharding tables to one
- source-table: mydb.default.web_order
sink-table: odsdb.default.ods_web_order
description: sync table to with given prefix ods_
transform:
- source-table: mydb.app_order_.*
projection: id, order_id, TO_UPPER(product_name)
filter: id > 10 AND order_id > 100
description: project fields from source table
- source-table: mydb.web_order_.*
projection: CONCAT(id, order_id) as uniq_id, *
filter: uniq_id > 10
description: add new uniq_id for each row
pipeline:
name: source-database-sync-pipe
parallelism: 4
enable-schema-evolution: false
Solution
I am currently working on the implementation of this feature, and the relevant content will be added gradually in the following text.
Alternatives
None.
Anything else?
No response
Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
transform:
- source-table: mydb.app_order_.*
projection: id, order_id, TO_UPPER(product_name)
filter: id > 10 AND order_id > 100
description: project fields from source table
- source-table: mydb.web_order_.*
projection: CONCAT(id, order_id) as uniq_id, *
filter: uniq_id > 10
description: add new uniq_id for each row
- [X] #2937
Completed features:
- Support projection.
- Support filter.
- Support automatic evolution of projection's column.
- Support SQL operators, functions, constants, and metadata columns.
- Support restoring jobs from save points.
Todo:
- [ ] #3081
- [ ] #3077
- [ ] #3078
- [ ] #3079
- [ ] #3080
Closing this issue as it has been migrated to Apache Jira.