
flink-connector-mysql-cdc-2.4.2: when a multi-column primary key index starts with a varchar column, a snapshot chunk can return a very large number of rows and cause the JVM to OOM.

Open trikker opened this issue 1 year ago • 8 comments

Search before asking

  • [X] I searched in the issues and found nothing similar.

Flink version

1.17.1

Flink CDC version

flink-connector-mysql-cdc-2.4.2

Database and its version

source: MySQL 8.0.19 destination: Doris 2.0.2

Minimal reproduce step

  1. Create a MySQL table with a multi-column primary key whose first column is a varchar:
CREATE DATABASE `mydb`;
CREATE TABLE `t` (
  `col1` varchar(64) NOT NULL,
  `col2` varchar(64) NOT NULL,
  `col3` datetime(6) NOT NULL,
  `col4` varchar(200) DEFAULT NULL,
  `col5` varchar(500) DEFAULT NULL,
  PRIMARY KEY (`col1`,`col2`,`col3`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
  2. Insert a large number of rows into the table (a stored-procedure sketch for generating them follows these statements):
use mydb;
# execute the SQL below 5,000,000 times
insert into t values('aaa', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('aaa', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('aaa', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
...

# execute the SQL below 5,000,000 times
insert into t values('bbb', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('bbb', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('bbb', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
...

# execute the SQL below 5,000,000 times
insert into t values('ccc', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('ccc', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
insert into t values('ccc', repeat('a', 64), now(6), repeat('a', 200), repeat('a', 500));
...
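As a sketch only, the rows could also be generated with a stored procedure instead of repeating the statement by hand. The procedure name fill_t and the numeric suffix appended to col2 are made up; the suffix simply keeps the multi-column primary key unique while col1 stays constant, as in the manual inserts above.

DELIMITER //
CREATE PROCEDURE fill_t(IN p_col1 VARCHAR(64), IN p_rows INT)
BEGIN
  DECLARE i INT DEFAULT 0;
  WHILE i < p_rows DO
    -- col2 gets a unique numeric suffix so (col1, col2, col3) never collides
    INSERT INTO t VALUES (p_col1, CONCAT(repeat('a', 54), LPAD(i, 10, '0')),
                          now(6), repeat('a', 200), repeat('a', 500));
    SET i = i + 1;
  END WHILE;
END //
DELIMITER ;

CALL fill_t('aaa', 5000000);
CALL fill_t('bbb', 5000000);
CALL fill_t('ccc', 5000000);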
  3. Create the database in Doris:
CREATE DATABASE `mydb`;
  4. Create the user in MySQL and Doris:

In MySQL:

create user flink identified by 'pass';
grant all on *.* to flink;

In Doris:

create user flink identified by 'pass';
grant all on *.* to flink;
  5. Download the Flink Doris connector and Flink CDC jars and put them into the lib directory of flink-1.17.1:
flink-doris-connector-1.17-1.4.0.jar
flink-sql-connector-mysql-cdc-2.4.2.jar
  6. Use Flink to sync the data from MySQL to Doris:
bin/flink run -d \
    -Dexecution.checkpointing.interval=10s \
    -Dparallelism.default=1 \
    -c org.apache.doris.flink.tools.cdc.CdcTools \
    lib/flink-doris-connector-1.17-1.4.0.jar \
    mysql-sync-database \
    --database mydb \
    --job-name flink_sync_mysql_to_doris \
    --mysql-conf hostname=<MYSQL_IP> \
    --mysql-conf port=3306 \
    --mysql-conf username=flink \
    --mysql-conf password=pass \
    --mysql-conf database-name=mydb \
    --including-tables "t" \
    --sink-conf fenodes=<DORIS_FE_IP>:8030 \
    --sink-conf username=flink \
    --sink-conf password=pass \
    --sink-conf jdbc-url=jdbc:mysql://<DORIS_FE_IP>:9030 \
    --sink-conf sink.label-prefix=label1 \
    --table-conf replication_num=3

What did you expect to see?

The table should be split into chunks of approximately 8096 rows each (the default scan.incremental.snapshot.chunk.size).

What did you see instead?

The table was instead split into chunks containing millions of rows each (all rows sharing the same `col1` value), and the Flink job eventually failed with OOM.

The chunk-splitting SQL executed in MySQL returns millions of rows:

SELECT * FROM `mydb`.`t` WHERE `col1` <= 'bbb' AND NOT (`col1` = 'bbb');

flink job OOM:

Caused by: java.lang.OutOfMemoryError: Java heap space

Anything else?

The bad case above occurs when all of the following conditions are met:

  1. the table has a multi-column primary key;
  2. the first column of the primary key is a varchar;
  3. many rows share the same value in that first column.

Root cause:

flink-cdc-mysql uses the first primary-key column to split the table into chunks, and each chunk should contain approximately 8096 rows. But that is the ideal case. In a table with a multi-column primary key, many rows can share the same value in the first column. Because a chunk's begin and end bounds are expressed only in terms of that first column, the reader cannot stop after 8096 rows; it has to read up to the next distinct value of the first column, which can put a huge number of rows into a single chunk.
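To make the skew visible, a quick check (a sketch, assuming the reproduce table above) is to count how many rows share each value of the split column; with only `col1` as the split key, no chunk boundary can fall inside one of these groups:

-- Sketch: count rows per value of the split column. A chunk bounded only by
-- `col1` can never be smaller than one of these groups, so a hot value such
-- as 'aaa' (about 5,000,000 rows in the reproduce steps) becomes one giant chunk.
SELECT `col1`, COUNT(*) AS rows_sharing_value
FROM `mydb`.`t`
GROUP BY `col1`
ORDER BY rows_sharing_value DESC;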

Suggested fix:

Use all columns of the primary key to split the table into chunks.
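For illustration only (this is not the connector's actual implementation), such a chunk predicate could use MySQL row-value comparison over the whole primary key, so that boundaries can fall between rows that share the same `col1`. The tuple boundary values below are hypothetical:

-- Hypothetical chunk query using the whole primary key as the split key.
SELECT * FROM `mydb`.`t`
WHERE (`col1`, `col2`, `col3`) >= ('aaa', 'k000', '2023-11-01 00:00:00.000000')
  AND (`col1`, `col2`, `col3`) <  ('aaa', 'k999', '2023-11-01 00:00:00.000000');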

I am willing to submit a PR but I am new to Java and I don't know how much time it will take.

Are you willing to submit a PR?

  • [X] I'm willing to submit a PR!

trikker avatar Nov 09 '23 13:11 trikker

It should go through the uneven chunk-splitting logic, so the size of each chunk shouldn't be that extreme, right?

WanYukun avatar Nov 24 '23 05:11 WanYukun

Same issue here. @trikker Do you have a workaround you could share? Thanks!

link3280 avatar Dec 28 '23 03:12 link3280

This is an automatic reply from QQ Mail. Hello, your email has been received, thank you! -----Wan Yukun

WanYukun avatar Dec 28 '23 04:12 WanYukun

This bug is caused by the charset and collation rules of the MySQL database. We use Java to compare the table's max value with the chunk-end value to check the end bound, but the max value comes from SELECT MAX(columnName) FROM TABLENAME, and that result is affected by the database's character set and collation rules. I have a PR to fix it, but I'm not sure whether it is appropriate: https://github.com/ververica/flink-cdc-connectors/pull/2968

AidenPerce avatar Jan 09 '24 02:01 AidenPerce

When using a varchar primary key, we can reproduce this problem. Example:

  • Create a table with a varchar primary key, charset "utf8mb4", collation "utf8mb4_general_ci".
  • The primary-key values are like ['0000','1111','2222','aaaa','bbbb','cccc','ZZZZ'], which include uppercase letters.
  • Create a CDC source with 'scan.incremental.snapshot.chunk.size' = '2'.
    The expected chunks are: ['0000'] ['1111'] ['2222'] ['aaaa'] ['bbbb'] ['cccc','ZZZZ']
    The actual chunks are: ['0000'] ['1111'] ['2222'] ['aaaa','bbbb','cccc','ZZZZ']
    (see the collation comparison sketch below)
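As a small sketch of why this happens, assuming a utf8mb4_general_ci column as in the example above: MySQL's MAX() and ordering follow the column collation (case-insensitive here), while a plain Java code-point comparison behaves like the binary collation, so the splitter can conclude that a chunk end has already passed the table's max value and stop splitting early:

SET NAMES utf8mb4;
-- Case-insensitive collation: 'ZZZZ' sorts after 'aaaa', so MAX(pk) = 'ZZZZ'.
SELECT 'ZZZZ' > 'aaaa' COLLATE utf8mb4_general_ci;  -- returns 1
-- Binary (code-point) comparison, which is what a plain Java String
-- comparison effectively does: 'Z' (U+005A) < 'a' (U+0061).
SELECT 'ZZZZ' > 'aaaa' COLLATE utf8mb4_bin;         -- returns 0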

AidenPerce avatar Jan 09 '24 02:01 AidenPerce

Hi, @trikker. MySQL CDC supports setting scan.incremental.snapshot.chunk.key-column to select a column of the primary key for splitting chunks.
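For example, a minimal Flink SQL sketch of this option, assuming the table from the reproduce steps and the plain mysql-cdc SQL connector rather than the Doris sync tool used in the report; the choice of `col3` as the chunk key is only an example for this table:

-- Minimal sketch of a mysql-cdc source that overrides the chunk key column.
CREATE TABLE t_source (
  col1 STRING,
  col2 STRING,
  col3 TIMESTAMP(6),
  col4 STRING,
  col5 STRING,
  PRIMARY KEY (col1, col2, col3) NOT ENFORCED
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '<MYSQL_IP>',
  'port' = '3306',
  'username' = 'flink',
  'password' = 'pass',
  'database-name' = 'mydb',
  'table-name' = 't',
  'scan.incremental.snapshot.chunk.key-column' = 'col3'
);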

ruanhang1993 avatar Feb 04 '24 07:02 ruanhang1993

This is an automatic reply from QQ Mail. Hello, your email has been received, thank you! -----Wan Yukun

WanYukun avatar Feb 04 '24 07:02 WanYukun

Hi, @trikker. MySQL CDC supports setting scan.incremental.snapshot.chunk.key-column to select a column of the primary key for splitting chunks. But when the chunk key column is a varchar, the uneven chunk-splitting logic can still be affected by the database's character set and collation rules, producing very large chunks and causing OOM errors. This issue is unrelated to whether the primary key has multiple columns.

AidenPerce avatar Feb 05 '24 06:02 AidenPerce

Closing this issue as it has been migrated to Apache Jira.

PatrickRen avatar Apr 09 '24 05:04 PatrickRen

This is an automatic reply from QQ Mail. Hello, your email has been received, thank you! -----Wan Yukun

WanYukun avatar Apr 09 '24 06:04 WanYukun