[pipeline-connector][kafka] add kafka pipeline data sink connector.

Open lvyanquan opened this issue 1 year ago • 9 comments

This closes https://github.com/ververica/flink-cdc-connectors/issues/2691.

  • Support the debezium-json and canal-json value formats.
  • By default, the Kafka topic written to is the namespace.schemaName.tableName string of the TableId; this can be changed using the pipeline's route function.
  • If the target Kafka topic does not exist, it is created automatically.
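
The default topic naming described in the list above can be sketched roughly as follows. This is a minimal illustration in Python, not the connector's actual code: `default_topic` and `resolve_topic` are hypothetical helpers, skipping empty parts of the TableId is an assumption, and the `routes` dictionary only stands in for the pipeline's route function.

```python
from typing import Dict, Optional


def default_topic(namespace: Optional[str], schema_name: Optional[str], table_name: str) -> str:
    """Join the non-empty parts of a table identifier with dots (assumed behavior)."""
    parts = [p for p in (namespace, schema_name, table_name) if p]
    return ".".join(parts)


def resolve_topic(table_id: str, routes: Dict[str, str]) -> str:
    """Return the routed name if a route matches, otherwise the table id itself."""
    return routes.get(table_id, table_id)


# Hypothetical table: no namespace, schema "app_db", table "orders".
topic = default_topic(None, "app_db", "orders")
routed = resolve_topic(topic, {"app_db.orders": "ods_orders"})
```

With no matching route, the topic name is simply the dotted table identifier; with a route, the sink table name takes its place.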

lvyanquan avatar Dec 28 '23 06:12 lvyanquan

@Shawn-Hx PTAL.

lvyanquan avatar Dec 28 '23 09:12 lvyanquan

Thanks for the advice; I've addressed it.

lvyanquan avatar Jan 03 '24 01:01 lvyanquan

@leonardBang @PatrickRen CC.

lvyanquan avatar Jan 15 '24 05:01 lvyanquan

I tested it, but did not get the output result I expected. The column names were changed to f1, f2, and I noticed that the schema change event was skipped in the code. I think it should be configurable here, and I am trying to compile and modify it. I hope to get your help.

svea-vip avatar Feb 21 '24 07:02 svea-vip

@svea-vip Hi, can you share more about your situation and the output you expected? What impact would a change in table structure have if I only output data here?

lvyanquan avatar Feb 24 '24 07:02 lvyanquan

@lvyanquan Current output: col1:1 col2:2 --> f1:1 f2:2. Expected output: col1:1 col2:2 --> col1:1 col2:2.

svea-vip avatar Feb 26 '24 07:02 svea-vip

@lvyanquan I would like to know your pipeline design for Kafka, which does not have independent metadata like a database. How do you plan to implement MetadataAccessor and MetadataApplier for Kafka?

svea-vip avatar Feb 26 '24 09:02 svea-vip

Can we use Schema Registry as a metadata provider?

maver1ck avatar Feb 27 '24 11:02 maver1ck

@lvyanquan I would like to know your pipeline design for Kafka, which does not have independent metadata like a database. How do you plan to implement MetadataAccessor and MetadataApplier for Kafka?

As Kafka does not have independent metadata like a database, I actually do nothing in MetadataApplier, and skip processing SchemaChangeEvent.

lvyanquan avatar Feb 28 '24 02:02 lvyanquan

"The written topic of Kafka will be namespace.schemaName.tableName string of TableId, this can be changed using route function of pipeline."

If there are many tables and each table is written to its own topic, the large number of topics may cause Kafka to perform random writes. Add an option to specify whether to write to a single topic, with the Kafka header recording the database and table name.

melin avatar Feb 28 '24 06:02 melin

Add an option to specify whether to write to a single topic, with the Kafka header recording the database and table name.

I've added two options to specify this, PTAL. @melin

lvyanquan avatar Feb 29 '24 11:02 lvyanquan
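
The single-topic idea discussed above might look roughly like the sketch below. It is only an illustration in Python under stated assumptions: `build_record`, `fixed_topic`, `add_table_id_to_header`, and the header keys `namespace`, `schemaName`, and `tableName` are hypothetical names chosen here, and the thread does not state the names of the two options actually added in the PR.

```python
from typing import Dict, Optional, Tuple


def build_record(
    table_id: Tuple[Optional[str], Optional[str], str],
    value: bytes,
    fixed_topic: Optional[str] = None,
    add_table_id_to_header: bool = False,
) -> dict:
    """Pick the target topic and, optionally, record the table id in headers."""
    namespace, schema_name, table_name = table_id
    # One topic per table by default; a fixed topic collects all tables.
    topic = fixed_topic or ".".join(p for p in table_id if p)
    headers: Dict[str, bytes] = {}
    if add_table_id_to_header:
        # Kafka header values are bytes; missing parts become empty values.
        headers["namespace"] = (namespace or "").encode("utf-8")
        headers["schemaName"] = (schema_name or "").encode("utf-8")
        headers["tableName"] = table_name.encode("utf-8")
    return {"topic": topic, "value": value, "headers": headers}


record = build_record(
    (None, "app_db", "orders"),
    b"{}",
    fixed_topic="all_changes",
    add_table_id_to_header=True,
)
```

When all tables share one topic, consumers can still tell records apart by reading the headers instead of parsing the payload.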

Can we use Schema Registry as a metadata provider?

Yes, but I am still considering whether to output and how to output table structure changes.

lvyanquan avatar Feb 29 '24 11:02 lvyanquan

Rebased to master.

lvyanquan avatar Apr 10 '24 02:04 lvyanquan

Please support adding custom key/value pairs to the Kafka header, where the value is a constant, for example: region = hangzhou

melin avatar Apr 15 '24 06:04 melin
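
A constant custom header of the kind requested above could be parsed along these lines. This is a hedged Python sketch only: `parse_custom_headers` is a hypothetical helper, and the `key = value` pairs separated by `;` are an assumed format, not something the thread specifies.

```python
from typing import Dict


def parse_custom_headers(spec: str) -> Dict[str, bytes]:
    """Parse 'key = value; key2 = value2' into constant Kafka header entries."""
    headers: Dict[str, bytes] = {}
    for pair in spec.split(";"):
        if not pair.strip():
            continue  # tolerate a trailing separator
        key, sep, value = pair.partition("=")
        if not sep or not key.strip():
            raise ValueError(f"expected key=value, got {pair!r}")
        headers[key.strip()] = value.strip().encode("utf-8")
    return headers


custom_headers = parse_custom_headers("region = hangzhou")
```

Because the values are constants, the parsed headers can be computed once at startup and attached unchanged to every record.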