clickhouse-sink-connector

Request: Support MergeTree with version timestamp.

Open subkanthi opened this issue 1 year ago • 8 comments

Hi team, is there a way to use the lightweight Altinity connector with a MergeTree engine and keep the version timestamp permanently, since we want to keep the time series? We want to avoid losing that information after merges have happened on ReplacingMergeTree.

subkanthi (Jan 22 '24)

That's on the roadmap. Please note that there is a risk of duplicated data. No data should actually be lost with ReplacingMergeTree (RMT); the history is just not kept, same as in MySQL.

If the version timestamp is part of the sort key, rows will not be collapsed by merges. Another idea is to create a history table with a materialized view, but materialized views have issues with RMT.

aadant (Jan 22 '24)

I tested the setup below and it works fine if you write to the history table and use an MV to send the updates to the current table.

CREATE TABLE user_history
(
    `user_id` Int32,
    `username` String,
    `full_name` Nullable(String),
    `_version` UInt64 DEFAULT 0,
    `is_deleted` UInt8 DEFAULT 0
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY (user_id, _version)
SETTINGS index_granularity = 8192;

CREATE TABLE user
(
    `user_id` Int32,
    `username` String,
    `full_name` Nullable(String),
    `_version` UInt64 DEFAULT 0,
    `is_deleted` UInt8 DEFAULT 0
)
ENGINE = ReplacingMergeTree(_version, is_deleted)
ORDER BY (user_id)
SETTINGS index_granularity = 8192;

-- the target table must exist before the materialized view that writes to it
CREATE MATERIALIZED VIEW user_mv TO user AS SELECT * FROM user_history;
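
A quick way to sanity-check the setup above (the values are illustrative): insert two versions of the same row into the history table, then compare the two tables.

-- both versions are kept in user_history; the MV pushes them into user,
-- where ReplacingMergeTree collapses them to the latest _version
INSERT INTO user_history (user_id, username, full_name, _version) VALUES (1, 'alice', 'Alice A.', 1);
INSERT INTO user_history (user_id, username, full_name, _version) VALUES (1, 'alice', 'Alice B.', 2);

SELECT * FROM user FINAL WHERE user_id = 1;                      -- one row, _version = 2
SELECT * FROM user_history WHERE user_id = 1 ORDER BY _version;  -- both rows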

aadant (Jan 23 '24)

@aadant could you please elaborate on what you've done?

I'm very interested in keeping history of the objects, and would like to implement this workaround.

vpol (May 14 '24)

@vpol currently table overrides are not supported by the connector, but you can stop it, apply the changes below, and then restart the connector:


-- the sink connector would write into the user table coming from the source

RENAME TABLE user TO user_current;

-- add _version to the sort key 

CREATE TABLE user
(
    `user_id` Int32,
    `username` String,
    `full_name` Nullable(String),
    `_version` UInt64 DEFAULT 0,
    `is_deleted` UInt8 DEFAULT 0
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY (user_id, _version)
SETTINGS index_granularity = 8192;

-- replicate changes from user (history) to user_current (latest version)
CREATE MATERIALIZED VIEW user_mv TO user_current AS SELECT * FROM user;

aadant (May 15 '24)

Moving to 2.3.0; we need to discuss the scope of the implementation.

subkanthi (May 29 '24)

Why do we need ReplacingMergeTree for the history table? Why don't we use plain MergeTree (or ReplicatedMergeTree) and beat duplicates with argMax? There are not too many rows in the table (not 10M), so ReplacingMergeTree has no positive effect.
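
For illustration, the query-time deduplication could look like the sketch below, assuming the user_history schema from the earlier comments (it works the same on plain MergeTree):

-- latest row per user_id, computed at query time instead of relying on merges
SELECT
    user_id,
    argMax(username, _version)   AS username,
    argMax(full_name, _version)  AS full_name,
    argMax(is_deleted, _version) AS is_deleted
FROM user_history
GROUP BY user_id
HAVING argMax(is_deleted, _version) = 0;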

BorisTyshkevich (Jun 13 '24)

Possible implementation for handling DDLs:

  • Stop replication.
  • Rename the history table to history_#.
  • Apply the DDL to table_history.
  • Create the materialized view.
  • Start replication.

subkanthi (Jun 13 '24)

  1. A slight change:
  • Stop replication.
  • Apply the DDL to the main table. This could be a long process: how long can the connector wait? Should we change the mutations_sync mode and timeouts? Possibly the connector should go into a restart loop.
  • Rename the history table to history_#. This is quite fast, but in some cases with ON CLUSTER the DDL worker can get stuck.
  • Create a NEW table_history.
  • Drop the materialized view.
  • Create a NEW materialized view.
  • Start replication.

In case of failure, the connector should restart the operation from the beginning in an IDEMPOTENT way, not failing if some steps have already been processed. IF EXISTS / IF NOT EXISTS clauses should be added to all DDLs at the TABLE and COLUMN levels.
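
For illustration only, here is a sketch of the DDL part of those steps, reusing the user / user_history / user_mv names from the earlier comments and assuming the MV feeds the main table from the history table; the email column and the _1 suffix are hypothetical placeholders:

-- 1. apply the incoming DDL to the main table (possibly long-running)
ALTER TABLE user ADD COLUMN IF NOT EXISTS email Nullable(String);

-- 2. set the old history table aside; RENAME has no IF EXISTS,
--    so an explicit existence check is needed to keep this step idempotent
RENAME TABLE user_history TO user_history_1;

-- 3. recreate the history table with the new schema
CREATE TABLE IF NOT EXISTS user_history
(
    `user_id` Int32,
    `username` String,
    `full_name` Nullable(String),
    `email` Nullable(String),
    `_version` UInt64 DEFAULT 0,
    `is_deleted` UInt8 DEFAULT 0
)
ENGINE = ReplacingMergeTree(_version)
ORDER BY (user_id, _version);

-- 4. recreate the materialized view
DROP VIEW IF EXISTS user_mv;
CREATE MATERIALIZED VIEW IF NOT EXISTS user_mv TO user AS SELECT * FROM user_history;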

  2. The most complicated thing about _history tables is the ORDER BY / PARTITION BY. It makes no sense to use the same ordering as for the MySQL tables, so the interesting discussion will be how to configure such history tables. The existing config is unsuitable for setting ORDER BY / PARTITION BY for hundreds of tables.

My bet is creating a special ClickHouse table with rules for every history table, with columns similar to those in system.tables:

  • db.table (primary key)
  • partition by
  • order by
  • primary key
  • engine
  • additional columns (String or Array)
  • suffix (_history by default)
  • dest database (the same db by default)
  • cluster name (no ON CLUSTER by default for CREATE/RENAME statements)

If the connector finds the table in the list while processing a DDL update, it can do the processing described above. If not, it does nothing; no history table needs to be created.

  3. The table itself is a KV structure, so the EmbeddedRocksDB engine (for a single server) or KeeperMap (for replicated setups) will work fine here.
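
A minimal sketch of such a rules table using one of those KV engines; the table and column names here are only illustrative:

CREATE TABLE sink_connector_history_rules
(
    `db_table`           String,                     -- 'db.table', the key
    `partition_by`       String DEFAULT '',
    `order_by`           String DEFAULT '',
    `primary_key`        String DEFAULT '',
    `engine`             String DEFAULT 'MergeTree',
    `additional_columns` String DEFAULT '',          -- or Array(String)
    `suffix`             String DEFAULT '_history',
    `dest_database`      String DEFAULT '',          -- empty = same database
    `cluster_name`       String DEFAULT ''           -- empty = no ON CLUSTER
)
ENGINE = EmbeddedRocksDB   -- or KeeperMap('/sink_connector/history_rules') on a replicated setup
PRIMARY KEY(db_table);

-- example rule
INSERT INTO sink_connector_history_rules (db_table, order_by) VALUES ('shop.user', '(user_id, _version)');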

  4. All ClickHouse DDL statements (RENAME, CREATE TABLE, CREATE MATERIALIZED VIEW) should be cluster-ready, tested with the ON CLUSTER clause (if needed), and work in an idempotent way. We need to test all operations with a failure injected after each step.
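
For example, the same kind of statements in cluster-aware form (the cluster macro and database name are illustrative):

RENAME TABLE db1.user_history TO db1.user_history_1 ON CLUSTER '{cluster}';
CREATE TABLE IF NOT EXISTS db1.user_history ON CLUSTER '{cluster}' AS db1.user_history_1;
CREATE MATERIALIZED VIEW IF NOT EXISTS db1.user_mv ON CLUSTER '{cluster}' TO db1.user AS SELECT * FROM db1.user_history;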

  5. The same approach of a special connector CONFIG table could also be used for the MAIN table. Many clients want to configure ORDER BY / PARTITION BY and ENGINE for the main tables, so we can add more columns:

  • db.table (primary key)
  • main table partition by
  • main table order by
  • main table primary key
  • main table engine
  • main table additional columns (String or Array)
  • history table partition by
  • history table order by
  • history table primary key
  • history table engine
  • history table additional columns (String or Array)
  • history suffix (_history by default)
  • history database (the same db by default)
  • cluster name (no ON CLUSTER by default for CREATE/RENAME statements)

Such a table could also filter the incoming table list instead of the Debezium variables: all table updates go to the connector, and there is no need to change text configs when a new table is added in MySQL.
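
As a sketch, that filtering could be a simple lookup against the same rules table (names are illustrative):

-- skip replication of db1.orders unless it is configured in the rules table
SELECT count() > 0 AS is_configured
FROM sink_connector_history_rules
WHERE db_table = 'db1.orders';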

Boris.

BorisTyshkevich (Jun 14 '24)