clickhouse-sink-connector
Multiple threads processing
I think the current code doesn't work properly; I have verified this in the logs. It runs ClickHouseBatchRunnable sequentially instead of in parallel. We should create several ClickHouseBatchRunnable tasks via the scheduleAtFixedRate method so that they run in parallel. The behavior of a single task is described in the ScheduledThreadPoolExecutor docs for scheduleAtFixedRate: if an execution takes longer than its period, subsequent executions may start late, but will not execute concurrently.
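For example, something like the following sketch could work (the pool size, flush interval, and ClickHouseBatchRunnable constructor arguments here are assumptions for illustration, not the connector's actual configuration):

```java
import java.util.concurrent.ScheduledThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Sketch: submit several independent ClickHouseBatchRunnable instances so the pool
// can run them in parallel. A single task scheduled via scheduleAtFixedRate is never
// executed concurrently with itself, which is why one task alone runs sequentially.
ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(numThreads);
for (int i = 0; i < numThreads; i++) {
    executor.scheduleAtFixedRate(
            new ClickHouseBatchRunnable(records, config, topic2TableMap), // constructor args assumed
            0, flushIntervalMs, TimeUnit.MILLISECONDS);
}
```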
It is likely that this code will cause race conditions (e.g. records inserted/updated/removed in an order different from the binlog order). You will have to implement per-topic locks to avoid that (and to avoid your issue here: https://github.com/Altinity/clickhouse-sink-connector/issues/350).
I've actually tried to implement a similar lock (though it seems it wasn't needed) in ClickHouseBatchRunnable:
```java
final private ConcurrentHashMap<String, Lock> topicLocks; // should be passed in the constructor and shared between runnables

...

for (Map.Entry<String, Queue<ClickHouseStruct>> entry : this.records.entrySet()) {
    String topicName = entry.getKey();
    Queue<ClickHouseStruct> queue = entry.getValue();
    if (!queue.isEmpty()) {
        // Lazily create one lock per topic.
        Lock lock = this.topicLocks.computeIfAbsent(topicName, t -> new ReentrantLock());
        // Skip the topic if another runnable is already processing it.
        if (lock.tryLock()) {
            try {
                processRecordsByTopic(topicName, queue);
            } finally {
                lock.unlock();
            }
        }
    }
}
```
@AdamKatzDev I think the order of insertions is not important. There is a version column that helps ClickHouse determine the order in which rows were changed. Unfortunately, locks won't make the queue processing any faster.
@IlyaTsoi I couldn't figure out at what moment the version column value is generated; the correctness of the version itself might depend on the order in which records are processed. There is also an issue caused by records coming in before or after a DDL (failures inserting records with a different schema, lost or corrupted data after truncates).
@aadant @subkanthi could you please tell at what moment the row version is generated, and how?
@AdamKatzDev If I'm not mistaken, this value is taken from the ts_ms field of the Debezium message. From the Debezium docs for the MySQL connector:
> In the source object, ts_ms indicates the time that the change was made in the database. By comparing the value for payload.source.ts_ms with the value for payload.ts_ms, you can determine the lag between the source database update and Debezium.
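For reference, both timestamps can be read from the Debezium change-event envelope in a sink task roughly like this (a sketch using the standard Kafka Connect Struct API; `record` is assumed to be the incoming SinkRecord):

```java
import org.apache.kafka.connect.data.Struct;

// Sketch: extract both ts_ms values from a Debezium change event.
Struct value = (Struct) record.value();       // record: the incoming SinkRecord
long debeziumTsMs = value.getInt64("ts_ms");  // when Debezium processed the event
Struct source = value.getStruct("source");
long sourceTsMs = source.getInt64("ts_ms");   // when the change was made in the database
long lagMs = debeziumTsMs - sourceTsMs;       // lag between the source update and Debezium
```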
You are right about DDL handling. I haven't explored the current logic yet)
@IlyaTsoi if you are correct, then this version mechanism won't work for very hot data.
https://clickhouse.com/docs/en/engines/table-engines/mergetree-family/replacingmergetree
> If two inserted rows have the same version number, the last inserted row is the one kept.
The chance of the same row being updated twice within the same millisecond can be quite high, depending on the load.
@IlyaTsoi @AdamKatzDev for the MySQL case the version is increasing as it contains a snowflakeID + a counter coming from the GTID. You would need to get the equivalent for SQL Server or Postgres.
Regarding the DDL, all table writers are flushed before the DDL is applied with the sink-connector-lightweight. @subkanthi can confirm.
If you have a case that is failing or losing data, please report it as a separate issue, ideally with a test case. Thanks!
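To illustrate the scheme (a hedged sketch of the general snowflake-style idea, not the connector's actual version computation): the timestamp occupies the high bits and the GTID-derived counter the low bits, so changes made in the same millisecond still get strictly increasing versions as long as their counters differ.

```java
// Sketch only: bit widths and parameter names are illustrative, not the real implementation.
// High bits: event timestamp in milliseconds; low bits: counter derived from the GTID.
static long buildVersion(long tsMs, long gtidCounter) {
    return (tsMs << 22) | (gtidCounter & ((1L << 22) - 1));
}
```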
@aadant
> @IlyaTsoi @AdamKatzDev for the MySQL case the version is increasing as it contains a snowflakeID + a counter coming from the GTID.
So it looks like it is safe to insert data in an arbitrary order. If there are same-millisecond collisions, the GTID counter should take care of them.
> Regarding the DDL, all table writers are flushed before the DDL is applied with the sink-connector-lightweight. @subkanthi can confirm.
I remember that code. Looks safe.
@IlyaTsoi looks like DDL operations are not a problem after all. MySQL replication is not an issue either, since it is safe to insert rows in any order, as you assumed. The only remaining issue is other databases: as @aadant mentioned, their version value is just a millisecond timestamp. https://github.com/Altinity/clickhouse-sink-connector/blob/5883bf419169fc3663c4bdb7afcb226ceb4045ff/sink-connector/src/main/java/com/altinity/clickhouse/sink/connector/db/DbWriter.java#L640-L649
It looks like there is an equivalent of GTID in both PostgreSQL and SQL Server, called LSN and Commit LSN respectively, that could be used instead. In the worst case, this multi-threading mechanism could be enabled only for MySQL.
> It looks like there is an equivalent of GTID in both PostgreSQL and SQL Server, called LSN and Commit LSN respectively, that could be used instead.
Hm, that should be checked. It would be nice if it worked)
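For reference, Debezium already exposes these values in the source block of each change event: the PostgreSQL connector provides lsn as a 64-bit number and the SQL Server connector provides commit_lsn as a hex string. A rough sketch of how they could be turned into comparable versions (the SQL Server handling is an assumption and would need verification, since the full Commit LSN does not fit into 64 bits):

```java
import java.math.BigInteger;
import org.apache.kafka.connect.data.Struct;

// Sketch only: candidate version sources from the Debezium "source" block.

// PostgreSQL: source.lsn is already a monotonically increasing 64-bit value.
static long postgresVersion(Struct source) {
    return source.getInt64("lsn");
}

// SQL Server: source.commit_lsn is a fixed-width hex string like "00000027:00000ac0:0002".
// Concatenating the parts preserves ordering, but at ~80 bits the result does not fit
// into a UInt64 version column without some compromise (e.g. truncation).
static BigInteger sqlServerCommitLsn(Struct source) {
    return new BigInteger(source.getString("commit_lsn").replace(":", ""), 16);
}
```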