flink-cdc
[pipeline-connector][kafka] add kafka pipeline data sink connector.
This closes https://github.com/ververica/flink-cdc-connectors/issues/2691.
- Support value format of `debezium-json` and `canal-json`.
- The written Kafka topic will be the `namespace.schemaName.tableName` string of the TableId; this can be changed using the `route` function of the pipeline.
- If the written Kafka topic does not exist, we will create one automatically.
@Shawn-Hx PTAL.
Thanks for the advice, addressed it.
@leonardBang @PatrickRen CC.
I tested it, but did not get the output I expected. The column names were changed to f1, f2, and I noticed that the schema change event is skipped in the code. I think this should be configurable; I am trying to compile and modify it myself, and I hope to get your help.
@svea-vip Hi, can you share more about your situation, and what output you expected? What impact would a table structure change have if only data is output here?
@lvyanquan
now: col1:1 col2:2 --> f1:1 f2:2
expected: col1:1 col2:2 --> col1:1 col2:2
@lvyanquan I would like to know your pipeline design for Kafka, which does not have independent metadata like a database. How do you plan to implement MetadataAccessor and MetadataApplier for Kafka?
Can we use Schema Registry as a metadata provider?
> I would like to know your pipeline design for Kafka, which does not have independent metadata like a database. How do you plan to implement MetadataAccessor and MetadataApplier for Kafka?
As Kafka does not have independent metadata like a database, I actually do nothing in MetadataApplier, and skip processing SchemaChangeEvent.
> The written Kafka topic will be the `namespace.schemaName.tableName` string of the TableId; this can be changed using the `route` function of the pipeline.
If there are many tables and each table is written to its own topic, too many topics may scatter Kafka writes. Could an option be added to write everything to one specified topic, with Kafka headers recording the database & table name?
I've added two options to specify this, PTAL. @melin
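A minimal sketch of what such a sink configuration could look like. The option names `topic` and `sink.add-tableId-to-header-enabled` are my reading of the options this PR adds and may not match the merged connector exactly; verify against the connector documentation:

```yaml
sink:
  type: kafka
  properties.bootstrap.servers: localhost:9092
  value.format: canal-json
  # Write all tables to this single topic instead of one topic per table.
  topic: cdc_events
  # Record the namespace/schema/table of each record in Kafka headers,
  # so consumers of the shared topic can still tell tables apart.
  sink.add-tableId-to-header-enabled: true
```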
> Can we use Schema Registry as a metadata provider?
Yes, but I am still considering whether, and how, to output table structure changes.
Rebased to master.
Support adding a custom key and value to the Kafka headers; the value is a constant, for example: region = hangzhou.
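Sketching how that could look in the sink block. I am assuming a single option, here called `sink.custom-header`, that takes `key:value` pairs; the exact option name and separator syntax should be checked against the connector's options:

```yaml
sink:
  type: kafka
  properties.bootstrap.servers: localhost:9092
  value.format: debezium-json
  # Attach constant headers to every produced record, e.g. to tag the source region.
  # Assumed syntax: comma-separated pairs, ':' between key and value.
  sink.custom-header: region:hangzhou,env:prod
```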