
Implement CDC for MySQL using binlog

Locustv2 opened this issue on Dec 02 '21 · 3 comments

MySQL is a very common and reliable relational database. However, listening to its change events for CDC (change data capture) still has room to grow, as there are not many solutions readily available.

In order to create an add-on for benthos, we need to understand how to handle the different events from MySQL:

  • CREATE/INSERT
  • UPDATE
  • DELETE

To use binlog from MySQL, it has to be enabled on the server side (a minimal configuration sketch follows the example below). Once enabled, the mysqlbinlog tool can read the binlog files, which look as follows:

# mysqlbinlog -vvv /var/lib/mysql/master.000001 

BINLOG '
JxiqVxMBAAAALAAAAI7LegAAAHQAAAAAAAEABHRlc3QAAWEAAQMAAUTAFAY=
JxiqVx4BAAAAKAAAALbLegAAAHQAAAAAAAEAAgAB//5kAAAAedRLHg==
'/*!*/;
### INSERT INTO `test`.`a`
### SET
###   @1=100 /* INT meta=0 nullable=1 is_null=0 */
# at 8047542
#160809 17:51:35 server id 1  end_log_pos 8047573 CRC32 0x56b36ca5      Xid = 24453
COMMIT/*!*/;

This is essentially a historical log of the changes that can be replayed on a fresh database to restore its state.
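
To get output like this in the first place, row-based binary logging has to be enabled on the server, typically in my.cnf. A minimal sketch (the option names are standard MySQL settings; the basename and server id simply mirror the example above, and the exact options vary by MySQL version):

[mysqld]
server-id        = 1                      # unique server id, required for binlogging
log_bin          = /var/lib/mysql/master  # binlog basename, as in master.000001 above
binlog_format    = ROW                    # row events rather than statement text
binlog_row_image = FULL                   # keep full before/after row images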

Idea:

With the binlog file, we have several options to move forward:

  • create a parser that reads the binlog file and outputs it in a form benthos can use easily
  • use one of the existing parsers (e.g. a binlog-json parser) and work from there

However, regardless of the solution we choose, we will still need to come up with an output that handles the 3 CDC events.

If we take the common MySQL-to-Kafka use case, there are only 2 events (sketched right after this list):

  • CREATE/UPDATE - key + data
  • DELETE - only key (data = null)
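
In other words, on the Kafka side both cases boil down to a key/value pair, where a delete is simply a null value (a tombstone). Roughly, assuming JSON serialization and an illustrative id field:

# CREATE/UPDATE -> key + data
key:   { "id": 100 }
value: { "id": 100, "name": "Tom", "address": "Some Street" }

# DELETE -> key only, value is a null tombstone
key:   { "id": 100 }
value: null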

So we should be able to know at all times within a benthos pipeline what the key of the current message/data is.

Maybe it would be good to have a benthos-generated id/key for each message in the pipeline (and this should apply to all available benthos inputs). If the input supports an id/key, like MySQL, Kafka or any other input, we can override the benthos id/key with it. This makes sure that all messages carry the same pk metadata, in benthos terms.

The reason for this is that if we don't have it, we need to handle it separately. Example: CSV to Kafka with benthos:

input:
  gcp_cloud_storage:
    bucket: ${GCS_BUCKET}
    prefix: ${GCS_PREFIX}
    codec: csv
    delete_objects: true

pipeline:
  processors:
    - bloblang: |
        meta pk = this.person_id
        root.person_id = this.person_id
        root.name = this.name
        root.address  = this.address
        # Check if we need to send a tombstone based on the is_deletion flag.
        # To send a tombstone, root is set to null
        root = if this.is_deletion == "true" {
            null
        }

output:
  kafka:
    addresses: [ ${KAFKA_BROKER_ADDRESS} ]
    topic: ${KAFKA_TOPIC}
    client_id: ${KAFKA_CLIENT_ID}
    target_version: 2.4.0
    key: ${! meta("pk") }
    max_in_flight: 10
    batching:
      count: 1000
      period: "60s"

In this example you can see that I had to handle the key and the tombstone in the bloblang. However, if we always have 2 objects to work with (i.e. 1 for the key and 1 for the value), this could be much simpler. (Of course there is no way to detect a delete from a CSV file; this depends purely on whether the input type actually supports deletes.)

The MySQL Binlog perspective

The new input add-on should handle the 3 events mentioned above. Let's assume we can have a key and a value for the message at each step of a benthos pipeline. The add-on could process the events as follows (taking a basic person object with basic fields as an example):

  • INSERT INTO PERSON (person_id, name, address) VALUES (100, "Tom", "Some Street")
# key object
{
  "person_id": 100
}

# value object
{
  "person_id": 100,
  "name": "Tom,
  "address": "Some Street"
}

Example of a benthos pipeline:

pipeline:
  processors:
    - bloblang: |
        # here we can play with key and value instead of root
        # by default it would be something like (what we receive from the binlog):
        key.person_id = 100
        value.person_id = 100
        value.name = "Tom"
        value.address = "Some Street"

        # And if we want to process the values, we can use bloblang as we did already,
        # renaming the key field for example:
        key.person_id = deleted()
        key.id = this.person_id
  • UPDATE - any update could be treated the same way as the INSERT above. However, another base attribute could be added for the old values, so we would then have key, value and old_value (see the sketch after this list).
  • DELETE - for deletes, we would still have key and value, but the value would be set to null. One could argue for providing an object with all attributes set to null instead, but I highly doubt that would make much sense.
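
To make these two cases concrete, here is a sketch of the objects the add-on could emit, following the person example above (the key/value/old_value split is the proposal in this issue, not an existing benthos feature, and the UPDATE statement and new address are made up for illustration):

  • UPDATE PERSON SET address = "New Street" WHERE person_id = 100
# key object
{
  "person_id": 100
}

# value object (new row image)
{
  "person_id": 100,
  "name": "Tom",
  "address": "New Street"
}

# old_value object (previous row image)
{
  "person_id": 100,
  "name": "Tom",
  "address": "Some Street"
}

  • DELETE FROM PERSON WHERE person_id = 100
# key object
{
  "person_id": 100
}

# value object (tombstone, null because the row no longer exists)
null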

Examples of binlog JSON parsers (an illustrative output shape follows the list):

  • https://github.com/labbati/mysqlbinlog-to-json
  • https://github.com/noplay/python-mysql-replication
  • https://github.com/zendesk/maxwell
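
For a feel of what these parsers emit, Maxwell (linked above) for example writes one JSON document per row change, roughly of the following shape (simplified, with illustrative values; on updates the old field carries the previous values of the changed columns):

{
  "database": "test",
  "table": "person",
  "type": "update",
  "ts": 1638440400,
  "xid": 24453,
  "data": { "person_id": 100, "name": "Tom", "address": "New Street" },
  "old": { "address": "Some Street" }
}

Any of these could serve as a starting point, with the remaining work being to map such documents onto the key/value (and old_value) objects described above.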

I'll add more updates later. If you have more ideas or questions, feel free to add them.

Locustv2 · Dec 02 '21 10:12