clickhouse-sink-connector

Aggregate-friendly tables with sign=-1 column

Open BorisTyshkevich opened this issue 2 years ago • 3 comments

ReplacingMergeTree in its classical usage is not the best instrument for storing binlog events. We can't attach a Materialized View to an RMT table to produce a table with aggregated data, because of UPDATEs and DELETEs in the source MySQL table. A materialized view only processes the rows of each INSERT, and an RMT row carries only the new state, so a correct aggregate cannot first subtract the old value from the sum and only then add the new value.
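A minimal illustration of the problem, using ClickHouse's `values` table function instead of real tables (the columns `id`, `amount`, `_version`, `_sign` are hypothetical). The first query is what a summing MV attached to an RMT would accumulate for one row that was inserted and then updated; the second sends the same UPDATE as a cancel row plus a new row.

```sql
-- Two versions of the same MySQL row (amount 10, then updated to 15), as an
-- MV attached to an RMT would receive them: the old value is never subtracted.
SELECT sum(amount) AS total
FROM values('id UInt64, amount UInt64, _version UInt64', (1, 10, 1), (1, 15, 2));
-- total = 25, although MySQL now holds 15

-- The same UPDATE sent as a cancel row (old value, _sign = -1) plus a new row.
SELECT sum(_sign * amount) AS total
FROM values('id UInt64, amount UInt64, _sign Int8', (1, 10, 1), (1, 10, -1), (1, 15, 1));
-- total = 15
```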

CollapsingMergeTree (and VersionedCollapsingMergeTree) was invented in ClickHouse to solve exactly that scenario.

For an UPDATE event, the MySQL binlog contains both the old and the new values, so it's quite easy to insert both rows into the ClickHouse table, with sign = -1 and sign = 1 respectively.

For a DELETE event, we also have the old values and can insert them with sign = -1. In that case, CollapsingMergeTree will remove both rows (sign = 1 and sign = -1) with the same id during SELECT ... FINAL processing.
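A minimal sketch of both cases with a plain CollapsingMergeTree, assuming a hypothetical MySQL table `orders(id, amount)` and a hypothetical ClickHouse table `orders_cmt`. Each INSERT stands for one binlog event as a connector could write it; this is an illustration, not the connector's actual output.

```sql
-- Hypothetical ClickHouse copy of a MySQL table orders(id, amount).
CREATE TABLE orders_cmt
(
    id     UInt64,
    amount UInt64,
    _sign  Int8
)
ENGINE = CollapsingMergeTree(_sign)
ORDER BY id;

-- MySQL INSERT (1, 10): the new row with sign = 1
INSERT INTO orders_cmt VALUES (1, 10, 1);
-- MySQL UPDATE amount 10 -> 15: old image with sign = -1, new image with sign = 1
INSERT INTO orders_cmt VALUES (1, 10, -1), (1, 15, 1);
-- MySQL INSERT (2, 7), later DELETE: the delete re-sends the old image with sign = -1
INSERT INTO orders_cmt VALUES (2, 7, 1);
INSERT INTO orders_cmt VALUES (2, 7, -1);

SELECT id, amount FROM orders_cmt FINAL ORDER BY id;
-- returns one row: id = 1, amount = 15 (id 2 is gone)
```

FINAL collapses the pairs at query time; background merges remove the same pairs eventually, but without any timing guarantee.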

Such processing requires exactly-once delivery semantics (https://github.com/Altinity/clickhouse-sink-connector/issues/359). With the currently implemented at-least-once delivery, duplicates will break all the collapsing logic.
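To see how a single duplicate breaks the math, here is the same signed sum as a downstream aggregate would compute it, again with the `values` table function and hypothetical columns. The scenario assumes the two rows of one UPDATE were delivered twice, for example after a connector restart replayed events that had already been written.

```sql
-- Insert (10), then one UPDATE (10 -> 15) whose two rows were delivered twice.
SELECT sum(_sign * amount) AS total
FROM values('id UInt64, amount UInt64, _sign Int8',
            (1, 10, 1),
            (1, 10, -1), (1, 15, 1),
            (1, 10, -1), (1, 15, 1));
-- total = 20, although MySQL holds 15
```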

There is no version attribute in the binlog, only the position, but VCMT needs a version to collapse rows. So the MySQL table should contain a DateTime64 column "_updated_at" (filled by a trigger) that can be used as _version for VCMT (it will be received as a normal column in UPDATE and DELETE events).
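A sketch of the MySQL side, assuming a hypothetical source table `orders`. MySQL has no DateTime64 type; `DATETIME(6)` is its microsecond-precision equivalent, and we assume the connector maps it to a ClickHouse DateTime64 column. The trigger refreshes the timestamp on every UPDATE, so the binlog after-image carries a new version while the before-image (used for the -1 row) still carries the old one.

```sql
-- MySQL side. `orders` is a hypothetical source table.
ALTER TABLE orders
    ADD COLUMN _updated_at DATETIME(6) NOT NULL DEFAULT CURRENT_TIMESTAMP(6);

-- Refresh the version on every UPDATE; INSERTs get it from the DEFAULT above.
-- DELETE events carry the before-image, i.e. the last _updated_at value.
CREATE TRIGGER orders_set_updated_at
BEFORE UPDATE ON orders
FOR EACH ROW
SET NEW._updated_at = NOW(6);
```

MySQL can also maintain such a column without a trigger, via `DEFAULT CURRENT_TIMESTAMP(6) ON UPDATE CURRENT_TIMESTAMP(6)` in the column definition.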

ReplacingMergeTree can also have a sign column with 1/-1 values and work the same way, removing all rows except the last one. This is already implemented for DELETE events when the connector setting replacingmergetree.delete.column: "is_deleted" is not used.
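A minimal sketch of RMT with a sign column, using a hypothetical table `orders_rmt` (this shows the general technique, not the connector's exact schema). FINAL keeps only the row with the highest `_version` per id, so a DELETE survives as a single -1 row that readers filter out.

```sql
-- Hypothetical RMT copy of orders(id, amount) with a version and a sign column.
CREATE TABLE orders_rmt
(
    id       UInt64,
    amount   UInt64,
    _version UInt64,
    _sign    Int8
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY id;

INSERT INTO orders_rmt VALUES (1, 10, 1, 1), (2, 7, 2, 1);
INSERT INTO orders_rmt VALUES (1, 15, 3, 1);  -- UPDATE of id 1: just the new row
INSERT INTO orders_rmt VALUES (2, 7, 4, -1);  -- DELETE of id 2: the old row with sign = -1

-- FINAL keeps only the row with the highest _version per id
SELECT id, amount, _sign FROM orders_rmt FINAL ORDER BY id;
-- (1, 15, 1) and (2, 7, -1); readers add WHERE _sign = 1 to hide deleted ids
```

Unlike the collapsing engines, the -1 row does not cancel anything by itself here; it just wins as the latest version, which is why RMT alone cannot feed a correct summing MV.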

See Robert's slides about RMT with sign - https://www.youtube.com/watch?v=G9MxRpKlbnI&t=547s

Example:

```sql
create table VCMT (
     id          UInt64,
     value       UInt64,
     updated_at  DateTime materialized now(),
     _version    UInt64,
     _sign       Int8
) ENGINE = VersionedCollapsingMergeTree(_sign, _version)
ORDER BY id;

create table SMT (
   day    Date,
   id     UInt64,
   value  Int64    -- signed: cancel rows (_sign = -1) contribute negative deltas
) engine = SummingMergeTree
order by (day, id);

-- VCMT has no day column, so derive it from updated_at
create materialized view __SMT to SMT as
select toDate(updated_at) as day, id, sum(_sign * value) as value
from VCMT
group by day, id;
```

See details about VCMT - https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/versionedcollapsingmergetree#table_engines_versionedcollapsingmergetree

BorisTyshkevich, Nov 08 '23 09:11

@BorisTyshkevich RMT should be the default and we plan to enable overrides for the engine.

> CollapsingMergeTree (and VersionedCollapsingMergeTree) was invented in ClickHouse to solve exactly that scenario.

We cannot use those because we would need to return the same version, and the version is always increasing.
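To illustrate why the version matters: VersionedCollapsingMergeTree only collapses a -1 row with a +1 row that has the same sorting key and the same version (ClickHouse implicitly appends the version column to the sorting key). A small sketch with a hypothetical table `vcmt_demo`: id 1 re-sends the old version on its cancel row, id 2 gives the cancel row a fresh, increasing version.

```sql
CREATE TABLE vcmt_demo
(
    id       UInt64,
    value    UInt64,
    _version UInt64,
    _sign    Int8
)
ENGINE = VersionedCollapsingMergeTree(_sign, _version)
ORDER BY id;

-- id 1: the cancel row reuses the version of the row it cancels
INSERT INTO vcmt_demo VALUES (1, 10, 1, 1);
INSERT INTO vcmt_demo VALUES (1, 10, 1, -1), (1, 15, 2, 1);

-- id 2: the cancel row gets a new, increasing version
INSERT INTO vcmt_demo VALUES (2, 10, 1, 1);
INSERT INTO vcmt_demo VALUES (2, 10, 2, -1), (2, 15, 3, 1);

-- Count the "current state" (+1) rows left after collapsing
SELECT id, countIf(_sign = 1) AS current_rows
FROM vcmt_demo FINAL
GROUP BY id
ORDER BY id;
-- id 1: 1 row (15); id 2: 2 rows (10 and 15), because nothing collapsed
```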

If you want to use projections and MVs, you can use a regular MergeTree (or even ReplicatingMergeTree), we should support it. https://github.com/Altinity/clickhouse-sink-connector/issues/330

cc @hodgesrm

aadant, Nov 09 '23 04:11

> we can not use those because we need to return the same version and the version is always increasing.

There is no version in the binlog, only the position.

We need a MySQL table with a DateTime64 column _updated_at that can be used as _version for VCMT (it will be received as a normal column in UPDATE and DELETE events).

(added to the description above)

BorisTyshkevich, Nov 09 '23 05:11

> If you want to use projections and MVs, you can use a regular MergeTree

I need the Null engine for that: with a regular MergeTree I have to regularly delete all that data from disk, which wastes server resources. A separate issue was created for that; see https://github.com/Altinity/clickhouse-sink-connector/issues/360
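A minimal sketch of the Null-engine pattern, with hypothetical tables `events_null` and `totals`: rows inserted into the Null table are passed to the attached materialized view and then discarded, so only the aggregate is stored on disk.

```sql
-- Hypothetical tables: events_null receives the rows, totals keeps only aggregates.
CREATE TABLE events_null (id UInt64, value UInt64, _sign Int8) ENGINE = Null;

CREATE TABLE totals (id UInt64, total Int64)
ENGINE = SummingMergeTree
ORDER BY id;

CREATE MATERIALIZED VIEW events_null_mv TO totals AS
SELECT id, _sign * value AS total
FROM events_null;

-- An insert followed by an UPDATE 10 -> 15, written with cancel rows
INSERT INTO events_null VALUES (1, 10, 1), (1, 10, -1), (1, 15, 1);

SELECT count() FROM events_null;                -- 0: the Null engine stores nothing
SELECT id, sum(total) FROM totals GROUP BY id;  -- (1, 15)
```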

As the Sink Connector ingests data in a single thread, we could use a technique similar to insert-time deduplication as a workaround that works even without exactly-once delivery and without an additional updated_at column in the source table:

```sql
create table X ( id UInt64, data String, _sign Int8) engine = Null;
create table Y ( id UInt64, data String, _sign Int8, _version DateTime)
engine = VersionedCollapsingMergeTree(_sign, _version)
order by id;

create materialized view _Y to Y as
with new as (select id, data, _sign as op_sign, now() as _version from X),
     old as (
        -- current state of the ids in this insert, duplicated as a cancel (-1) and a replace (1) row
        select * except (_sign), arrayJoin([-1, 1]) as sign
        from Y final
        prewhere id in (select id from X)
     )
select id,
     if(sign = -1, old.data, new.data) as data,
     if(sign = -1, old._version, new._version) as _version,
     if(sign = -1, -1, 1) as _sign
from new left join old using id
where not (new.op_sign = -1 and _sign = 1) -- for DELETEs, emit only the cancel row
```

That will work fine on not-so-big ingest volumes, which is mostly the case for binlog processing.

BorisTyshkevich, Nov 09 '23 06:11