[tidb-cdc] KeyRange distribution in TableKeyRangeUtils is unreasonable; the TiDB table scan is not divided between different subtasks
Describe the bug (Please use English)
KeyRange distribution in TableKeyRangeUtils is unreasonable; the TiDB table scan is not divided between different subtasks.
Environment:
- Flink version: Flink 1.14
- Flink CDC version: 2.2.0
- Database and version: TiDB 5.1
To Reproduce (steps to reproduce the behavior):
1. Prepare a TiDB table. The total row count is 744750:
mysql> select count(1) from test_tab;
+----------+
| count(1) |
+----------+
| 744750 |
+----------+
1 row in set (0.12 sec)
2. Flink SQL code (parallelism set to 3):
CREATE TABLE source_table (
id INT,
createTime timestamp,
updateTime timestamp,
PRIMARY KEY(id) NOT ENFORCED
)WITH (
'connector' = 'tidb-cdc',
'pd-addresses' = '****',
'database-name' = '****',
'table-name' = '****',
'scan.startup.mode' = 'initial'
);
CREATE TABLE sink_table (
id INT,
createTime timestamp,
updateTime timestamp,
PRIMARY KEY(id) NOT ENFORCED
) WITH (
'connector' = 'print');
insert into sink_table
select * from source_table;
3. Observe the web UI of this job.
There are three source subtasks; however, only one subtask processes data.

Additional Description
1. While debugging, I found the code in
com.ververica.cdc.connectors.tidb.table.utils.TableKeyRangeUtils.
When parallelism > 1, the key range is divided with a step of (Long.MAX_VALUE - Long.MIN_VALUE) / parallelism. However, for most TiDB tables the keys are far below Long.MAX_VALUE and far above Long.MIN_VALUE, so only one subtask's key range overlaps the valid key range of the TiDB table. A sketch after the code below illustrates the resulting boundaries.
public static List<KeyRange> getTableKeyRanges(final long tableId, final int num) {
    Preconditions.checkArgument(num > 0, "Illegal value of num");
    if (num == 1) {
        return ImmutableList.of(getTableKeyRange(tableId));
    }
    final long delta =
            BigInteger.valueOf(Long.MAX_VALUE)
                    .subtract(BigInteger.valueOf(Long.MIN_VALUE + 1))
                    .divide(BigInteger.valueOf(num))
                    .longValueExact();
    final ImmutableList.Builder<KeyRange> builder = ImmutableList.builder();
    for (int i = 0; i < num; i++) {
        final RowKey startKey =
                (i == 0)
                        ? RowKey.createMin(tableId)
                        : RowKey.toRowKey(tableId, Long.MIN_VALUE + delta * i);
        final RowKey endKey =
                (i == num - 1)
                        ? RowKey.createBeyondMax(tableId)
                        : RowKey.toRowKey(tableId, Long.MIN_VALUE + delta * (i + 1));
        builder.add(
                KeyRangeUtils.makeCoprocRange(startKey.toByteString(), endKey.toByteString()));
    }
    return builder.build();
}
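For illustration, here is a minimal, self-contained sketch (using the same arithmetic as the method above, with num = 3) that reproduces the boundaries seen in the subtask logs further down; it shows why handles such as 2..893067 can only fall into the middle range:

import java.math.BigInteger;

public class KeyRangeSplitDemo {
    public static void main(String[] args) {
        final int num = 3;
        // Same delta computation as TableKeyRangeUtils#getTableKeyRanges.
        final long delta =
                BigInteger.valueOf(Long.MAX_VALUE)
                        .subtract(BigInteger.valueOf(Long.MIN_VALUE + 1))
                        .divide(BigInteger.valueOf(num))
                        .longValueExact();
        for (int i = 0; i < num; i++) {
            // i == 0 yields Long.MIN_VALUE; long overflow here mirrors what the
            // original code passes to RowKey.toRowKey.
            long start = Long.MIN_VALUE + delta * i;
            String end =
                    (i == num - 1) ? "+inf" : String.valueOf(Long.MIN_VALUE + delta * (i + 1));
            System.out.println("subtask " + i + ": [" + start + ", " + end + ")");
        }
        // Output:
        // subtask 0: [-9223372036854775808, -3074457345618258604)
        // subtask 1: [-3074457345618258604, 3074457345618258600)
        // subtask 2: [3074457345618258600, +inf)
        // Handles 2..893067 all fall into subtask 1's range only.
    }
}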
2. How to fix it
In com.ververica.cdc.connectors.tidb.table.utils.TableKeyRangeUtils, maybe we can: (1) get the minKey and maxKey of the TiDB table (via the TiKV client or JDBC); (2) set delta = (maxKey - minKey) / parallelism; (3) set startKey and endKey in the loop (a fuller sketch follows the loop below):
for (int i = 0; i < num; i++) {
    final RowKey startKey =
            (i == 0)
                    ? RowKey.createMin(tableId)
                    : RowKey.toRowKey(tableId, minKey + delta * i);
    final RowKey endKey =
            (i == num - 1)
                    ? RowKey.createBeyondMax(tableId)
                    // use minKey as the base here as well, so consecutive
                    // ranges partition [minKey, maxKey] without gaps
                    : RowKey.toRowKey(tableId, minKey + delta * (i + 1));
}
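Putting the three steps together, here is a minimal sketch of a revised method, assuming minKey and maxKey have already been obtained; the signature and parameter names are illustrative, not the connector's actual API:

public static List<KeyRange> getTableKeyRanges(
        final long tableId, final int num, final long minKey, final long maxKey) {
    Preconditions.checkArgument(num > 0, "Illegal value of num");
    Preconditions.checkArgument(maxKey >= minKey, "maxKey must be >= minKey");
    if (num == 1) {
        return ImmutableList.of(getTableKeyRange(tableId));
    }
    // Split [minKey, maxKey] instead of the whole long range so that every
    // subtask's range can contain rows of the table. Assumes maxKey - minKey
    // does not overflow; guard against delta == 0 for very small tables.
    final long delta = Math.max(1L, (maxKey - minKey) / num);
    final ImmutableList.Builder<KeyRange> builder = ImmutableList.builder();
    for (int i = 0; i < num; i++) {
        final RowKey startKey =
                (i == 0)
                        ? RowKey.createMin(tableId)
                        : RowKey.toRowKey(tableId, minKey + delta * i);
        final RowKey endKey =
                (i == num - 1)
                        ? RowKey.createBeyondMax(tableId)
                        : RowKey.toRowKey(tableId, minKey + delta * (i + 1));
        builder.add(
                KeyRangeUtils.makeCoprocRange(startKey.toByteString(), endKey.toByteString()));
    }
    return builder.build();
}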
3. Other phenomena that also indicate the key range is divided unreasonably between different subtasks
Add logging in com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction#readSnapshotEvents to print the key range of each subtask and the rows it processes:
protected void readSnapshotEvents() throws Exception {
    LOG.info("read snapshot events");
    ......
    // to log the key range of each subtask
    RowKey startKeyDecode = RowKey.decode(keyRange.getStart().toByteArray());
    RowKey endKeyDecode = RowKey.decode(keyRange.getEnd().toByteArray());
    LOG.info(
            "will scan key from {} to {}.", startKeyDecode.toString(), endKeyDecode.toString());
    try {
        ByteString start = keyRange.getStart();
        while (true) {
            final List<Kvrpcpb.KvPair> segment =
                    scanClient.scan(start, keyRange.getEnd(), startTs);
            // to log the rows it processes
            LOG.info("will scan {} rows.", segment.size());
            ......
}
Log of each subtask (this also indicates that the key range is divided unreasonably between different subtasks):
subtask 0
will scan key from -9223372036854775808 to -3074457345618258604.
will scan 0 rows.
subtask 1
will scan key from -3074457345618258604 to 3074457345618258600.
will scan 744750 rows.
subtask 2
will scan key from 3074457345618258600 to -9223372036854775808.
will scan 0 rows.
However, the TiDB primary key values lie approximately between 2 and 893067 (see the sketch after the query output below):
mysql> select min(id) from test_tab;
+---------+
| min(id) |
+---------+
| 2 |
+---------+
mysql> select max(id) from test_tab;
+---------+
| max(id) |
+---------+
| 893067 |
+---------+
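For illustration, a minimal sketch assuming the handles 2..893067 are roughly dense: splitting that interval (instead of the full long range) with parallelism 3 gives every subtask a range that actually contains data:

public class EvenSplitDemo {
    public static void main(String[] args) {
        final long minKey = 2L, maxKey = 893067L;
        final int num = 3;
        final long delta = (maxKey - minKey) / num; // 297688
        for (int i = 0; i < num; i++) {
            long start = minKey + delta * i;
            String end =
                    (i == num - 1) ? "+inf" : String.valueOf(minKey + delta * (i + 1));
            System.out.println("subtask " + i + ": [" + start + ", " + end + ")");
        }
        // subtask 0: [2, 297690)
        // subtask 1: [297690, 595378)
        // subtask 2: [595378, +inf)
        // The ~744750 rows are spread across all three subtasks instead of
        // being scanned entirely by subtask 1.
    }
}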
Hi, may I ask how you finally solved this? I am running into the same problem: when full snapshot synchronization is enabled, all the data is read by a single subtask, which causes very high memory usage. Any advice would be much appreciated, thanks.
I have not fixed it yet.
Here is the idea; maybe you can refer to it.
In com.ververica.cdc.connectors.tidb.table.utils.TableKeyRangeUtils: (1) get the minKey and maxKey of the TiDB table (via the TiKV client or JDBC; a JDBC sketch follows the loop below); (2) set delta = (maxKey - minKey) / parallelism; (3) set startKey and endKey in the loop:
for (int i = 0; i < num; i++) {
    final RowKey startKey =
            (i == 0)
                    ? RowKey.createMin(tableId)
                    : RowKey.toRowKey(tableId, minKey + delta * i);
    final RowKey endKey =
            (i == num - 1)
                    ? RowKey.createBeyondMax(tableId)
                    // use minKey as the base here as well, so consecutive
                    // ranges partition [minKey, maxKey] without gaps
                    : RowKey.toRowKey(tableId, minKey + delta * (i + 1));
}
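For step (1), here is a minimal JDBC sketch, assuming the table's integer primary key is used as the TiKV row handle (for tables without such a key, TiDB's hidden _tidb_rowid column could be queried instead). The connection settings and column name are placeholders, and a MySQL-compatible JDBC driver must be on the classpath:

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class HandleBounds {
    // Returns {minKey, maxKey} of the integer handle column for the given table.
    static long[] queryHandleBounds(String jdbcUrl, String user, String password,
                                    String table, String handleColumn) throws Exception {
        try (Connection conn = DriverManager.getConnection(jdbcUrl, user, password);
             Statement stmt = conn.createStatement();
             ResultSet rs = stmt.executeQuery(
                     "SELECT MIN(" + handleColumn + "), MAX(" + handleColumn + ") FROM " + table)) {
            rs.next();
            return new long[] {rs.getLong(1), rs.getLong(2)};
        }
    }

    public static void main(String[] args) throws Exception {
        // Placeholder connection info; here the handle column is the integer primary key "id".
        long[] bounds = queryHandleBounds(
                "jdbc:mysql://tidb-host:4000/test", "root", "", "test_tab", "id");
        System.out.println("minKey=" + bounds[0] + ", maxKey=" + bounds[1]);
    }
}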
OK. Thank you very much.
How can I get the long-typed minKey and maxKey, for example with the TiKV client? Also, in ByteString.copyFromUtf8("key"), what does the "key" string refer to? For example, if the table has id and name columns, the corresponding row data is (1, 名称).
Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!