
RFC: Debezium integration

Open sravotto opened this issue 1 year ago • 2 comments

Integration with Debezium Connectors

Debezium is an open-source distributed platform for change data capture. See https://debezium.io/ for documentation and code.

While Debezium provides a full environment to stream events from upstream databases to other systems via the Kafka Connect service or the Debezium server, the various connectors can also be used directly within a Java application, leveraging the debezium-api module. This module defines a small API that allows an application to easily configure and run Debezium connectors using the Debezium Engine.

cdc-sink currently supports several databases directly as sources; however, leveraging Debezium connectors could significantly expand the integration points. One of the challenges is that the Debezium APIs are Java-based, while cdc-sink is written in Go. One way around this is to deploy a thin sidecar application that uses the debezium-api module, connects to the upstream database, and forwards the events to cdc-sink in one of the supported output message formats (e.g. JSON), as shown in the following diagram:

High level design

Sidecar design

The Debezium sidecar reads a properties file to configure the connector for the source database, and calls the DebeziumEngine API to connect to the database and wait for events.

Sidecar responsibilities:

  • keep track of the latest event consumed from the source that was successfully accepted by the cdc-sink endpoint. The state can be stored in a file or in the cdc-sink staging database, as determined by the offset.* properties.
  • batch change events and send them to the cdc-sink server. To simplify parsing on the cdc-sink end, each batch is serialized as an array of JSON objects, each consisting of a key and a value as defined below. The key and value objects are passed through as they are received from the connector, since Debezium already provides change events in a standard format. The size of the batches is controlled by the max.batch.size property.
  • apply back pressure when reading change events from the database in cases where the connector ingests messages faster than the sidecar can write them to cdc-sink. Debezium connectors manage a blocking queue that holds events streamed from the database before they are written to cdc-sink. The size of the queue is controlled by the max.queue.size property (illustrative settings are shown after this list).
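
For illustration, these knobs map onto standard Debezium engine properties. The values below are examples only, not recommendations (2048 and 8192 are the Debezium defaults):

# Batch and queue tuning
max.batch.size=2048
max.queue.size=8192

# Offset tracking: file-based store, flushed every 10 seconds
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=./tmp/offsets.dat
offset.flush.interval.ms=10000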

The handler that posts batches to cdc-sink should implement the ChangeConsumer interface (see https://javadoc.io/doc/io.debezium/debezium-api/2.4.0.Final/io/debezium/engine/DebeziumEngine.ChangeConsumer.html); a sketch of such a handler follows the JSON structure below.

The handler posts a JSON array of events with the following structure:

[
    {
        "key": {
            "schema": "... provides information about the schema of the key ...",
            "payload": "... a JSON object that represents the key ..."
        },
        "value": {
            "before": "... a JSON object that represents the row before the change (for updates, deletes) ...",
            "after": "... a JSON object that represents the row after the change ...",
            "op": "... the operation (c=insert; d=delete; u=update) ...",
            "source": "... information about the source emitting the change, e.g. database vendor, version, etc. ..."
        }
    }
]
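
Below is a minimal sketch of such a handler, reusing the Batcher name from the high-level main loop shown later in this RFC. The JDK HttpClient, the hand-rolled batch serialization, and the error handling are illustrative assumptions rather than a settled design; the point is that, with the Json engine format, key() and value() are already JSON strings and are passed through to cdc-sink unchanged.

    import java.io.IOException;
    import java.net.URI;
    import java.net.http.HttpClient;
    import java.net.http.HttpRequest;
    import java.net.http.HttpResponse;
    import java.util.List;

    import io.debezium.engine.ChangeEvent;
    import io.debezium.engine.DebeziumEngine;

    public class Batcher implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
        private final String url;
        private final HttpClient client = HttpClient.newHttpClient();

        public Batcher(String url) {
            // url is the cdcsink.url property, e.g. https://localhost:30004/immediate;
            // TLS handling (cdcsink.skip.verify) would be configured on the client here.
            this.url = url;
        }

        @Override
        public void handleBatch(List<ChangeEvent<String, String>> records,
                DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
                throws InterruptedException {
            // With the Json format, key() and value() are already JSON strings, so the
            // batch can be assembled as an array of {"key": ..., "value": ...} objects
            // without re-parsing them.
            StringBuilder body = new StringBuilder("[");
            for (int i = 0; i < records.size(); i++) {
                ChangeEvent<String, String> r = records.get(i);
                if (i > 0) {
                    body.append(",");
                }
                body.append("{\"key\":").append(r.key() == null ? "null" : r.key())
                    .append(",\"value\":").append(r.value() == null ? "null" : r.value())
                    .append("}");
            }
            body.append("]");
            try {
                HttpRequest request = HttpRequest.newBuilder(URI.create(url))
                        .header("Content-Type", "application/json")
                        .POST(HttpRequest.BodyPublishers.ofString(body.toString()))
                        .build();
                HttpResponse<String> response =
                        client.send(request, HttpResponse.BodyHandlers.ofString());
                if (response.statusCode() / 100 != 2) {
                    throw new IOException("cdc-sink returned " + response.statusCode());
                }
                // Acknowledge the records only after cdc-sink has accepted the batch, so
                // the connector's offsets never advance past unsent events.
                for (ChangeEvent<String, String> r : records) {
                    committer.markProcessed(r);
                }
                committer.markBatchFinished();
            } catch (IOException e) {
                // A real implementation would retry or surface the failure so the engine stops.
                throw new RuntimeException(e);
            }
        }
    }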

Optionally, transaction boundary events may be included within a batch to signal the beginning and end of a transaction. Note: mutations associated with a transaction may span multiple batches. To enable transaction boundary events, set the provide.transaction.metadata property to true.

{ "key" : 
        {  
            "schema":   "... provides information about the schema for the key ..."
             "payload" : {
                "id" : "... string representation of the unique transaction identifier."
             }
        },
   "value":
       {
          "status": "... BEGIN or END",
          "id": "... string representation of the unique transaction identifier.",
          "ts_ms": "... time at the data source",
      }
}
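
For example, a batch that carries one complete transaction might look roughly like the following (the identifier, timestamps, and event count are made up for illustration, and the exact fields emitted depend on the connector and Debezium version):

[
    { "key": { "schema": "...", "payload": { "id": "571" } },
      "value": { "status": "BEGIN", "id": "571", "ts_ms": 1698412800000 } },

    "... the change events belonging to transaction 571 ...",

    { "key": { "schema": "...", "payload": { "id": "571" } },
      "value": { "status": "END", "id": "571", "ts_ms": 1698412800250, "event_count": 2 } }
]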

The main sidecar code, at a high level, would be:

    
    ...
    Configuration config = Configuration.empty();
    final Properties props = config.asProperties();
    try {
        props.load(new FileInputStream(propertiesFile));
    } catch (Exception e) {
        ...
    }
    ChangeConsumer<ChangeEvent<String, String>> batcher = new Batcher(url);
    try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
            .using(props)
            .notifying((records, committer) -> {
                try {
                    // the batcher collects the change events,
                    // sends them to cdc-sink and signals the
                    // committer on success.
                    batcher.handleBatch(records, committer);
                } catch (Exception e) {
                    ...
                }
            }).build()) {
        ExecutorService executor = Executors.newSingleThreadExecutor();
        executor.execute(engine);
        while (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
            ...
        }
    } catch (Exception e) {
        ...
    }
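
For reference, the sidecar's classpath only needs the Debezium engine and the connector for the source database. Assuming Maven and the 2.4.0.Final release referenced above, the dependency set would be roughly the following (artifact list to be verified against the chosen connector):

    <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-api</artifactId>
      <version>2.4.0.Final</version>
    </dependency>
    <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-embedded</artifactId>
      <version>2.4.0.Final</version>
    </dependency>
    <dependency>
      <groupId>io.debezium</groupId>
      <artifactId>debezium-connector-sqlserver</artifactId>
      <version>2.4.0.Final</version>
    </dependency>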

Example of properties to connect to SQL Server:

# Database information
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=localhost
database.password=password
database.port=1433
database.user=sa
database.names=myDB
database.encrypt=false

name=engine
decimal.handling.mode=string
topic.prefix=cdc-connector

# Where to keep the state
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=./tmp/mysql/offsets.dat
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=./tmp/mysql/schemahistory.dat

# Cdc sink connection
cdcsink.url=https://localhost:30004/immediate
cdcsink.skip.verify=true

cdc-sink endpoints

On the cdc-sink server side, new endpoints will consume the change events and apply them to the target database, leveraging the usual modes of operation:

  • the /immediate endpoint will apply the change events without respecting transaction semantics, using logical.Batcher.
  • the /stage endpoint will stage the change events, using the timestamp in the transaction boundary events as the resolved timestamp. Note: it might be possible to use database-specific transaction markers (e.g. LSN for Postgres, GTID for MySQL), since they are provided as the id in the transaction boundary events. This endpoint will return success once the change events are committed to the staging area.

Alternatives considered

In integrating with the Debezium ecosystem, there are a few other design alternatives that we have considered:

  • Running the standard Debezium server and leveraging one of the existing sinks to push events into cdc-sink. The best candidate is the HTTP Client sink; however, it only streams the value payload, leaving out the key. While we could infer the primary key information from the target schema, this would differ from the other sources that we have implemented (for instance, pglogical determines the primary key based on a RelationMessage that precedes the change events). In addition, each change event is sent in a separate call, limiting the ability to batch.

  • Deploying Debezium by means of Apache Kafka Connect. While this option is the most common for customers who may already use Debezium or Kafka, it is a more complicated deployment. In the future, we might provide a Kafka connector for cdc-sink, but for the short term we prefer to limit the moving parts required for cdc-sink to connect to the various sources supported by Debezium.

  • Leveraging the current CockroachDB CDC HTTP endpoint to receive Debezium change events. While this approach would reduce the changes required in cdc-sink, it would limit how we could use the transaction boundary events that can optionally be emitted by the Debezium connectors. An ad-hoc HTTP endpoint might also be more flexible should we need to support source-database-specific intricacies (for example, Postgres TOASTed columns), as well as to leverage additional change event types (for instance, schema changes).

sravotto avatar Oct 27 '23 13:10 sravotto

I wanted to summarize the different possible approaches here; please let me know if I'm missing something, but I think these do match up with what you've been proposing. I wanted to point out exactly what work would need to be completed.

  • Debezium --(HTTP)--> cdc-sink --(SQL)--> crdb
    • this requires adding the default debezium format as a source in cdc-sink
  • Debezium --(kafka)--> kafka --(kafka)--> cdc-sink --(SQL)--> crdb
    • this requires adding kafka as a source to cdc-sink -- which is a planned feature, but not yet started
    • and requires adding a debezium kafka format to transform to cdc-sink
  • Debezium as a Library --(cockroach cdc format)--> cdc-sink --(SQL)--> crdb
    • this requires adding a separate java project to run alongside the source db
    • but would require no changes to cdc-sink itself
  • Debezium as a library --(Debezium JSON)--> cdc-sink --(SQL)--> crdb
    • this requires adding a separate java project to run alongside the source db
    • and requires adding a new source type to match the output
    • this is the main example you've explored above
  • Debezium as a library --(GRPC)--> cdc-sink --(SQL)--> crdb
    • this requires adding a separate java project to run alongside the source db
    • and requires adding a new source type and implementing gRPC on both sides
  • Debezium as a Library --(SQL)--> crdb
    • this is now a separate project from cdc-sink
    • but can leverage common Java SQL tooling, possibly just using JDBC against the Postgres-compatible endpoint directly

BramGruneir avatar Dec 08 '23 20:12 BramGruneir

The summary is correct. Minor edit: this is what I started working on: Debezium as a library --(Debezium JSON format)--> cdc-sink --(SQL)--> crdb

I'm in the process of adding some additional transformations in the sidecar that uses the library, so that Debezium as a Library --(cockroach cdc format)--> cdc-sink --(SQL)--> crdb is also supported. It's fairly straightforward, and won't prevent us from extending the previous approach, should we need it.

sravotto avatar Dec 08 '23 20:12 sravotto