
Add --metadata_column support for the Kafka CDC connector

Status: Open · gmdfalk opened this issue 3 months ago · 0 comments

Purpose

Add --metadata_column support to Paimon Kafka CDC connector, similar to the already existing options added for MySQL and Postgres: https://github.com/apache/paimon/pull/2077

Supported metadata columns are the fields exposed by org.apache.kafka.clients.consumer.ConsumerRecord, i.e.:

  • topic
  • partition
  • offset
  • timestamp
  • timestampType: the string name of the record's TimestampType enum value, i.e. NoTimestampType, CreateTime or LogAppendTime
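A minimal sketch of how the five columns above could be read from a single record. To stay self-contained it does not depend on kafka-clients: KafkaRecordView is a hypothetical record that mirrors the relevant ConsumerRecord accessors (topic(), partition(), offset(), timestamp(), timestampType()), and TimestampType is a local stand-in whose toString() returns the display names listed above. The extract helper is illustrative only, not the PR's KafkaMetadataConverter, and it simplifies all values to strings. Written for Java 17 (records, switch expressions).

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class KafkaMetadataSketch {

    // Local stand-in for org.apache.kafka.common.record.TimestampType:
    // toString() returns the display name, e.g. "CreateTime".
    public enum TimestampType {
        NO_TIMESTAMP_TYPE("NoTimestampType"),
        CREATE_TIME("CreateTime"),
        LOG_APPEND_TIME("LogAppendTime");

        private final String displayName;

        TimestampType(String displayName) {
            this.displayName = displayName;
        }

        @Override
        public String toString() {
            return displayName;
        }
    }

    // Hypothetical view mirroring the ConsumerRecord accessors used for metadata.
    public record KafkaRecordView(String topic, int partition, long offset,
                                  long timestamp, TimestampType timestampType) {}

    // Returns the requested metadata columns, in request order, as strings.
    public static Map<String, String> extract(KafkaRecordView r, List<String> columns) {
        Map<String, String> out = new LinkedHashMap<>();
        for (String column : columns) {
            String value = switch (column) {
                case "topic" -> r.topic();
                case "partition" -> String.valueOf(r.partition());
                case "offset" -> String.valueOf(r.offset());
                case "timestamp" -> String.valueOf(r.timestamp());
                case "timestampType" -> r.timestampType().toString();
                default -> throw new IllegalArgumentException("Unsupported metadata column: " + column);
            };
            out.put(column, value);
        }
        return out;
    }

    public static void main(String[] args) {
        KafkaRecordView rec = new KafkaRecordView(
                "orders", 3, 42L, 1_700_000_000_000L, TimestampType.CREATE_TIME);
        // prints {topic=orders, offset=42, timestampType=CreateTime}
        System.out.println(extract(rec, List.of("topic", "offset", "timestampType")));
    }
}
```

Keeping the extraction keyed by column name means only the columns the user asked for are materialized, in the order they were requested.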

The feature is backwards compatible. It is only active when --metadata_column is supplied (or, programmatically, when SynchronizationActionBase.withMetadataColumns is called).
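The opt-in behavior can be sketched as follows, assuming the flag takes a comma-separated list as the existing MySQL option does (e.g. --metadata_column topic,offset). The MetadataColumnFlag class, the parseMetadataColumns helper and its validation against the supported names are hypothetical. An absent or blank flag yields an empty list, so the sync action behaves exactly as before. Written for Java 17.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Set;

public class MetadataColumnFlag {

    // Column names listed in the issue; assumed to be the only accepted values.
    public static final Set<String> SUPPORTED =
            Set.of("topic", "partition", "offset", "timestamp", "timestampType");

    // Hypothetical parser: null or blank means "flag not supplied", i.e. feature off.
    public static List<String> parseMetadataColumns(String flagValue) {
        if (flagValue == null || flagValue.isBlank()) {
            return List.of();
        }
        List<String> columns = Arrays.stream(flagValue.split(","))
                .map(String::trim)
                .filter(s -> !s.isEmpty())
                .toList();
        for (String column : columns) {
            if (!SUPPORTED.contains(column)) {
                throw new IllegalArgumentException("Unsupported metadata column: " + column);
            }
        }
        return columns;
    }

    public static void main(String[] args) {
        System.out.println(parseMetadataColumns(null));            // prints []
        System.out.println(parseMetadataColumns("topic, offset")); // prints [topic, offset]
    }
}
```

Failing fast on unknown names surfaces typos at job submission instead of silently dropping a column.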

For now, I've only implemented this for the KafkaDebeziumAvroDeserializationSchema and KafkaDebeziumJsonDeserializationSchema.

Tests

KafkaMetadataConverter.java. I will also add more integration tests for the Kafka table and database sync actions across various input formats.

API and Format

No changes to public APIs or the storage format.

The changes are confined to the Flink CDC package, but I did have to update CdcSourceRecord, since it previously provided no way to surface arbitrary metadata for a record.

The metadata attribute on CdcSourceRecord is intentionally a generic Map so that it can later be used to add metadata support to other connectors, such as Pulsar or Mongo, that do not implement it yet.
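The idea of a generic metadata attribute can be sketched like this. SourceRecordSketch is a simplified, hypothetical stand-in, not Paimon's actual CdcSourceRecord: the payload is a plain Object and the metadata is a Map<String, Object> that defaults to empty, so callers that know nothing about metadata can keep using the one-argument constructor unchanged.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Objects;

public final class SourceRecordSketch {

    private final Object value;
    // Generic, connector-agnostic metadata, e.g. Kafka topic/partition/offset.
    private final Map<String, Object> metadata;

    // Existing-style constructor: no metadata, so behavior is unchanged.
    public SourceRecordSketch(Object value) {
        this(value, Collections.emptyMap());
    }

    public SourceRecordSketch(Object value, Map<String, Object> metadata) {
        this.value = Objects.requireNonNull(value, "value");
        Objects.requireNonNull(metadata, "metadata");
        // Defensive, read-only copy that preserves insertion order.
        this.metadata = Collections.unmodifiableMap(new LinkedHashMap<>(metadata));
    }

    public Object getValue() {
        return value;
    }

    public Map<String, Object> getMetadata() {
        return metadata;
    }

    // Convenience lookup; returns null when the key is absent.
    public Object getMetadata(String key) {
        return metadata.get(key);
    }

    public static void main(String[] args) {
        SourceRecordSketch plain = new SourceRecordSketch("{\"id\":1}");
        SourceRecordSketch withMeta = new SourceRecordSketch(
                "{\"id\":1}", Map.of("topic", "orders", "offset", 42L));
        System.out.println(plain.getMetadata().isEmpty());  // prints true
        System.out.println(withMeta.getMetadata("offset")); // prints 42
    }
}
```

Copying the map at construction keeps the record immutable even if the caller reuses or mutates its own map afterwards.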

Documentation

Added the new --metadata_column parameter to the Kafka CDC docs.

Dev notes

To run the integration tests on macOS with Rancher Desktop, I had to expose the Docker socket to Testcontainers, e.g. system-wide via: sudo ln -sf "$HOME/.rd/docker.sock" /var/run/docker.sock

Todo/WIP

  • Consider making the new parsing path the default (without metadata converters it is simply a no-op) instead of having custom constructors
  • Add integration tests for all Kafka CDC formats (currently I am only working on integration tests for Debezium Avro and Debezium JSON)
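The first todo item could look roughly like this: the parsing path always runs the configured converters, and with an empty converter list it returns the parsed fields untouched, so no separate constructor is needed. MetadataConverter here is a hypothetical functional-style interface that reads one column from a record's metadata map; it is not the interface from the PR, and values are simplified to strings.

```java
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;

public class DefaultParsingPathSketch {

    // Hypothetical converter: derives one column from a record's metadata map.
    public interface MetadataConverter {
        String columnName();

        String read(Map<String, Object> metadata);
    }

    // Appends one column per converter; with no converters this is a no-op.
    public static Map<String, String> withMetadata(Map<String, String> parsedFields,
                                                   Map<String, Object> metadata,
                                                   List<MetadataConverter> converters) {
        if (converters.isEmpty()) {
            return parsedFields; // no-op: the original parsing result is returned as-is
        }
        Map<String, String> out = new LinkedHashMap<>(parsedFields);
        for (MetadataConverter c : converters) {
            out.put(c.columnName(), c.read(metadata));
        }
        return out;
    }

    // Example converter for the "topic" column.
    public static final MetadataConverter TOPIC = new MetadataConverter() {
        public String columnName() {
            return "topic";
        }

        public String read(Map<String, Object> metadata) {
            return String.valueOf(metadata.get("topic"));
        }
    };

    public static void main(String[] args) {
        Map<String, String> fields = Map.of("id", "1");
        Map<String, Object> meta = Map.of("topic", "orders");
        System.out.println(withMetadata(fields, meta, List.of()) == fields); // prints true
        System.out.println(withMetadata(fields, meta, List.of(TOPIC)));      // prints {id=1, topic=orders}
    }
}
```

Returning the same map instance on the empty path makes the "no metadata requested" case cost nothing beyond the isEmpty check.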

gmdfalk, Sep 29 '25 09:09