
[Feature-2932][Pipeline] Flink CDC pipeline supports transform

Open aiwenmo opened this issue 2 years ago • 7 comments

This is a draft pull request; the functionality is not complete yet. Please discuss the details, implementation, and optimization under this PR. I will complete the remaining code. Thanks!

aiwenmo avatar Dec 27 '23 16:12 aiwenmo

What is the most appropriate way to obtain the field names and data types of DataChangeEvent.after()?
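One common pattern for this (a minimal sketch with hypothetical names, not the actual Flink CDC API) is to cache the schema announced by the table's CreateTableEvent, keyed by table id, and use it to interpret the positional values carried by after():

```java
import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

// Hypothetical sketch: after() carries only positional values, so field names
// and types must come from the schema announced earlier for the same table.
public class SchemaCache {
    // tableId -> ordered column names (a real cache would also hold DataTypes)
    private final Map<String, List<String>> schemas = new HashMap<>();

    // Called when a CreateTableEvent (or schema change) for the table arrives.
    public void register(String tableId, List<String> columnNames) {
        schemas.put(tableId, columnNames);
    }

    // Pairs the cached column names with the positional values of after().
    public Map<String, Object> resolve(String tableId, Object[] afterValues) {
        List<String> cols = schemas.get(tableId);
        if (cols == null || cols.size() != afterValues.length) {
            throw new IllegalStateException("No matching schema cached for " + tableId);
        }
        Map<String, Object> row = new HashMap<>();
        for (int i = 0; i < cols.size(); i++) {
            row.put(cols.get(i), afterValues[i]);
        }
        return row;
    }

    public static void main(String[] args) {
        SchemaCache cache = new SchemaCache();
        cache.register("mydb.app_order_1", Arrays.asList("id", "order_id", "product_name"));
        Map<String, Object> row = cache.resolve("mydb.app_order_1", new Object[] {1, 100, "beer"});
        System.out.println(row.get("product_name"));
    }
}
```

The key design point is that the schema travels as a separate event, so the transform operator must hold state between schema events and data events.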

aiwenmo avatar Dec 28 '23 01:12 aiwenmo

Change the parameter definition of transform to the following:

transform:
  - source-table: mydb.app_order_.*
    projection: id, order_id, TO_UPPER(product_name)
    filter: id > 10 AND order_id > 100
    description: project fields from source table
  - source-table: mydb.web_order_.*
    projection: CONCAT(id, order_id) as uniq_id, *
    filter: uniq_id > 10
    description: add new uniq_id for each row

aiwenmo avatar Dec 28 '23 10:12 aiwenmo

@lvyanquan Thanks for your review. I will fix these issues tonight.

aiwenmo avatar Jan 02 '24 02:01 aiwenmo

There are only two review comments that have not been addressed yet; I will fix them today.

  1. A doc or more tests are necessary to show what kind of projection we support after this PR. For example, can we support the CAST function now?
  2. We need to support more types, such as some time types and float types.

aiwenmo avatar Jan 03 '24 01:01 aiwenmo

Hi @aiwenmo, I hit Invalid signature file digest for Manifest main when testing this transform in the E2E environment. Can you help fix it when building the flink-cdc-dist jar?
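This "Invalid signature file digest" error typically appears when a shaded/uber jar bundles the cryptographic signature files of a signed dependency, so the stale signatures no longer match the repackaged contents. A common fix (sketched here for the maven-shade-plugin; the actual flink-cdc-dist build configuration may differ) is to exclude the signature files when shading:

```xml
<plugin>
  <groupId>org.apache.maven.plugins</groupId>
  <artifactId>maven-shade-plugin</artifactId>
  <configuration>
    <filters>
      <filter>
        <!-- Strip signature files from signed dependencies; stale signatures
             in a repackaged jar trigger a SecurityException at runtime. -->
        <artifact>*:*</artifact>
        <excludes>
          <exclude>META-INF/*.SF</exclude>
          <exclude>META-INF/*.DSA</exclude>
          <exclude>META-INF/*.RSA</exclude>
        </excludes>
      </filter>
    </filters>
  </configuration>
</plugin>
```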

lvyanquan avatar Jan 29 '24 07:01 lvyanquan

Overall, looks good to me. Can you fix the remaining minor issue with CI?

lvyanquan avatar Jan 31 '24 01:01 lvyanquan

Completed features:

  1. Support projection.
  2. Support filter.
  3. Support automatic evolution of projected columns.
  4. Support SQL operators, functions, constants, and metadata columns.
  5. Support restoring jobs from save points.
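Feature 4 can be illustrated with a config sketch in the style of the transform definition above. This is an assumed example, not taken from the PR: the TO_UPPER function follows the earlier comment in this thread, and the metadata column name __table_name__ is an assumption based on later Flink CDC documentation.

```yaml
transform:
  - source-table: mydb.app_order_.*
    projection: id, TO_UPPER(product_name) AS product_name, 'app' AS channel, __table_name__ AS source_table
    filter: id > 10
    description: a function, a constant, and a metadata column in one projection
```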

Todo:

  1. SchemaRegistry saves the routing rules used to restore the TransformDataOperator.
  2. Support UDF.
  3. Support type conversion.
  4. Support CASE WHEN.
  5. Support the strategy of schema evolution.

aiwenmo avatar Feb 05 '24 13:02 aiwenmo

I have fixed most of the review comments; the following are the ones that have not been addressed yet:

  1. For me, this class acts in multiple roles. I guess you mean it should be a compiled evaluator for each column, but the Javadoc says it is used to describe the information of a transformation column. We need two classes to make this logic clear: the first describes the information of the transformation column, and the second is a column evaluator.
  2. Same suggestion as above: we may have TransformProjection and TransformProjector.
  3. Could we cache the generated ExpressionEvaluator in the transform operator, where we can build the cache or clear it in the open() and close() methods?
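The split suggested in points 2 and 3 can be sketched as two cooperating classes. This is a hedged illustration of the review suggestion, not the PR's actual code: TransformProjection only describes a projected column, while TransformProjector owns a cache of compiled evaluators whose lifecycle is tied to open() and close(). The compile step here is a trivial stand-in for real expression compilation (e.g. via Janino).

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Function;

// Describes a projected column: name plus the SQL-like expression that fills it.
final class TransformProjection {
    final String columnName;
    final String expression; // e.g. "TO_UPPER(product_name)"

    TransformProjection(String columnName, String expression) {
        this.columnName = columnName;
        this.expression = expression;
    }
}

// Evaluates projections; the evaluator cache is built in open() and released
// in close(), mirroring a Flink operator's lifecycle.
final class TransformProjector {
    private Map<String, Function<String, String>> evaluatorCache;

    void open() {
        evaluatorCache = new HashMap<>();
    }

    Function<String, String> evaluatorFor(TransformProjection p) {
        // Compile each distinct expression at most once per operator instance.
        return evaluatorCache.computeIfAbsent(p.expression, this::compile);
    }

    // Stand-in for expression compilation; a real implementation would
    // generate and compile an ExpressionEvaluator here.
    private Function<String, String> compile(String expr) {
        if (expr.startsWith("TO_UPPER")) {
            return String::toUpperCase;
        }
        return s -> s; // identity projection
    }

    void close() {
        evaluatorCache.clear();
        evaluatorCache = null;
    }
}

public class TransformSketch {
    public static void main(String[] args) {
        TransformProjector projector = new TransformProjector();
        projector.open();
        TransformProjection p =
                new TransformProjection("product_name", "TO_UPPER(product_name)");
        System.out.println(projector.evaluatorFor(p).apply("widget")); // prints WIDGET
        projector.close();
    }
}
```

Keeping description and evaluation separate means the metadata class stays serializable for checkpoints while the compiled evaluators remain transient, rebuilt on restore.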

aiwenmo avatar Feb 28 '24 15:02 aiwenmo

Thanks @aiwenmo for the great work, LGTM. I think we can improve the test coverage as well as the documentation, but that can be done later.

Thanks @leonardBang, I will improve them in a new pull request.

aiwenmo avatar Apr 02 '24 14:04 aiwenmo