[tidb-cdc]KeyRange in TableKeyRangeUtils distribution is unreasonable.Scan tidb table is not divided between different subtasks

Open xieyi888 opened this issue 3 years ago • 4 comments

Describe the bug(Please use English)

The KeyRange distribution produced by TableKeyRangeUtils is unreasonable: the snapshot scan of a TiDB table is not divided between the source subtasks.

Environment :

  • Flink version : flink 1.14
  • Flink CDC version: 2.2.0
  • Database and version: TIDB 5.1

To Reproduce

Steps to reproduce the behavior:

1. prepare TiDB table

total count is 744750

mysql> select count(1) from test_tab;
+----------+
| count(1) |
+----------+
|   744750 |
+----------+
1 row in set (0.12 sec)

2. flink sql code

setParallelism: 3

CREATE TABLE source_table (
    id INT,
    createTime timestamp,
    updateTime timestamp,
  PRIMARY KEY(id) NOT ENFORCED
)WITH (
    'connector' = 'tidb-cdc',
    'pd-addresses' = '****',
    'database-name' = '****',
    'table-name' = '****',
    'scan.startup.mode' = 'initial'
    );

CREATE TABLE sink_table (
    id INT,
    createTime timestamp,
    updateTime timestamp,
  PRIMARY KEY(id) NOT ENFORCED
    ) WITH (
    'connector' = 'print');

insert into sink_table
select * from source_table;

3. observe webui of this job

There are three source subtasks; however, only one subtask processes data. (image)

Additional Description

1. While debugging, I found the relevant code in

com.ververica.cdc.connectors.tidb.table.utils.TableKeyRangeUtils

When the parallelism is greater than 1, the key range is split into slices of width (Long.MAX_VALUE - Long.MIN_VALUE) / parallelism. However, for most TiDB tables the keys are far below Long.MAX_VALUE and far above Long.MIN_VALUE. As a result, only one subtask's KeyRange contains the keys that actually exist in the TiDB table.

    public static List<KeyRange> getTableKeyRanges(final long tableId, final int num) {
        Preconditions.checkArgument(num > 0, "Illegal value of num");

        if (num == 1) {
            return ImmutableList.of(getTableKeyRange(tableId));
        }

        final long delta =
                BigInteger.valueOf(Long.MAX_VALUE)
                        .subtract(BigInteger.valueOf(Long.MIN_VALUE + 1))
                        .divide(BigInteger.valueOf(num))
                        .longValueExact();
        final ImmutableList.Builder<KeyRange> builder = ImmutableList.builder();
        for (int i = 0; i < num; i++) {
            final RowKey startKey =
                    (i == 0)
                            ? RowKey.createMin(tableId)
                            : RowKey.toRowKey(tableId, Long.MIN_VALUE + delta * i);
            final RowKey endKey =
                    (i == num - 1)
                            ? RowKey.createBeyondMax(tableId)
                            : RowKey.toRowKey(tableId, Long.MIN_VALUE + delta * (i + 1));
            builder.add(
                    KeyRangeUtils.makeCoprocRange(startKey.toByteString(), endKey.toByteString()));
        }
        return builder.build();
    }
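
To make the effect concrete, here is a minimal sketch that repeats only the boundary arithmetic of the method above, using plain long values. The class and method names (UniformSplitDemo, interiorBoundaries, subtaskFor) are hypothetical and not part of flink-cdc. The RowKey encoding is left out, on the assumption that encoded row keys sort in the same order as the signed long handles.

```java
import java.math.BigInteger;
import java.util.Arrays;

/** Hypothetical demo: reproduces the split arithmetic of getTableKeyRanges without TiKV classes. */
final class UniformSplitDemo {

    /** Returns the num - 1 interior split points of the full signed 64-bit handle space. */
    static long[] interiorBoundaries(int num) {
        if (num < 2) {
            return new long[0];
        }
        // Same delta as in the quoted method: (MAX - (MIN + 1)) / num.
        BigInteger delta =
                BigInteger.valueOf(Long.MAX_VALUE)
                        .subtract(BigInteger.valueOf(Long.MIN_VALUE + 1))
                        .divide(BigInteger.valueOf(num));
        long[] bounds = new long[num - 1];
        for (int i = 1; i < num; i++) {
            // BigInteger avoids relying on long overflow for delta * i.
            bounds[i - 1] =
                    BigInteger.valueOf(Long.MIN_VALUE)
                            .add(delta.multiply(BigInteger.valueOf(i)))
                            .longValueExact();
        }
        return bounds;
    }

    /** Index of the half-open range [bounds[i - 1], bounds[i]) that contains the given handle. */
    static int subtaskFor(long handle, long[] bounds) {
        int idx = 0;
        while (idx < bounds.length && handle >= bounds[idx]) {
            idx++;
        }
        return idx;
    }

    public static void main(String[] args) {
        long[] bounds = interiorBoundaries(3);
        System.out.println("split points: " + Arrays.toString(bounds));
        System.out.println("id 2      -> subtask " + subtaskFor(2L, bounds));
        System.out.println("id 893067 -> subtask " + subtaskFor(893067L, bounds));
    }
}
```

With num = 3 the two split points are -3074457345618258604 and 3074457345618258600, so a table whose ids run from 2 to 893067, as in this report, lands entirely in the middle range (subtask 1).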

2. How to fix it

In com.ververica.cdc.connectors.tidb.table.utils.TableKeyRangeUtils, maybe we can:

(1) get the minKey and maxKey of the TiDB table (via the TiKV client or JDBC)
(2) set delta = (maxKey - minKey) / parallelism
(3) set startKey and endKey in the loop

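        for (int i = 0; i < num; i++) {
            final RowKey startKey =
                    (i == 0)
                            ? RowKey.createMin(tableId)
                            : RowKey.toRowKey(tableId, minKey + delta * i);
            // the end key is offset from minKey as well, so the ranges stay contiguous
            final RowKey endKey =
                    (i == num - 1)
                            ? RowKey.createBeyondMax(tableId)
                            : RowKey.toRowKey(tableId, minKey + delta * (i + 1));
            builder.add(
                    KeyRangeUtils.makeCoprocRange(startKey.toByteString(), endKey.toByteString()));
        }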
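
The arithmetic behind this proposal can be sketched on its own, independent of the connector. The helper below is hypothetical (HandleRangeSplitter and splitPoints are illustrative names, not flink-cdc APIs). It assumes that the row handle is an integer primary key and that minKey and maxKey have already been obtained, for example with select min(id), max(id). It returns only the interior split points; the first range would still start at RowKey.createMin(tableId) and the last would still end at RowKey.createBeyondMax(tableId).

```java
import java.math.BigInteger;
import java.util.Arrays;

/** Hypothetical helper: splits the observed handle range instead of the whole long space. */
final class HandleRangeSplitter {

    /** Returns the num - 1 split points minKey + delta * i, with delta = (maxKey - minKey) / num. */
    static long[] splitPoints(long minKey, long maxKey, int num) {
        if (num <= 0) {
            throw new IllegalArgumentException("num must be positive");
        }
        if (minKey > maxKey) {
            throw new IllegalArgumentException("minKey must not exceed maxKey");
        }
        // BigInteger keeps maxKey - minKey from overflowing for very wide ranges.
        BigInteger min = BigInteger.valueOf(minKey);
        BigInteger delta =
                BigInteger.valueOf(maxKey).subtract(min).divide(BigInteger.valueOf(num));
        long[] points = new long[num - 1];
        for (int i = 1; i < num; i++) {
            // Every point lies between minKey and maxKey, so longValueExact cannot fail here.
            points[i - 1] = min.add(delta.multiply(BigInteger.valueOf(i))).longValueExact();
        }
        return points;
    }

    public static void main(String[] args) {
        // Values taken from this report: min(id) = 2, max(id) = 893067, parallelism 3.
        System.out.println(Arrays.toString(splitPoints(2L, 893067L, 3)));
    }
}
```

For the table in this report the split points are 297690 and 595378. Evenly spaced ids do not guarantee evenly sized row counts: if the ids are heavily skewed, some subtasks would still receive more rows than others.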

3. Other evidence that the key range is divided unevenly between subtasks

I added logging to com.ververica.cdc.connectors.tidb.TiKVRichParallelSourceFunction#readSnapshotEvents to print the key range of each subtask and the number of rows it processes.

    protected void readSnapshotEvents() throws Exception {
        LOG.info("read snapshot events");
......
        // log the key range of each subtask
        RowKey startKeyDecode = RowKey.decode(keyRange.getStart().toByteArray());
        RowKey endKeyDecode = RowKey.decode(keyRange.getEnd().toByteArray());
        LOG.info(
                "will scan key from {} to {}.", startKeyDecode.toString(), endKeyDecode.toString());
        try {
            ByteString start = keyRange.getStart();
            while (true) {
                final List<Kvrpcpb.KvPair> segment =
                        scanClient.scan(start, keyRange.getEnd(), startTs);
                // log the number of rows it processes
                LOG.info("will scan {} rows.", segment.size());
             ......

    }

Log of each subtask, which also shows that the key range is divided unevenly between subtasks:

subtask 0

will scan key from -9223372036854775808 to -3074457345618258604.
will scan 0 rows.

subtask 1

will scan key from -3074457345618258604 to 3074457345618258600.
 will scan 744750 rows.

subtask 2

 will scan key from 3074457345618258600 to -9223372036854775808.
 will scan 0 rows.

However, the primary keys of the TiDB table only range from 2 to 893067:

mysql> select min(id) from test_tab;
+---------+
| min(id) |
+---------+
|       2 |
+---------+
mysql> select max(id) from test_tab;
+---------+
| max(id) |
+---------+
|  893067 |
+---------+

xieyi888, May 17 '22 07:05

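Hello, how did you solve this in the end? I am running into the same problem: when the full snapshot sync is enabled, all of the data is read by a single task, which leads to very high memory usage. Any advice would be much appreciated, thank you.

juecong, Sep 13 '22 09:09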

I have not fixed it.

Here is the idea; maybe you can use it as a reference.

com.ververica.cdc.connectors.tidb.table.utils.TableKeyRangeUtils

(1) get the minKey and maxKey of the TiDB table (via the TiKV client or JDBC)
(2) set delta = (maxKey - minKey) / parallelism
(3) set startKey and endKey in the loop

        for (int i = 0; i < num; i++) {
            final RowKey startKey =
                    (i == 0)
                            ? RowKey.createMin(tableId)
                            : RowKey.toRowKey(tableId, minKey + delta * i);
            // the end key is offset from minKey as well, so the ranges stay contiguous
            final RowKey endKey =
                    (i == num - 1)
                            ? RowKey.createBeyondMax(tableId)
                            : RowKey.toRowKey(tableId, minKey + delta * (i + 1));
        }

xieyi888, Sep 14 '22 12:09

OK, thank you very much.

juecong, Sep 14 '22 13:09

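How can I obtain minKey and maxKey as long values, for example with the TiKV client? Also, in ByteString.copyFromUtf8("key"), what string does "key" refer to? For example, suppose the table has the columns id and name, and a row contains the values 1 and "name".

juecong, Sep 15 '22 11:09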

Closing this issue because it was created before version 2.3.0 (2022-11-10). Please try the latest version of Flink CDC to see if the issue has been resolved. If the issue is still valid, kindly report it on Apache Jira under project Flink with component tag Flink CDC. Thank you!

PatrickRen, Feb 28 '24 15:02