RFC: Debezium integration
Integration with Debezium Connectors
Debezium is an open source distributed platform for change data capture. See https://debezium.io/ for documentation and code.
While Debezium provides a full environment to stream events from upstream databases to other systems via the Kafka Connect service or the Debezium server, the various connectors can also be used directly within a Java application, leveraging the `debezium-api` module. This module defines a small API that allows an application to easily configure and run Debezium connectors using the Debezium Engine.
`cdc-sink` currently supports several databases as sources directly; leveraging Debezium connectors could significantly expand the integration points. One of the challenges is that the Debezium APIs are Java-based, while `cdc-sink` is written in Go. One way around this is to deploy a thin sidecar application that uses the `debezium-api` module, connects to the upstream database, and forwards the events to `cdc-sink` in one of the supported output message formats (e.g. JSON), as shown in the following diagram:
Sidecar design
The Debezium sidecar reads properties to configure the connector for the source database, then calls the `DebeziumEngine` API to connect to the database and wait for events.
Sidecar responsibilities:
- keep track of the latest event consumed from the source that was successfully accepted by the
cdc-sink
endpoint. The state can be stored in a file or in thecdc-sink
staging database, as determined by theoffset.*
properties. - batch change events and send them to the
cdc-sink
server. To simplify parsing on thecdc-sink
end, each batch is serialized as an array of JSON objects consisting of a key and value as defined below. The key and value objects are passed as they are received from the connector, since Debezium already provides change events in a standard format. The size of the batches is managed by themax.batch.size
property. - apply back pressure for reading change events from the database in cases where the connector ingests messages faster than it can write them to
cdc-sink
. Debezium connectors manage a blocking queue that hold events streamed from the database, before they write them tocdc-sink
. The size of the queue is managed by themax.queue.size
property.
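For illustration, these two knobs are ordinary Debezium engine properties and could be set alongside the connector configuration; the values below are the Debezium defaults, not recommendations:

# Maximum number of change events delivered per batch
max.batch.size=2048
# Capacity of the blocking queue used for back pressure
max.queue.size=8192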
The handler that posts batches to `cdc-sink` should implement the `ChangeConsumer` interface (see https://javadoc.io/doc/io.debezium/debezium-api/2.4.0.Final/io/debezium/engine/DebeziumEngine.ChangeConsumer.html). The handler posts a JSON array of events with the following structure:
[
  {
    "key": {
      "schema": "... provides information about the schema for the key ...",
      "payload": "... JSON object that represents the key ..."
    },
    "value": {
      "before": "... a JSON object that represents the row before the change (for updates, deletes) ...",
      "after": "... a JSON object that represents the row after the change ...",
      "op": "... the operation (c=insert; d=delete; u=update) ...",
      "source": "... information about the source emitting the change, e.g. database vendor, version etc. ..."
    }
  }
]
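A minimal sketch of such a handler follows. The `Batcher` class, its use of `java.net.http.HttpClient`, the single `url` constructor argument, and treating any non-200 status as a rejection are illustrative assumptions; only `DebeziumEngine.ChangeConsumer`, `RecordCommitter`, and `ChangeEvent` come from `debezium-api`:

import java.io.IOException;
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.util.List;
import java.util.stream.Collectors;

import io.debezium.engine.ChangeEvent;
import io.debezium.engine.DebeziumEngine;

// Hypothetical batching handler; names are illustrative.
public class Batcher implements DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> {
    private final HttpClient client = HttpClient.newHttpClient();
    private final URI endpoint; // the cdc-sink URL, e.g. from the cdcsink.url property

    public Batcher(String url) {
        this.endpoint = URI.create(url);
    }

    @Override
    public void handleBatch(List<ChangeEvent<String, String>> records,
            DebeziumEngine.RecordCommitter<ChangeEvent<String, String>> committer)
            throws InterruptedException {
        // Serialize the batch as a JSON array of {key, value} objects,
        // passing the connector's key and value payloads through unchanged.
        String body = records.stream()
                .map(r -> "{\"key\":" + r.key() + ",\"value\":" + r.value() + "}")
                .collect(Collectors.joining(",", "[", "]"));
        try {
            HttpResponse<String> resp = client.send(
                    HttpRequest.newBuilder(endpoint)
                            .header("Content-Type", "application/json")
                            .POST(HttpRequest.BodyPublishers.ofString(body))
                            .build(),
                    HttpResponse.BodyHandlers.ofString());
            if (resp.statusCode() != 200) {
                throw new IllegalStateException("cdc-sink rejected batch: " + resp.statusCode());
            }
            // Only after cdc-sink accepts the batch are the records marked
            // processed, which lets the engine advance the stored offsets.
            for (ChangeEvent<String, String> r : records) {
                committer.markProcessed(r);
            }
            committer.markBatchFinished();
        } catch (IOException e) {
            throw new IllegalStateException(e);
        }
    }
}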
Optionally, transaction boundary events may be provided within the batch to signal the beginning and end of a transaction. Note: mutations associated with a transaction may extend across multiple batches. To enable transaction boundary events, set the `provide.transaction.metadata` property to true.
{ "key" :
{
"schema": "... provides information about the schema for the key ..."
"payload" : {
"id" : "... string representation of the unique transaction identifier."
}
},
"value":
{
"status": "... BEGIN or END",
"id": "... string representation of the unique transaction identifier.",
"ts_ms": "... time at the data source",
}
}
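For illustration, with `provide.transaction.metadata` set to true, a batch covering a single-row transaction could interleave boundary and mutation events as follows (identifiers, timestamps, and column names are hypothetical, payloads are abbreviated, and the exact contents depend on the connector):

[
  { "key": { "payload": { "id": "tx-1" } },
    "value": { "status": "BEGIN", "id": "tx-1", "ts_ms": 1700000000000 } },
  { "key": { "payload": { "pk": 42 } },
    "value": { "before": null, "after": { "pk": 42, "v": "hello" }, "op": "c" } },
  { "key": { "payload": { "id": "tx-1" } },
    "value": { "status": "END", "id": "tx-1", "ts_ms": 1700000000000 } }
]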
The main sidecar code, at a high level, would be:
...
// Load the connector configuration from a properties file.
final Properties props = new Properties();
try {
    props.load(new FileInputStream(propertiesFile));
} catch (Exception e) {
    ...
}
// The batcher collects the change events, sends them to cdc-sink,
// and signals the committer on success.
DebeziumEngine.ChangeConsumer<ChangeEvent<String, String>> batcher = new Batcher(url);
try (DebeziumEngine<ChangeEvent<String, String>> engine = DebeziumEngine.create(Json.class)
        .using(props)
        .notifying((records, committer) -> {
            try {
                batcher.handleBatch(records, committer);
            } catch (Exception e) {
                ...
            }
        })
        .build()) {
    // Run the engine on its own thread and wait for it to terminate.
    ExecutorService executor = Executors.newSingleThreadExecutor();
    executor.execute(engine);
    while (!executor.awaitTermination(60, TimeUnit.SECONDS)) {
        ...
    }
} catch (Exception e) {
    ...
}
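Note that `debezium-api` only defines the interfaces used here; at runtime the sidecar also needs the `debezium-embedded` module, which provides the engine implementation, plus the artifact for the chosen connector (e.g. `debezium-connector-sqlserver`) on the classpath.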
Example of properties to connect to SQL Server:
# Database information
connector.class=io.debezium.connector.sqlserver.SqlServerConnector
database.hostname=localhost
database.password=password
database.port=1433
database.user=sa
database.names=myDB
database.encrypt=false
name=engine
decimal.handling.mode=string
topic.prefix=cdc-connector
# Where to keep the state
offset.storage=org.apache.kafka.connect.storage.FileOffsetBackingStore
offset.storage.file.filename=./tmp/mysql/offsets.dat
schema.history.internal=io.debezium.storage.file.history.FileSchemaHistory
schema.history.internal.file.filename=./tmp/mysql/schemahistory.dat
# Cdc sink connection
cdcsink.url=https://localhost:30004/immediate
cdcsink.skip.verify=true
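The `cdcsink.*` entries above are not Debezium properties; the sidecar itself would consume them before handing the rest of the configuration to the engine. A minimal sketch, assuming the property names shown above:

// Sidecar-specific settings; all other properties are passed to the DebeziumEngine.
String url = props.getProperty("cdcsink.url");
boolean skipVerify = Boolean.parseBoolean(
        props.getProperty("cdcsink.skip.verify", "false"));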
cdc-sink endpoint
On the `cdc-sink` server side, new endpoints will consume the change events and apply them to the target database, leveraging the usual modes of operation:

- The `/immediate` endpoint will apply the change events without respecting transaction semantics, using `logical.Batcher`.
- The `/stage` endpoint will stage the change events, using the timestamp in the transaction boundary events as the resolved timestamp. Note: it might be possible to use database-specific transaction markers (e.g. LSN for Postgres, GTID for MySQL), as they are provided as `id` in the transaction boundary events. This endpoint will return success once the change events are committed into the staging area.
Alternatives considered
In integrating with the Debezium ecosystem, there are a few other design alternatives that we considered:
- Running the standard Debezium server and leveraging one of the existing sinks to push events into `cdc-sink`. The best candidate is the HTTP Client sink; however, it only streams the value payload, leaving out the key. While we could infer the primary key information from the target schema, this would differ from the other sources we have implemented (for instance, pglogical determines the primary key based on a `RelationMessage` that precedes change events). In addition, each change event is sent on a separate call, limiting the ability to batch.
- Deploying Debezium by means of Apache Kafka Connect. While this option is the most common for customers who may already use Debezium or Kafka, it is a more complicated deployment option. In the future, we might provide a Kafka connector for `cdc-sink`, but for the short term we prefer to limit the moving parts required for `cdc-sink` to connect to the various sources supported by Debezium.
- Leveraging the current CockroachDB CDC HTTP endpoint to receive Debezium change events. While this approach would reduce the changes required in `cdc-sink`, it would limit how we could use the transaction boundary events that can be optionally emitted by the Debezium connectors. An ad hoc HTTP endpoint might also be more flexible should we need to support source-database-specific intricacies (for example, Postgres TOASTed columns), as well as leveraging additional change event types (for instance, schema changes).
I wanted to summarize the different possible approaches here. Please let me know if I'm missing something, but I think these match up with what you've been proposing. I also wanted to point out exactly what work would need to be completed.
- Debezium --(HTTP)--> cdc-sink --(SQL)--> crdb
  - this requires adding the default Debezium format as a source in cdc-sink
- Debezium --(kafka)--> kafka --(kafka)--> cdc-sink --(SQL)--> crdb
  - this requires adding kafka as a source to cdc-sink -- which is a planned feature, but not yet started
  - and requires adding a Debezium kafka format to transform to cdc-sink
- Debezium as a library --(cockroach cdc format)--> cdc-sink --(SQL)--> crdb
  - this requires adding a separate java project to run alongside the source db
  - but would require no changes to cdc-sink itself
- Debezium as a library --(Debezium JSON)--> cdc-sink --(SQL)--> crdb
  - this requires adding a separate java project to run alongside the source db
  - and requires adding a new source type to match the output
  - this is the main example you've explored above
- Debezium as a library --(gRPC)--> cdc-sink --(SQL)--> crdb
  - this requires adding a separate java project to run alongside the source db
  - and requires adding a new source type and implementing gRPC on both sides
- Debezium as a library --(SQL)--> crdb
  - this is now a separate project from cdc-sink
  - but can leverage common Java SQL tooling, possibly just using the JDBC to postgres endpoint directly
The summary is correct.
Minor edit: this is what I started to work on:
Debezium as a library --(Debezium JSON format)--> cdc-sink --(SQL)--> crdb
I'm in the process of adding some additional transformation in the sidecar that uses the library, so that Debezium as a library --(cockroach cdc format)--> cdc-sink --(SQL)--> crdb
is also supported. It's fairly straightforward, and won't prevent us from extending the previous approach, should we need it.