Add --metadata_column support for the Kafka CDC connector
Purpose
Add --metadata_column support to the Paimon Kafka CDC connector, similar to the existing options for MySQL and Postgres: https://github.com/apache/paimon/pull/2077
The supported metadata columns correspond to the fields of org.apache.kafka.clients.consumer.ConsumerRecord, i.e.:
- topic
- partition
- offset
- timestamp
- timestampType: the name of the TimestampType enum value, i.e. NoTimestampType, CreateTime or LogAppendTime
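The mapping from column names to record fields can be sketched in Java as follows. This is a minimal, hypothetical illustration, not the code in this PR: RecordInfo, TimestampKind and MetadataColumn are made-up names, and RecordInfo stands in for ConsumerRecord so the example compiles without kafka-clients on the classpath. The display names in TimestampKind mirror those of Kafka's TimestampType enum.

```java
import java.util.function.Function;

/**
 * Sketch only: maps the supported metadata column names to record fields.
 * RecordInfo is a hypothetical stand-in for ConsumerRecord.
 */
public class MetadataColumnSketch {

    /** Mirrors the display names of Kafka's TimestampType enum. */
    enum TimestampKind {
        NO_TIMESTAMP_TYPE("NoTimestampType"),
        CREATE_TIME("CreateTime"),
        LOG_APPEND_TIME("LogAppendTime");

        final String displayName;

        TimestampKind(String displayName) {
            this.displayName = displayName;
        }
    }

    /** Hypothetical stand-in for the ConsumerRecord accessors we need. */
    record RecordInfo(String topic, int partition, long offset, long timestamp, TimestampKind timestampType) {}

    /** One constant per supported metadata column (names are illustrative). */
    enum MetadataColumn {
        TOPIC("topic", RecordInfo::topic),
        PARTITION("partition", RecordInfo::partition),
        OFFSET("offset", RecordInfo::offset),
        TIMESTAMP("timestamp", RecordInfo::timestamp),
        TIMESTAMP_TYPE("timestampType", r -> r.timestampType().displayName);

        final String columnName;
        private final Function<RecordInfo, Object> extractor;

        MetadataColumn(String columnName, Function<RecordInfo, Object> extractor) {
            this.columnName = columnName;
            this.extractor = extractor;
        }

        /** Reads this column's value from a record. */
        Object extract(RecordInfo record) {
            return extractor.apply(record);
        }

        /** Looks up a column by the name a user would pass on the command line. */
        static MetadataColumn fromName(String name) {
            for (MetadataColumn column : values()) {
                if (column.columnName.equals(name)) {
                    return column;
                }
            }
            throw new IllegalArgumentException("Unsupported metadata column: " + name);
        }
    }

    public static void main(String[] args) {
        RecordInfo record = new RecordInfo("orders", 3, 42L, 1_700_000_000_000L, TimestampKind.CREATE_TIME);
        for (MetadataColumn column : MetadataColumn.values()) {
            System.out.println(column.columnName + " = " + column.extract(record));
        }
    }
}
```

Keeping one extractor per enum constant means adding a new column is a single-line change, and unknown names fail fast in fromName.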
The feature is backwards compatible: it is only active when --metadata_column is supplied or, programmatically, when SynchronizationActionBase.withMetadataColumns is used.
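A minimal sketch of this opt-in behaviour, assuming the flag accepts comma-separated values and that metadata is appended to a row represented as a Map. All names here (OptionalMetadataSketch, parseMetadataColumns, withMetadata) are hypothetical and not part of Paimon; the point is only that an empty column list leaves the row untouched.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

/** Sketch only: names and the comma-separated flag format are assumptions. */
public class OptionalMetadataSketch {

    static final Set<String> SUPPORTED =
            Set.of("topic", "partition", "offset", "timestamp", "timestampType");

    /** Splits values such as "topic,offset" and rejects unknown column names. */
    static List<String> parseMetadataColumns(List<String> flagValues) {
        List<String> columns = new ArrayList<>();
        for (String value : flagValues) {
            for (String part : value.split(",")) {
                String name = part.trim();
                if (name.isEmpty()) {
                    continue;
                }
                if (!SUPPORTED.contains(name)) {
                    throw new IllegalArgumentException("Unsupported metadata column: " + name);
                }
                columns.add(name);
            }
        }
        return columns;
    }

    /** Returns the original row unchanged when no metadata columns were requested. */
    static Map<String, Object> withMetadata(
            Map<String, Object> row, List<String> columns, Map<String, Object> recordMetadata) {
        if (columns.isEmpty()) {
            return row; // backwards-compatible path: nothing is added
        }
        Map<String, Object> enriched = new LinkedHashMap<>(row);
        for (String column : columns) {
            enriched.put(column, recordMetadata.get(column));
        }
        return enriched;
    }

    public static void main(String[] args) {
        Map<String, Object> row = Map.of("id", 1);
        Map<String, Object> metadata = Map.of("topic", "orders", "offset", 42L);
        System.out.println(withMetadata(row, parseMetadataColumns(List.of()), metadata));
        System.out.println(withMetadata(row, parseMetadataColumns(List.of("topic,offset")), metadata));
    }
}
```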
For now, I've only implemented this for the KafkaDebeziumAvroDeserializationSchema and KafkaDebeziumJsonDeserializationSchema.
Tests
KafkaMetadataConverter.java. I will also add more integration tests for the Kafka table and database sync actions across various input formats.
API and Format
No changes to public APIs or the storage format.
The changes are contained within the Flink CDC package, but I did have to update CdcSourceRecord, since it previously provided no way to surface arbitrary metadata for a record.
The metadata attribute on CdcSourceRecord is intentionally a generic Map so that it can later be reused to add metadata support for other connectors, such as Pulsar or Mongo, where it is not yet implemented.
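To illustrate why a generic Map is convenient, here is a hypothetical record type. It is not Paimon's actual CdcSourceRecord; the class, constructor and accessor names are assumptions. A Kafka source could fill the map with ConsumerRecord-derived keys, while another connector could use its own keys (messageId below is purely illustrative), and callers without metadata keep using a constructor that defaults to an empty map.

```java
import java.util.Collections;
import java.util.HashMap;
import java.util.Map;

/** Sketch only: a hypothetical record with a connector-agnostic metadata map. */
public final class SourceRecordWithMetadata {

    private final Object value;
    private final Map<String, Object> metadata;

    /** Call sites that have no metadata keep working with this constructor. */
    public SourceRecordWithMetadata(Object value) {
        this(value, Collections.emptyMap());
    }

    public SourceRecordWithMetadata(Object value, Map<String, Object> metadata) {
        this.value = value;
        // Defensive, unmodifiable copy so downstream converters cannot mutate it.
        this.metadata = Collections.unmodifiableMap(new HashMap<>(metadata));
    }

    public Object getValue() {
        return value;
    }

    public Map<String, Object> getMetadata() {
        return metadata;
    }

    public static void main(String[] args) {
        // A Kafka source could populate ConsumerRecord-derived keys...
        SourceRecordWithMetadata kafka = new SourceRecordWithMetadata(
                "{\"id\":1}", Map.of("topic", "orders", "partition", 3, "offset", 42L));
        // ...while another connector could use entirely different (illustrative) keys.
        SourceRecordWithMetadata other = new SourceRecordWithMetadata(
                "{\"id\":2}", Map.of("messageId", "abc"));
        System.out.println(kafka.getMetadata().get("offset"));
        System.out.println(other.getMetadata().keySet());
        System.out.println(new SourceRecordWithMetadata("{}").getMetadata().isEmpty());
    }
}
```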
Documentation
Added the new --metadata_column parameter to Kafka CDC docs.
Dev notes
To run the integration tests on macOS with Rancher Desktop, I had to expose the Docker socket to Testcontainers, e.g. system-wide via sudo ln -sf "$HOME/.rd/docker.sock" /var/run/docker.sock.
Todo/WIP
- Consider making the new parsing path the default (without metadata converters it is simply a no-op) instead of adding custom constructors
- Add integration tests for all Kafka CDC formats (currently only working on integration tests for Debezium Avro and Debezium JSON)