
Exactly-Once Delivery Semantics

Open BorisTyshkevich opened this issue 1 year ago • 11 comments

Having the ability to transfer events from MySQL/Postgres to ClickHouse with an exactly-once delivery guarantee would be a great advantage in the overall DWH architecture design.

When transferring events we should not create duplicates and should not lose any data in any situation, including restarts of the source database server, the ClickHouse server, or the Sink Connector itself. The Sink Connector should write exactly the events it receives from the binlog/WAL. Reading a single event stream from MySQL or Postgres is much easier than dealing with multiple Kafka broker partitions, so we can do that in the Sink Connector and gain an advantage over competing tools.

To achieve it, we could rely on the ClickHouse block-deduplication feature: when ClickHouse receives an insert block, it calculates a checksum and compares it against the set of recent block checksums stored in (Zoo)Keeper. Blocks with repeated checksums are skipped. A deduplication token can be used instead of the checksum.
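
For illustration, a minimal sketch of that behaviour with a hypothetical table (on Replicated* engines deduplication is on by default; a plain MergeTree additionally needs non_replicated_deduplication_window > 0):

```sql
CREATE TABLE events
(
    id      UInt64,
    payload String
)
ENGINE = ReplicatedMergeTree('/clickhouse/tables/{shard}/events', '{replica}')
ORDER BY id;

INSERT INTO events VALUES (1, 'a'), (2, 'b');  -- accepted
INSERT INTO events VALUES (1, 'a'), (2, 'b');  -- identical block, same checksum: silently skipped
INSERT INTO events VALUES (2, 'b'), (1, 'a');  -- different row order, different checksum: inserted again (duplicates!)
```

The last insert shows why a retry must resend the exact same block: the checksum is computed over the block contents, so even a reordered block is treated as new data.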

So in case of failure and recovery, the Sink Connector should always retry with exactly the same block it sent before. By default, the Connector keeps only one position pointer and can end up sending a bigger block of rows: rows that were already sent, plus new rows that appeared in the binlog after the crash. To improve this we need two position pointers and a two-stage commit:

  1. get the current position (start) from the positions table (KeeperMap works better in replicated, clustered environments; see the sketch after this list)
  2. prepare the insert by advancing the position (end) without committing it
  3. write both pointers (start, end) to the positions table
  4. consume events from the binlog/WAL between the start and end pointers and build insert blocks for every table
  5. write the insert blocks to the destination tables
  6. write the end position to the "start" column of the positions table and set the "end" column to zero or NULL
  • In the normal situation, step 1 reads only one meaningful position from the (start, end) columns of the positions table; end is NULL.
  • In case of failure, step 1 reads two meaningful positions, and we should go directly to step 4, rebuilding exactly the same insert blocks as before the failure, which enables ClickHouse block deduplication.
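
A minimal sketch of what the positions table and commit sequence could look like, assuming a KeeperMap-backed table; all names and position formats here are hypothetical:

```sql
-- KeeperMap assumes keeper_map_path_prefix is configured on the server.
CREATE TABLE sync_positions
(
    source    String,  -- e.g. 'mysql-binlog'
    start_pos String,  -- last committed position (GTID set / LSN)
    end_pos   String   -- in-flight position; '' when no insert is pending
)
ENGINE = KeeperMap('/sync_positions')
PRIMARY KEY source;

-- step 3: record the in-flight range before inserting
INSERT INTO sync_positions VALUES ('mysql-binlog', 'gtid:1-100', 'gtid:1-150');
-- steps 4-5: build the insert blocks for (start_pos, end_pos] and write them
-- step 6: commit by advancing start_pos and clearing end_pos
-- (inserting an existing key into KeeperMap updates the row in place)
INSERT INTO sync_positions VALUES ('mysql-binlog', 'gtid:1-150', '');
```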

For sharded setups it could be more complicated, because of random sharding keys and because the Distributed background monitor squashes small insert blocks. That should be mentioned in the documentation.
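
One possible mitigation, sketched here as an assumption rather than anything the connector does today: insert synchronously through the Distributed table with a deterministic sharding key, so a retried block is split into the same per-shard sub-blocks and per-shard deduplication still has a chance to match:

```sql
-- events_dist is a hypothetical Distributed table over events,
-- sharded by a deterministic key such as cityHash64(id).
INSERT INTO events_dist
SETTINGS insert_distributed_sync = 1
VALUES (1, 'a'), (2, 'b');
```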

Instead of the default checksums calculated by ClickHouse, we could set a deduplication token for every insert, constructed as a hash of the start and end positions. That should be more reliable than checksumming.
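
A sketch of what that could look like with the insert_deduplication_token setting; the token format below is made up for illustration:

```sql
-- The token replaces the content checksum, so a retried insert is skipped
-- even if the block happens to be assembled or serialized differently.
INSERT INTO events
SETTINGS insert_deduplication_token = 'mysql-binlog:gtid:1-100..gtid:1-150'
VALUES (1, 'a'), (2, 'b');
```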

BorisTyshkevich avatar Nov 06 '23 19:11 BorisTyshkevich

related to https://github.com/Altinity/clickhouse-sink-connector/issues/230

BorisTyshkevich avatar Nov 06 '23 19:11 BorisTyshkevich

@BorisTyshkevich the current system with RMT and FINAL allows for exactly-once semantics. It already works fine.

aadant avatar Nov 09 '23 04:11 aadant

RMT creates a lot of problems. I can't implement a solution for a new customer on RMT when I need to aggregate data with an MV/Projection.

BorisTyshkevich avatar Nov 09 '23 07:11 BorisTyshkevich

@BorisTyshkevich the current system with RMT and FINAL allows for exactly-once semantics. It already works fine.

@aadant it doesn't work fine, because of problems with deletes.

Having a version column inside the MySQL table and exactly-once delivery would let us use important features of VCMT (VersionedCollapsingMergeTree) that can't be implemented over RMT (a sketch follows the list):

  • deletes
  • projections over a VCMT table
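
For context, a minimal VCMT sketch (hypothetical table) showing how updates and deletes are expressed with cancel rows:

```sql
CREATE TABLE facts
(
    id      UInt64,
    value   Float64,
    sign    Int8,    -- +1 = state row, -1 = cancel row
    version UInt64   -- maintained on the MySQL side
)
ENGINE = VersionedCollapsingMergeTree(sign, version)
ORDER BY id;

INSERT INTO facts VALUES (42, 10.0,  1, 1);                    -- initial state
INSERT INTO facts VALUES (42, 10.0, -1, 1), (42, 12.5, 1, 2);  -- update: cancel old version + new state
INSERT INTO facts VALUES (42, 12.5, -1, 2);                    -- delete: a cancel row is enough
```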

BorisTyshkevich avatar May 07 '24 17:05 BorisTyshkevich

Deletes are working. You just need extra care to remove the data flagged with is_deleted.
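
For reference, a sketch of that pattern (hypothetical table; the is_deleted engine parameter requires a reasonably recent ClickHouse, 23.2+):

```sql
CREATE TABLE t
(
    id         UInt64,
    v          UInt64,  -- version
    is_deleted UInt8
)
ENGINE = ReplacingMergeTree(v, is_deleted)
ORDER BY id;

INSERT INTO t VALUES (1, 1, 0);  -- row appears
INSERT INTO t VALUES (1, 2, 1);  -- delete: newer version flagged as deleted
SELECT * FROM t FINAL;           -- the row is no longer returned
-- Physical removal happens on merges; on recent versions
-- OPTIMIZE TABLE t FINAL CLEANUP forces it.
```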

We plan to support other engines as part of this project. The issue with VCMT is that you need to cancel the previous version. How do you do that? cc @hodgesrm

aadant avatar May 07 '24 23:05 aadant

for a regular MT you can use this design from ClickHouse, Inc. (similar to eBay's): https://github.com/ClickHouse/clickhouse-kafka-connect/blob/main/docs/DESIGN.md

aadant avatar May 08 '24 04:05 aadant

The issue with VCMT is that you need to cancel the previous version. How do you do that?

You have to maintain a special version column in a specially created MySQL table: the denormalized wide fact table (with the version column) should be built inside MySQL (by triggers) and only then synced to ClickHouse. There is no point in syncing a normalized table schema to ClickHouse.
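
A minimal sketch of such a trigger on the MySQL side (table and column names are hypothetical):

```sql
-- Bump the version on every update of the denormalized fact table;
-- inserts start at version 1 via a column default.
CREATE TRIGGER fact_wide_bump_version
BEFORE UPDATE ON fact_wide
FOR EACH ROW
SET NEW.version = OLD.version + 1;
```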

[screenshot: architecture diagram]

BorisTyshkevich avatar May 08 '24 06:05 BorisTyshkevich

Please note that the above architecture is not generic: you need an intermediate MySQL server and triggers. It is probably cleaner to publish denormalized data to Kafka and persist it in ClickHouse using Kafka Connect or just the Kafka engine.

There is no point in syncing a normalized table schema to ClickHouse.

If you want to run the same MySQL queries in ClickHouse, it makes perfect sense. Please note that a lot of projects do exactly that: MaterializedMySQL, Oracle HeatWave, AWS DMS, ...

ClickHouse performance on RMT improved dramatically in recent releases (especially 24.3): https://github.com/ClickHouse/ClickHouse/pull/58120 https://github.com/ClickHouse/ClickHouse/pull/58218

aadant avatar May 08 '24 16:05 aadant

It is probably cleaner to publish denormalized data to Kafka and persist it in ClickHouse using Kafka Connect or just the Kafka engine.

I need both the old row and the new row for correct VCMT processing. Debezium provides that on top of the binlog data. That is the main reason why I need an intermediate MySQL server in the schema.

BorisTyshkevich avatar May 08 '24 16:05 BorisTyshkevich

ClickHouse performance on RMT improved dramatically in recent releases (especially 24.3)

VCMT also requires FINAL processing and benefits from the mentioned changes as well. The main advantage over RMT is the ability to use Aggregating Projections (see the sketch below).
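
For illustration, the kind of sign-aware aggregating projection this refers to, sketched on the hypothetical facts table from above and assuming it also carries an event_time DateTime column (whether ADD PROJECTION is accepted on VCMT depends on the ClickHouse version):

```sql
-- Multiplying by sign keeps the sums correct: a cancel row
-- subtracts exactly what its state row added.
ALTER TABLE facts ADD PROJECTION daily_totals
(
    SELECT
        toDate(event_time) AS day,
        sum(value * sign)  AS total
    GROUP BY day
);
ALTER TABLE facts MATERIALIZE PROJECTION daily_totals;
```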

BorisTyshkevich avatar May 08 '24 16:05 BorisTyshkevich

@BorisTyshkevich one other route would be to support projections with RMT: https://github.com/ClickHouse/ClickHouse/issues/24778

aadant avatar May 09 '24 15:05 aadant