clickhouse-sink-connector
[sink-connector-lightweight] Initial snapshot slows down and then OOM
Good day.
When creating the initial snapshot, everything starts fast but then slows down considerably. It manages to export a couple of million rows without a problem, and then this happens:
74164 2023-10-05 17:52:41.416 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 347670 of 2867292 records for table 'schema.table' after 00:00:10.009
84165 2023-10-05 17:52:51.417 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 628341 of 2867292 records for table 'schema.table' after 00:00:20.01
96286 2023-10-05 17:53:03.538 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 843087 of 2867292 records for table 'schema.table' after 00:00:32.131
96297 2023-10-05 17:53:03.549 [pool-271-thread-1] INFO com.altinity.clickhouse.sink.connector.db.DbWriter - *** QUERY***insert into table(columns)
106292 2023-10-05 17:53:13.544 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1064731 of 2867292 records for table 'schema.table' after 00:00:42.137
116617 2023-10-05 17:53:23.869 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1109216 of 2867292 records for table 'schema.table' after 00:00:52.462
127343 2023-10-05 17:53:34.595 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1156627 of 2867292 records for table 'schema.table' after 00:01:03.188
138743 2023-10-05 17:53:45.995 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1162867 of 2867292 records for table 'schema.table' after 00:01:14.588
150097 2023-10-05 17:53:57.349 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1167905 of 2867292 records for table 'schema.table' after 00:01:25.942
161378 2023-10-05 17:54:08.630 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1169915 of 2867292 records for table 'schema.table' after 00:01:37.223
164166 2023-10-05 17:54:11.418 [main] INFO io.debezium.connector.common.BaseSourceTask - 749911 records sent during previous 00:01:27.143, last recorded offset of {server=embeddedconnector} partition is {ts_sec=1696528351, file=mysql-bin-changelog.000517, pos=157, snapshot=true}
181053 2023-10-05 17:54:28.305 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1171289 of 2867292 records for table 'schema.table' after 00:01:56.898
192186 2023-10-05 17:54:39.438 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1171670 of 2867292 records for table 'schema.table' after 00:02:08.031
222901 2023-10-05 17:55:10.153 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1172039 of 2867292 records for table 'schema.table' after 00:02:38.746
234068 2023-10-05 17:55:21.320 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1172245 of 2867292 records for table 'schema.table' after 00:02:49.913
245211 2023-10-05 17:55:32.463 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1172488 of 2867292 records for table 'schema.table' after 00:03:01.056
256355 2023-10-05 17:55:43.607 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1172644 of 2867292 records for table 'schema.table' after 00:03:12.2
267478 2023-10-05 17:55:54.730 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1172871 of 2867292 records for table 'schema.table' after 00:03:23.323
279701 2023-10-05 17:56:06.953 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1173115 of 2867292 records for table 'schema.table' after 00:03:35.546
293680 2023-10-05 17:56:20.932 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1173426 of 2867292 records for table 'schema.table' after 00:03:49.525
304822 2023-10-05 17:56:32.074 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1173970 of 2867292 records for table 'schema.table' after 00:04:00.667
315965 2023-10-05 17:56:43.217 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1174096 of 2867292 records for table 'schema.table' after 00:04:11.81
327084 2023-10-05 17:56:54.336 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1174406 of 2867292 records for table 'schema.table' after 00:04:22.929
338216 2023-10-05 17:57:05.468 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1174680 of 2867292 records for table 'schema.table' after 00:04:34.061
349328 2023-10-05 17:57:16.580 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1174997 of 2867292 records for table 'schema.table' after 00:04:45.173
366045 2023-10-05 17:57:33.297 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1175345 of 2867292 records for table 'schema.table' after 00:05:01.89
380959 2023-10-05 17:57:48.211 [debezium-mysqlconnector-embeddedconnector-change-event-source-coordinator] INFO io.debezium.relational.RelationalSnapshotChangeEventSource - Exported 1175675 of 2867292 records for table 'schema.table' after 00:05:16.804
Exception: java.lang.OutOfMemoryError thrown from the UncaughtExceptionHandler in thread "HTTP-Dispatcher"
After the snapshot is restarted, the connector starts to tank even earlier, after ~200K rows.
Both the MySQL and ClickHouse instances have negligible load, but the connector instance is at 100% CPU and constantly consumes all of its RAM, even when pushing only tens of rows per second.
What I tried:
- Using -Xmx6144M and -Xmx10240M on a 16GB instance without any noticeable effect (i.e. both of them tank and OOM).
- Tweaking parameters like snapshot.max.threads, offset.flush.interval.ms, offset.flush.timeout.ms, max.request.size, max.queue.size, max.batch.size with no effect (I am not even sure all of them apply to this jar file).
- Changing the offset storage to org.apache.kafka.connect.storage.FileOffsetBackingStore, no effect (except that the "transactions not supported" log message went away). I've also seen "Timed out waiting to flush EmbeddedEngine{id="engine"} offsets to storage" even with file storage, which I think should not happen normally.
- Switching to 0.6, same result.
- Java 11, Java 17 - same result.
Thanks in advance.
My guess is that the Xmx settings did not get applied. You can do a ps -ef in the docker container and check if it is applied to the java process.
Here is an example of setting it in the docker container:
debezium-embedded:
  image: registry.gitlab.com/altinity-public/container-images/clickhouse_debezium_embedded:${SINK_LIGHTWEIGHT_VERSION}
  # build:
  #   context: ../
  restart: "no"
  ports:
    - "8083:8083"
    - "5005:5005"
    - "7000:7000"
  depends_on:
    - clickhouse
  env_file:
    - docker_postgres.env
  extra_hosts:
    - "host.docker.internal:host-gateway"
  environment:
    JAVA_OPTS: >
      -Xmx5G
      -Xms128m
  volumes:
    #- ./data:/data
    - ./config_postgres.yml:/config.yml
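As a cross-check of the ps -ef suggestion, a minimal sketch (a hypothetical helper class, not part of the connector) that prints the heap ceiling the running JVM actually resolved, which makes it easy to confirm that JAVA_OPTS / -Xmx reached the java process:

// Minimal sketch: print the maximum heap the JVM was started with.
// Useful to cross-check that JAVA_OPTS / -Xmx reached the java process.
public class HeapCheck {
    public static void main(String[] args) {
        long maxBytes = Runtime.getRuntime().maxMemory();
        System.out.printf("JVM max heap: %.1f MiB%n", maxBytes / (1024.0 * 1024.0));
    }
}

Running it inside the container with the same flags (e.g. java $JAVA_OPTS HeapCheck) shows whether the value actually took effect.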
In order to initialize large databases, you may need to take a snapshot from MySQL and restore it to CH. This project has all the tooling to do it.
Thank you for swift responses, folks.
@subkanthi
My guess is that the Xmx settings did not get applied. you can do a ps -ef in the docker container and check if its applied to the java process.
It 100% applies, I've verified it using htop: after applying -Xmx10240M, RAM consumption increased to about 10G. The command is the following:
java -Xmx10240M -jar clickhouse-debezium-embedded-1.0-SNAPSHOT.jar config.yaml
I've also tried to spin up the docker image. It looks like it does not respect the JAVA_OPTS env variable; I had to override the entrypoint to this:
java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 $JAVA_OPTS -jar /app.jar /config.yml com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication
The result is the same: the import slows down to a crawl, then OOMs.
@aadant
This project has all the tooling to do it
Yeah, I saw mentions of the Clickhouse Loader, but the documentation link on the main page leads to a 404 (https://github.com/Altinity/clickhouse-sink-connector/blob/develop/python/README.md). Could you please point me to the tool?
In order to initialize large databases
The logical dump I am trying to create a snapshot from weighs 29GB uncompressed. The largest table is 14GB, but the snapshot process stalls after processing ~5GB (two tables, 1.7GB and 3GB respectively). Is this really the limit?
Hi @AdamKatzDev , this is the updated path - https://github.com/Altinity/clickhouse-sink-connector/tree/develop/sink-connector/python
I was able to load the dump, though I had to update the script a bit. Still, I wish there were a way to create snapshots. As was mentioned before, that could be a good automatic recovery method. And in cases where downtime is not acceptable, I have no idea how to sync the ClickHouse instance afterwards.
I was also trying Airbyte, and it also stopped working after about 5GB. This might be a coincidence, of course.
@AdamKatzDev see the discussion here https://github.com/Altinity/clickhouse-sink-connector/discussions/180 @subkanthi this should be documented because it is one of the strengths of this project!
You can take a consistent snapshot using mysqlshell, which dumps the data as compressed TSV files, then load it using the python scripts that call clickhouse-client. Let us know if it works for you.
@aadant thanks for providing the info, I was able to transfer the dump and turn on the connector. I haven't tested much except updates but it looks promising.
@subkanthi I believe that the issue should be kept open.
Thanks for the help, folks. In case I have any questions I'll move them somewhere else, i.e. to a discussion.
@subkanthi
I've made a heap dump using jmap from the application running with an -Xmx5G limit, at the moment it started stalling. Eclipse MAT shows there are ~1.4M ClickHouseStruct objects stored. My guess is that the only difference between -Xmx5G and -Xmx10G will be the number of those objects accumulated before the snapshot process stops.
It looks like queryToRecordsMap in ClickHouseBatchRunnable::processRecordsByTopic gets filled with data and there is no way to limit the number of records. Is it possible to rewrite this code to collect data in batches? My main language is not Java, so I might not be experienced enough to tackle this.
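For illustration of the batching idea being asked about here, a minimal self-contained sketch assuming a single bounded buffer; the names (BoundedBatchBuffer, flusher) are hypothetical and not the connector's actual API. The producer blocks when the buffer is full, and the flush thread drains at most one batch per cycle, so heap usage stays proportional to the batch size rather than to the whole snapshot.

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical sketch, not connector code: a bounded buffer drained in fixed-size batches.
class BoundedBatchBuffer<T> {
    private final BlockingQueue<T> pending;
    private final int maxBatchSize;
    private final Consumer<List<T>> flusher;

    BoundedBatchBuffer(int capacity, int maxBatchSize, Consumer<List<T>> flusher) {
        this.pending = new LinkedBlockingQueue<>(capacity);
        this.maxBatchSize = maxBatchSize;
        this.flusher = flusher;
    }

    // Producer side (e.g. the Debezium callback): blocks when the buffer is full.
    void offer(T record) throws InterruptedException {
        pending.put(record);
    }

    // Flush thread: takes at most maxBatchSize records per cycle and flushes them,
    // e.g. as one INSERT into ClickHouse, instead of accumulating everything in a map.
    void drainOnce() {
        List<T> batch = new ArrayList<>(maxBatchSize);
        pending.drainTo(batch, maxBatchSize);
        if (!batch.isEmpty()) {
            flusher.accept(batch);
        }
    }
}

The trade-off is that the snapshot reader is throttled to the speed of the ClickHouse inserts, which is usually preferable to an OOM.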
I've also tried to spin the docker image. Looks like it does not respect JAVA_OPTS env. I had to override entrypoint to this: java -agentlib:jdwp=transport=dt_socket,server=y,suspend=n,address=*:5005 $JAVA_OPTS -jar /app.jar /config.yml com.altinity.clickhouse.debezium.embedded.ClickHouseDebeziumEmbeddedApplication The result is the same, import slows down to low values, then OOMs.
see https://github.com/Altinity/clickhouse-sink-connector/issues/325
@AdamKatzDev maybe it will work better with more memory? You can now set the memory (see the fix).
@aadant I've figured out how to run the image with increased memory constraints, and it acts similarly to the JAR file, i.e. it stalls, just a bit later. If you are interested, I can analyze a dump for an -Xmx10G run to prove the point.
@AdamKatzDev, I have faced this issue too. The reason is that the put() method in ClickHouseSinkTask receives records faster than ClickHouseBatchRunnable processes them. I solved the issue by adding code to control buffer filling: if the record limit of the buffer (the buffer is the "records" field in the ClickHouseSinkTask class) is reached, put() keeps polling the buffer size until there is room for a new batch.
@Override
public void put(Collection<SinkRecord> records) {
    long taskId = this.config.getLong(ClickHouseSinkConnectorConfigVariables.TASK_ID.toString());
    long receivedRecordsCount = records.size();
    totalRecords += receivedRecordsCount;
    log.debug("******** CLICKHOUSE received records **** " + totalRecords + " Task Id: " + taskId);
    // Back-pressure: wait until the shared buffer has room for this batch.
    while (this.getBufferCount() + receivedRecordsCount > bufferMaxRecords) {
        try {
            Thread.sleep(100);
        } catch (InterruptedException e) {
            log.error("******** SLEEP ERROR {} ****", e.getMessage());
        }
    }
    // ... original put() logic follows: parse the records and append them to this.records ...
}

public long getBufferCount() {
    long recordsInBuffer = 0;
    for (Map.Entry<String, ConcurrentLinkedQueue<ClickHouseStruct>> entry : this.records.entrySet()) {
        recordsInBuffer += entry.getValue().size();
    }
    return recordsInBuffer;
}
@IlyaTsoi interesting, worth taking a look
@aadant I've analyzed a dump for an -Xmx10G run:
The relative picture hasn't changed much: the queryToRecordsMap field accounts for 68% of the 10GB of memory. There is also a java.util.concurrent.ConcurrentHashMap$Node object taking 20% of memory, which is likely the records field (or part of it) that @IlyaTsoi mentioned.
Hi @IlyaTsoi , thanks for the analysis, will take a look at your solution.
@AdamKatzDev just a heads up, the put() function is only called in the Kafka version; in the embedded version the records are written from here:
private void processEveryChangeRecord(Properties props, ChangeEvent<SourceRecord, SourceRecord> record,
                                      DebeziumRecordParserService debeziumRecordParserService,
                                      ClickHouseSinkConnectorConfig config) {
    try {
        SourceRecord sr = record.value();
        Struct struct = (Struct) sr.value();
        if (struct == null) {
            log.error("STRUCT EMPTY");
            //return;
        }
        if (struct.schema() == null) {
            log.error("SCHEMA EMPTY");
        }
        List<Field> schemaFields = struct.schema().fields();
        if (schemaFields == null) {
            return;
        }
        Field matchingDDLField = schemaFields.stream()
                .filter(f -> "DDL".equalsIgnoreCase(f.name()))
                .findAny()
                .orElse(null);
        if (matchingDDLField != null) {
            String DDL = (String) struct.get("ddl");
            log.debug("Source DB DDL: " + DDL);
            if (DDL != null && DDL.isEmpty() == false)
            //&& ((DDL.toUpperCase().contains("ALTER TABLE") || DDL.toUpperCase().contains("RENAME TABLE"))))
            {
                log.info("***** DDL received, Flush all existing records");
                this.executor.shutdown();
                this.executor.awaitTermination(60, TimeUnit.SECONDS);
                performDDLOperation(DDL, props, sr, config);
                this.executor = new ClickHouseBatchExecutor(config.getInt(ClickHouseSinkConnectorConfigVariables.THREAD_POOL_SIZE.toString()));
                this.executor.scheduleAtFixedRate(this.runnable, 0, config.getLong(ClickHouseSinkConnectorConfigVariables.BUFFER_FLUSH_TIME.toString()), TimeUnit.MILLISECONDS);
            }
        } else {
            ClickHouseStruct chStruct = debeziumRecordParserService.parse(sr);
            try {
                this.replicationLag = chStruct.getReplicationLag();
                this.binLogFile = chStruct.getFile();
                this.binLogPosition = String.valueOf(chStruct.getPos());
                this.gtid = String.valueOf(chStruct.getGtid());
            } catch (Exception e) {
                log.error("Error retrieving status metrics");
            }
            ConcurrentLinkedQueue<ClickHouseStruct> queue = new ConcurrentLinkedQueue<ClickHouseStruct>();
            if (chStruct != null) {
                queue.add(chStruct);
            }
            synchronized (this.records) {
                if (chStruct != null) {
                    addRecordsToSharedBuffer(chStruct.getTopic(), chStruct);
                }
            }
        }
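Since put() is not used on the embedded path, a gate like IlyaTsoi's would have to sit around the addRecordsToSharedBuffer() call shown above. A rough self-contained sketch of that idea, assuming a per-topic buffer map and a bufferMaxRecords limit; all names here are illustrative, not the connector's actual code.

import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;

// Hypothetical sketch, not connector code: back-pressure applied on the embedded path,
// gating the producer before it appends to the shared per-topic buffer.
class BufferGateSketch<T> {
    private final Map<String, ConcurrentLinkedQueue<T>> records = new ConcurrentHashMap<>();
    private final long bufferMaxRecords;

    BufferGateSketch(long bufferMaxRecords) {
        this.bufferMaxRecords = bufferMaxRecords;
    }

    // Block the Debezium callback thread until the flush thread has drained enough records.
    void addWithBackPressure(String topic, T record) throws InterruptedException {
        while (bufferedRecordCount() >= bufferMaxRecords) {
            Thread.sleep(100);
        }
        records.computeIfAbsent(topic, t -> new ConcurrentLinkedQueue<>()).add(record);
    }

    private long bufferedRecordCount() {
        long count = 0;
        for (ConcurrentLinkedQueue<T> q : records.values()) {
            count += q.size();
        }
        return count;
    }
}

As with the put() variant, this simply blocks the Debezium callback until the batch runnable has drained the buffer, keeping heap usage bounded.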
I am also experiencing this problem.
I am trying to do an initial sync of a 200GB table. With 64GB of RAM it runs fine at first, but eventually it uses all of its RAM, slows to a crawl, and then gets killed.
@jgould22 is it a MySQL setup? For relatively large tables, it is recommended to take an initial dump (we have the tooling in this project) and then start replication from the consistent dump position. It takes less than 1h to dump one TB of data and restore it to CH.
It is (MySQL 5.7.44 to be specific).
I have not yet tried that; I am currently just experimenting with some of the options, but I will explore the initial dump path.
@jgould22: you would need to dump the MySQL data using https://github.com/Altinity/clickhouse-sink-connector/blob/develop/sink-connector/python/db_dump/mysql_dumper.py
To load the consistent dump, you can use https://github.com/Altinity/clickhouse-sink-connector/blob/develop/sink-connector/python/db_load/clickhouse_loader.py
The sink connector lets you start at the dump position, similar to MySQL.
For the config, you can use this when starting from an initial dump:
https://github.com/Altinity/clickhouse-sink-connector/blob/f1eb58a762dec531a172ffa0f94e47dd4a9ac722/doc/configuration.md?plain=1#L14
binary.handling.mode: "base64"
snapshot.mode: "schema_only"
enable.snapshot.ddl: "false"
auto.create.tables: "false"
Please check the datetime timezones (it should work if the two servers have the same TZ, but I would double-check).
I was facing the same issue, and tuning snapshot.fetch.size in config.yml allowed me to throttle the snapshot creation, which solved my issue.