clickhouse-sink-connector
Aggregate-friendly tables with sign=-1 column
ReplacingMergeTree in its classical usage is not the best instrument for storing binlog events. We can't attach a Materialized View to an RMT table to produce a table with aggregated data, because of UPDATEs and DELETEs in the source MySQL table: to keep a sum aggregate correct, an update must first subtract the old value and only then add the new one.
CollapsingMergeTree (and VersionedCollapsingMergeTree) were invented in ClickHouse to solve exactly that scenario.
For the UPDATE event, the MySQL binlog contains both old and new values, so it's quite easy to insert both rows into the ClickHouse table: the old values with sign = -1 and the new values with sign = 1.
For the DELETE event we also have the old values and can insert them with sign = -1. In that case, CollapsingMergeTree will remove both rows (sign = 1 and -1) with the same id during SELECT ... FINAL processing.
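The UPDATE and DELETE handling described above can be sketched as follows. This is only an illustration: the table `orders_cmt` and its columns are hypothetical names, not something the connector creates.

```sql
-- Hypothetical table; names are illustrative only.
CREATE TABLE orders_cmt
(
    id    UInt64,
    value Int64,
    sign  Int8
)
ENGINE = CollapsingMergeTree(sign)
ORDER BY id;

-- MySQL INSERT (id = 1, value = 10): one row with sign = 1.
INSERT INTO orders_cmt VALUES (1, 10, 1);

-- MySQL UPDATE (value 10 -> 25): cancel the old image, then add the new one.
INSERT INTO orders_cmt VALUES (1, 10, -1), (1, 25, 1);

-- MySQL DELETE: cancel the last image.
INSERT INTO orders_cmt VALUES (1, 25, -1);

-- Sign-aware aggregate: gives the same answer whether or not
-- background merges have already collapsed the row pairs.
SELECT id, sum(value * sign) AS total
FROM orders_cmt
GROUP BY id
HAVING sum(sign) > 0;
```

Multiplying by the sign at query time avoids depending on FINAL, and the HAVING clause drops ids whose rows have all been cancelled.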
Such processing requires exactly-once delivery semantics - https://github.com/Altinity/clickhouse-sink-connector/issues/359 . With the currently implemented at-least-once delivery, duplicates will break the whole collapse logic.
There is no version attribute in the binlog, only a position. A version is needed for the VCMT engine to collapse rows. So the MySQL table should contain a DateTime64 column "_updated_at" (filled by a trigger) that can be used as _version for VCMT (it will be received as a normal column in UPDATE and DELETE events).
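One possible way to maintain such a column on the MySQL side, shown here as an assumption rather than the approach proposed above, is an automatically updated timestamp column instead of a trigger. The table name `orders` is hypothetical.

```sql
-- MySQL side. `orders` is a hypothetical source table.
-- DATETIME(6) keeps microsecond precision; the column is set on INSERT
-- and refreshed on UPDATE without an explicit trigger.
ALTER TABLE orders
    ADD COLUMN _updated_at DATETIME(6) NOT NULL
        DEFAULT CURRENT_TIMESTAMP(6)
        ON UPDATE CURRENT_TIMESTAMP(6);
```

Note that MySQL refreshes an ON UPDATE column only when some other column actually changes value, so a trigger may still be preferable if every UPDATE statement must bump the version.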
ReplacingMergeTree can also have a sign column with 1/-1 values and work the same way, removing all rows except the last one. This is already implemented for DELETE events when the connector setting replacingmergetree.delete.column: "is_deleted" is not set.
See Robert's slides about RMT with sign - https://www.youtube.com/watch?v=G9MxRpKlbnI&t=547s
Example:
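create table VCMT (
    id UInt64,
    value UInt64,
    updated_at DateTime materialized now(),
    _version UInt64,
    _sign Int8
) ENGINE = VersionedCollapsingMergeTree(_sign, _version)
ORDER BY id;

create table SMT (
    day Date,
    id UInt64,
    value Int64  -- signed: a block holding only sign = -1 rows yields a negative partial sum
) engine = SummingMergeTree
order by (day, id);

-- VCMT has no day column, so it is derived from updated_at here (an assumption);
-- the aggregate also needs a GROUP BY.
create materialized view __SMT to SMT as
select toDate(updated_at) as day, id, sum(_sign * value) as value
from VCMT
group by day, id;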
See details about VCMT - https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/versionedcollapsingmergetree#table_engines_versionedcollapsingmergetree
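When reading from the SMT table in the example above, it helps to remember that SummingMergeTree only sums rows during background merges, so a query should still aggregate explicitly. A minimal sketch, assuming the SMT table defined above:

```sql
-- Rows with the same (day, id) may not be merged yet,
-- so sum them again at query time.
SELECT day, id, sum(value) AS value
FROM SMT
GROUP BY day, id
ORDER BY day, id;
```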
@BorisTyshkevich RMT should be the default and we plan to enable overrides for the engine.
> CollapsingMergeTree (and VersionedCollapsingMergeTree) were invented in ClickHouse to solve exactly that scenario.

We cannot use those because we need to return the same version, and the version is always increasing.
If you want to use projections and MVs, you can use a regular MergeTree (or even ReplicatingMergeTree), we should support it. https://github.com/Altinity/clickhouse-sink-connector/issues/330
cc @hodgesrm
> We cannot use those because we need to return the same version, and the version is always increasing.

There is no version in the binlog, only a position.
We need a MySQL table with a DateTime64 column _updated_at that can be used as _version for VCMT (it will be received as a normal column in UPDATE and DELETE events).
(added to the description above)
> If you want to use projections and MVs, you can use a regular MergeTree

I need a Null engine for that. With a regular MergeTree I would have to regularly delete all that data from disk, which wastes server resources. A separate issue was created for that - see https://github.com/Altinity/clickhouse-sink-connector/issues/360
As the Sink Connector ingests data in a single thread, we could use a technique similar to deduplication on insert as a workaround that works even without exactly-once delivery and without an additional updated_at column in the source table:
create table X ( id UInt64, data String, _sign Int8 ) engine = Null;
create table Y ( id UInt64, data String, _sign Int8, _version DateTime )
engine = VersionedCollapsingMergeTree(_sign, _version)
order by id;  -- MergeTree-family engines require a sorting key
create materialized view _Y to Y as
with new as (select *, now() as _version from X),  -- incoming rows, stamped with the ingest time
old AS (
    -- current state of the affected ids, duplicated into a cancel (-1) and a re-insert (1) copy
    SELECT * EXCEPT _sign, arrayJoin([-1,1]) AS sign
    FROM Y FINAL
    PREWHERE id IN ( SELECT id FROM X )
)
select id,
    if(sign = -1, old.data, new.data) AS data,
    if(sign = -1, old._version, new._version) AS _version,
    if(sign = -1, -1, 1) AS _sign
from new left join old using id
where not (new._sign = -1 and sign != -1);  -- for DELETEs: emit only the cancel row
That will work fine only at not-so-big ingest volumes, but for binlog processing that is mostly OK.
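A usage sketch for the workaround, assuming the tables X and Y and the materialized view _Y from the example above already exist. It is untested and only illustrates the intended flow; the values are made up.

```sql
-- Each statement stands for one event delivered by the connector.
INSERT INTO X VALUES (1, 'a', 1);   -- source INSERT
INSERT INTO X VALUES (1, 'b', 1);   -- source UPDATE: only the new image is sent
INSERT INTO X VALUES (1, 'b', -1);  -- source DELETE

-- Inspect the raw sign rows written by the view, then the collapsed state.
SELECT * FROM Y ORDER BY id, _version, _sign;
SELECT * FROM Y FINAL;
```

Because the view looks up the current state in Y before writing, re-delivering the same event should cancel and re-insert the same image rather than double-count it, which is why the approach tolerates at-least-once delivery.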