Implement CDC for MySQL using binlog
MySQL is a very common and reliable RDBMS. However, listening to its events as CDC (change data capture) still has room to grow, as there are not many solutions readily available.
In order to create an add-on for benthos, we need to understand how to handle the different events from MySQL:
- CREATE/INSERT
- UPDATE
- DELETE
To use the binlog from MySQL, it has to be enabled on the server side. Once enabled, the mysqlbinlog tool
can read the binlog files, which look as follows:
# mysqlbinlog -vvv /var/lib/mysql/master.000001
BINLOG '
JxiqVxMBAAAALAAAAI7LegAAAHQAAAAAAAEABHRlc3QAAWEAAQMAAUTAFAY=
JxiqVx4BAAAAKAAAALbLegAAAHQAAAAAAAEAAgAB//5kAAAAedRLHg==
'/*!*/;
### INSERT INTO `test`.`a`
### SET
### @1=100 /* INT meta=0 nullable=1 is_null=0 */
# at 8047542
#160809 17:51:35 server id 1 end_log_pos 8047573 CRC32 0x56b36ca5 Xid = 24453
COMMIT/*!*/;
This is essentially a historical log of the changes that can be replayed against a new database to restore its state.
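For reference, enabling the binlog on the server side usually comes down to a few settings in my.cnf (a minimal sketch; exact option names and defaults vary between MySQL versions, and CDC needs row-based logging):

[mysqld]
server-id        = 1          # required once binary logging is enabled
log_bin          = mysql-bin  # turn binary logging on, with this file prefix
binlog_format    = ROW        # row-based events are what CDC needs
binlog_row_image = FULL       # log full before/after images of each row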
Idea:
With the binlog file, we have several options to move forward
- create a parser to parse the binlog file output it in a way that benthos can use easily
- use one of the existing parsers (e.g. a binlog-json parser) and work from there
However regardless of the solution we choose, we will still need to come up with an output that will handle the 3 CDC events.
If we take the common use case of mysql-kafka, there are only 2 events:
- CREATE/UPDATE - key + data
- DELETE - only key (data = null)
So we should be able to know, at all times within a benthos pipeline, what the key of the current message/data is.
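To make that concrete, the two kafka message shapes would look something like this (illustrative only, using the person object from the examples further down):

# CREATE/UPDATE: key + data
key:   { "person_id": 100 }
value: { "person_id": 100, "name": "Tom", "address": "Some Street" }

# DELETE: only key, data is null (tombstone)
key:   { "person_id": 100 }
value: null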
Maybe it would be good to have a benthos-generated id/key for each message in the pipeline (and this should apply to all the benthos inputs available). If the input supports an id/key, like mysql, kafka or any other input, we can override the benthos id/key with it. This will make sure that all messages carry the same meta pk, in benthos terms.
The reason for this is that if we don't have it, we need to handle it separately. Example: a CSV-to-kafka benthos pipeline:
input:
  gcp_cloud_storage:
    bucket: ${GCS_BUCKET}
    prefix: ${GCS_PREFIX}
    codec: csv
    delete_objects: true

pipeline:
  processors:
    - bloblang: |
        meta pk = this.person_id
        root.person_id = this.person_id
        root.name = this.name
        root.address = this.address
        # Check whether we need to send a tombstone based on the is_deletion flag.
        # To send a tombstone, the root should be set to null
        root = if this.is_deletion == "true" {
          null
        }

output:
  kafka:
    addresses: [ ${KAFKA_BROKER_ADDRESS} ]
    topic: ${KAFKA_TOPIC}
    client_id: ${KAFKA_CLIENT_ID}
    target_version: 2.4.0
    key: ${! meta("pk") }
    max_in_flight: 10
    batching:
      count: 1000
      period: "60s"
In this example you can see that I had to handle the key and the tombstone in the bloblang. However, if we always have 2 objects to work with (i.e. 1 for the key and 1 for the value), this can be much simpler. (Of course there is no way to determine a delete from a CSV file; this purely depends on the type of input, which has to actually support deletes.)
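As a rough, hypothetical sketch (this key/value split does not exist in benthos today), the processor above could shrink to something like:

pipeline:
  processors:
    - bloblang: |
        # key would already be populated by the input, no meta pk needed
        value = this
        # tombstone: only the value is nulled, the key stays intact
        value = if this.is_deletion == "true" {
          null
        }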
The MySQL Binlog perspective
The new input add-on should handle the 3 events mentioned above. Let's assume we can have a key and a value for the message at each step of the pipeline of a benthos app. The add-on could process the events as follows (taking a basic person object with basic fields as an example):
- INSERT INTO PERSON (person_id, name, address) VALUES (100, "Tom", "Some Street")

  # key object
  {
    "person_id": 100
  }

  # value object
  {
    "person_id": 100,
    "name": "Tom",
    "address": "Some Street"
  }
Example of a benthos pipeline:
pipeline:
  processors:
    - bloblang: |
        # here we can play with key and value instead of root
        # by default it would be something like (what we receive from the binlog):
        key.person_id = 100
        value.person_id = 100
        value.name = "Tom"
        value.address = "Some Street"

        # And if we want to process the values, we can use bloblang as we did already,
        # renaming the key field for example:
        key.person_id = deleted()
        key.id = this.person_id
- UPDATE - any update could be treated the same way as the INSERT above. However, another base attribute can be added for the old values, so we would then have key, value and old_value.
- DELETE - for deletes, we would still have key and value, but the value will be set to null. This could be challenged by instead providing an object with all attributes set to null, but I highly doubt that would make much sense.
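For illustration, and mirroring the INSERT example above, the UPDATE and DELETE cases could then look something like this (hypothetical shapes, not an existing format):

UPDATE PERSON SET address = "New Street" WHERE person_id = 100

# key object
{ "person_id": 100 }

# value object
{ "person_id": 100, "name": "Tom", "address": "New Street" }

# old_value object
{ "person_id": 100, "name": "Tom", "address": "Some Street" }

DELETE FROM PERSON WHERE person_id = 100

# key object
{ "person_id": 100 }

# value object (tombstone)
null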
Examples of binlog json parsers:
- https://github.com/labbati/mysqlbinlog-to-json
- https://github.com/noplay/python-mysql-replication
- https://github.com/zendesk/maxwell
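As a rough sketch of what building on an existing parser could look like, python-mysql-replication (listed above) already exposes the three row event types; the snippet below is illustrative only, the connection settings are placeholders, and the key extraction is simplified since a real implementation would derive the primary key from table metadata:

from pymysqlreplication import BinLogStreamReader
from pymysqlreplication.row_event import (
    DeleteRowsEvent,
    UpdateRowsEvent,
    WriteRowsEvent,
)

MYSQL_SETTINGS = {"host": "127.0.0.1", "port": 3306, "user": "repl", "passwd": "secret"}

stream = BinLogStreamReader(
    connection_settings=MYSQL_SETTINGS,
    server_id=100,  # must be unique among replicas connected to the server
    only_events=[WriteRowsEvent, UpdateRowsEvent, DeleteRowsEvent],
    blocking=True,
)

for event in stream:
    for row in event.rows:
        if isinstance(event, WriteRowsEvent):
            # INSERT: key + value (key extraction simplified to person_id)
            value = row["values"]
            key = {"person_id": value.get("person_id")}
        elif isinstance(event, UpdateRowsEvent):
            # UPDATE: key + value + old_value
            value = row["after_values"]
            old_value = row["before_values"]
            key = {"person_id": value.get("person_id")}
        elif isinstance(event, DeleteRowsEvent):
            # DELETE: key only, the value becomes the tombstone (None)
            key = {"person_id": row["values"].get("person_id")}
            value = None
        # ...hand key/value off to the rest of the pipeline here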
I'll add more updates later. If you have more ideas or questions, feel free to add them.