
[Bug]: Milvus-CDC only synchronizes the "default" database, and newly created databases will not be synchronized.

Open huangping11 opened this issue 9 months ago • 4 comments

Current Behavior

The command to create a CDC task is as follows:

curl -X POST http://localhost:8444/cdc \
  -H "Content-Type: application/json" \
  -d '{
    "request_type": "create",
    "request_data": {
      "milvus_connect_param": {
        "uri": "http://192.168.3.242:19530",
        "token": "root:Milvus",
        "connect_timeout": 10
      },
      "collection_infos": [
        {
          "name": "*"
        }
      ]
    }
  }'

The task was created successfully. Collections newly created in the default database are synchronized to the target environment. However, when a new database is created, CDC logs nothing about it, and it is not synchronized to the target environment. The log output is as follows:

[2025/03/06 10:03:02.385 +08:00] [INFO] [reader/etcd_op.go:251] ["the collection state is not created"] [key=by-dev/meta/root-coord/database/collection-info/1/456389384978334720] [collection_name=t2] [state=CollectionCreating]
[2025/03/06 10:03:02.393 +08:00] [INFO] [reader/etcd_op.go:389] ["partition state is not created or partition name is default"] [collection_id=456389384978334720] ["partition name"=_default] [state=PartitionCreated]
[2025/03/06 10:03:02.406 +08:00] [INFO] [reader/collection_reader.go:111] ["has watched to read collection"] [task_id=4e4e6a32b40f493d856d3dbe1504a453] [collection_name=t2] [collection_id=456389384978334720]
[2025/03/06 10:03:02.413 +08:00] [WARN] [reader/target_client.go:99] ["fail to get collection info"] [error="can't find collection[database=default][collection=t2]"] [errorVerbose="can't find collection[database=default][collection=t2]\n(1) attached stack trace\n -- stack trace:\n | github.com/milvus-io/milvus-sdk-go/v2/client.handleRespStatus\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus-sdk-go/[email protected]/client/collection.go:39\n | github.com/milvus-io/milvus-sdk-go/v2/client.(*GrpcClient).DescribeCollection\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus-sdk-go/[email protected]/client/collection.go:274\n | github.com/zilliztech/milvus-cdc/core/reader.(*TargetClient).GetCollectionInfo.func1\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/target_client.go:87\n | github.com/zilliztech/milvus-cdc/core/reader.(*TargetClient).milvusOp\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/target_client.go:68\n | github.com/zilliztech/milvus-cdc/core/reader.(*TargetClient).GetCollectionInfo\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/target_client.go:86\n | github.com/zilliztech/milvus-cdc/core/reader.(*replicateChannelManager).startReadCollectionForMilvus.func1\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/replicate_channel_manager.go:223\n | github.com/milvus-io/milvus/pkg/util/retry.Do\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus/[email protected]/util/retry/retry.go:44\n | github.com/zilliztech/milvus-cdc/core/reader.(*replicateChannelManager).startReadCollectionForMilvus\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/replicate_channel_manager.go:222\n | github.com/zilliztech/milvus-cdc/core/reader.(*replicateChannelManager).StartReadCollection\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/replicate_channel_manager.go:318\n | github.com/zilliztech/milvus-cdc/core/reader.(*CollectionReader).StartRead.func1.1\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/collection_reader.go:126\n | github.com/zilliztech/milvus-cdc/core/reader.(*EtcdOp).WatchCollection.func1.1.2.2\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/etcd_op.go:277\n | github.com/zilliztech/milvus-cdc/core/util.(*Map[...]).Range.func1\n | \t/data/tmp/dba_tools/milvus-cdc/core/util/atomic.go:102\n | sync.(*Map).Range\n | \t/usr/local/go/src/sync/map.go:501\n | github.com/zilliztech/milvus-cdc/core/util.(*Map[...]).Range\n | \t/data/tmp/dba_tools/milvus-cdc/core/util/atomic.go:101\n | github.com/zilliztech/milvus-cdc/core/reader.(*EtcdOp).WatchCollection.func1.1.2\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/etcd_op.go:276\n | github.com/milvus-io/milvus/pkg/util/conc.(*Pool[...]).Submit.func1\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus/[email protected]/util/conc/pool.go:82\n | github.com/panjf2000/ants/v2.(*goWorker).run.func1\n | \t/root/go/pkg/mod/github.com/panjf2000/ants/[email protected]/worker.go:67\n | runtime.goexit\n | \t/usr/local/go/src/runtime/asm_amd64.s:1700\nWraps: (2) can't find collection[database=default][collection=t2]\nError types: (1) *withstack.withStack (2) *errutil.leafError"]
[2025/03/06 10:03:02.418 +08:00] [WARN] [reader/target_client.go:99] ["fail to get collection info"] [error="can't find collection[database=default][collection=t2]"] [errorVerbose="can't find collection[database=default][collection=t2]\n(1) attached stack trace\n -- stack trace:\n | github.com/milvus-io/milvus-sdk-go/v2/client.handleRespStatus\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus-sdk-go/[email protected]/client/collection.go:39\n | github.com/milvus-io/milvus-sdk-go/v2/client.(*GrpcClient).DescribeCollection\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus-sdk-go/[email protected]/client/collection.go:274\n | github.com/zilliztech/milvus-cdc/core/reader.(*TargetClient).GetCollectionInfo.func1\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/target_client.go:87\n | github.com/zilliztech/milvus-cdc/core/reader.(*TargetClient).milvusOp\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/target_client.go:68\n | github.com/zilliztech/milvus-cdc/core/reader.(*TargetClient).GetCollectionInfo\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/target_client.go:86\n | github.com/zilliztech/milvus-cdc/core/reader.(*replicateChannelManager).startReadCollectionForMilvus.func2\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/replicate_channel_manager.go:262\n | github.com/milvus-io/milvus/pkg/util/retry.Do\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus/[email protected]/util/retry/retry.go:44\n | github.com/zilliztech/milvus-cdc/core/reader.(*replicateChannelManager).startReadCollectionForMilvus\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/replicate_channel_manager.go:261\n | github.com/zilliztech/milvus-cdc/core/reader.(*replicateChannelManager).StartReadCollection\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/replicate_channel_manager.go:318\n | github.com/zilliztech/milvus-cdc/core/reader.(*CollectionReader).StartRead.func1.1\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/collection_reader.go:126\n | github.com/zilliztech/milvus-cdc/core/reader.(*EtcdOp).WatchCollection.func1.1.2.2\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/etcd_op.go:277\n | github.com/zilliztech/milvus-cdc/core/util.(*Map[...]).Range.func1\n | \t/data/tmp/dba_tools/milvus-cdc/core/util/atomic.go:102\n | sync.(*Map).Range\n | \t/usr/local/go/src/sync/map.go:501\n | github.com/zilliztech/milvus-cdc/core/util.(*Map[...]).Range\n | \t/data/tmp/dba_tools/milvus-cdc/core/util/atomic.go:101\n | github.com/zilliztech/milvus-cdc/core/reader.(*EtcdOp).WatchCollection.func1.1.2\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/etcd_op.go:276\n | github.com/milvus-io/milvus/pkg/util/conc.(*Pool[...]).Submit.func1\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus/[email protected]/util/conc/pool.go:82\n | github.com/panjf2000/ants/v2.(*goWorker).run.func1\n | \t/root/go/pkg/mod/github.com/panjf2000/ants/[email protected]/worker.go:67\n | runtime.goexit\n | \t/usr/local/go/src/runtime/asm_amd64.s:1700\nWraps: (2) can't find collection[database=default][collection=t2]\nError types: (1) *withstack.withStack (2) *errutil.leafError"]
[2025/03/06 10:03:02.418 +08:00] [WARN] [retry/retry.go:46] ["retry func failed"] [retried=0] [error="can't find collection[database=default][collection=t2]"] [errorVerbose="can't find collection[database=default][collection=t2]\n(1) attached stack trace\n -- stack trace:\n | github.com/milvus-io/milvus-sdk-go/v2/client.handleRespStatus\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus-sdk-go/[email protected]/client/collection.go:39\n | github.com/milvus-io/milvus-sdk-go/v2/client.(*GrpcClient).DescribeCollection\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus-sdk-go/[email protected]/client/collection.go:274\n | github.com/zilliztech/milvus-cdc/core/reader.(*TargetClient).GetCollectionInfo.func1\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/target_client.go:87\n | github.com/zilliztech/milvus-cdc/core/reader.(*TargetClient).milvusOp\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/target_client.go:68\n | github.com/zilliztech/milvus-cdc/core/reader.(*TargetClient).GetCollectionInfo\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/target_client.go:86\n | github.com/zilliztech/milvus-cdc/core/reader.(*replicateChannelManager).startReadCollectionForMilvus.func2\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/replicate_channel_manager.go:262\n | github.com/milvus-io/milvus/pkg/util/retry.Do\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus/[email protected]/util/retry/retry.go:44\n | github.com/zilliztech/milvus-cdc/core/reader.(*replicateChannelManager).startReadCollectionForMilvus\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/replicate_channel_manager.go:261\n | github.com/zilliztech/milvus-cdc/core/reader.(*replicateChannelManager).StartReadCollection\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/replicate_channel_manager.go:318\n | github.com/zilliztech/milvus-cdc/core/reader.(*CollectionReader).StartRead.func1.1\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/collection_reader.go:126\n | github.com/zilliztech/milvus-cdc/core/reader.(*EtcdOp).WatchCollection.func1.1.2.2\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/etcd_op.go:277\n | github.com/zilliztech/milvus-cdc/core/util.(*Map[...]).Range.func1\n | \t/data/tmp/dba_tools/milvus-cdc/core/util/atomic.go:102\n | sync.(*Map).Range\n | \t/usr/local/go/src/sync/map.go:501\n | github.com/zilliztech/milvus-cdc/core/util.(*Map[...]).Range\n | \t/data/tmp/dba_tools/milvus-cdc/core/util/atomic.go:101\n | github.com/zilliztech/milvus-cdc/core/reader.(*EtcdOp).WatchCollection.func1.1.2\n | \t/data/tmp/dba_tools/milvus-cdc/core/reader/etcd_op.go:276\n | github.com/milvus-io/milvus/pkg/util/conc.(*Pool[...]).Submit.func1\n | \t/root/go/pkg/mod/github.com/milvus-io/milvus/[email protected]/util/conc/pool.go:82\n | github.com/panjf2000/ants/v2.(*goWorker).run.func1\n | \t/root/go/pkg/mod/github.com/panjf2000/ants/[email protected]/worker.go:67\n | runtime.goexit\n | \t/usr/local/go/src/runtime/asm_amd64.s:1700\nWraps: (2) can't find collection[database=default][collection=t2]\nError types: (1) *withstack.withStack (2) *errutil.leafError"]
[2025/03/06 10:03:02.428 +08:00] [INFO] [writer/channel_writer.go:149] ["receive replicate api event"] [event=CreateCollection] [collection=t2]
[2025/03/06 10:03:02.474 +08:00] [INFO] [writer/channel_writer.go:151] ["finish to handle replicate api event"] [event=CreateCollection] [collection=t2]
[2025/03/06 10:03:03.429 +08:00] [INFO] [reader/replicate_channel_manager.go:269] ["success to get the collection info in the target instance"] [collection_name=t2]
[2025/03/06 10:03:03.430 +08:00] [INFO] [msgstream/mq_msgstream.go:124] ["Msg Stream state"] [can_produce=true]
[2025/03/06 10:03:03.446 +08:00] [INFO] [msgstream/mq_msgstream.go:214] ["Successfully create consumer"] [channel=by-dev-rootcoord-dml_6] [subname=by-dev-rootcoord-dml_64121399381276251908]
[2025/03/06 10:03:03.447 +08:00] [INFO] [msgstream/mq_msgstream.go:535] ["MsgStream seek begin"] [channel=by-dev-rootcoord-dml_6] [MessageID=CNZMEAAYACAA] [includeCurrentMsg=false]
[2025/03/06 10:03:03.563 +08:00] [INFO] [msgstream/mq_msgstream.go:541] ["MsgStream seek finished"] [channel=by-dev-rootcoord-dml_6]
[2025/03/06 10:03:03.563 +08:00] [INFO] [reader/stream_creator.go:135] ["success to seek the msg stream"] [channel_name=by-dev-rootcoord-dml_6_456389384978334720v0]
[2025/03/06 10:03:03.563 +08:00] [INFO] [msgstream/mq_msgstream.go:224] ["start to close mq msg stream"] ["producer num"=0] ["consumer num"=1]
[2025/03/06 10:03:03.570 +08:00] [INFO] [reader/replicate_channel_manager.go:728] ["create a replicate handler"] [source_channel=by-dev-rootcoord-dml_6] [target_channel=by-dev-rootcoord-dml_1]
[2025/03/06 10:03:03.570 +08:00] [INFO] [reader/replicate_channel_manager.go:427] ["start read channel in the manager"] [nil_handler=false] [channel=by-dev-rootcoord-dml_6] [target_channel=by-dev-rootcoord-dml_1] [collection_id=456389384978334720]
[2025/03/06 10:03:03.570 +08:00] [INFO] [reader/replicate_channel_manager.go:1275] ["start read channel in the handler"] [channel_name=by-dev-rootcoord-dml_6] [target_channel=by-dev-rootcoord-dml_1]
[2025/03/06 10:03:03.570 +08:00] [INFO] [reader/replicate_channel_manager.go:438] ["start read the source channel"] [channel_name=by-dev-rootcoord-dml_6]
[2025/03/06 10:03:03.570 +08:00] [INFO] [reader/collection_reader.go:131] ["has started to read collection"] [task_id=4e4e6a32b40f493d856d3dbe1504a453] [collection_name=t2] [collection_id=456389384978334720]
[2025/03/06 10:03:03.570 +08:00] [INFO] [server/cdc_impl.go:1031] ["start to replicate channel"] [channel=by-dev-rootcoord-dml_1]
[2025/03/06 10:03:03.571 +08:00] [INFO] [reader/etcd_op.go:278] ["the collection has been consumed"] [collection_id=456389384978334720] [task_id=4e4e6a32b40f493d856d3dbe1504a453]
[2025/03/06 10:03:03.571 +08:00] [INFO] [msgdispatcher/manager.go:67] ["create new dispatcherManager"] [role=cdc-5d21575c67b04500a49763711ed2025e] [nodeID=8444] [pchannel=by-dev-rootcoord-dml_6]
[2025/03/06 10:03:03.571 +08:00] [INFO] [msgdispatcher/dispatcher.go:96] ["creating dispatcher..."] [pchannel=by-dev-rootcoord-dml_6] [subName=cdc-5d21575c67b04500a49763711ed2025e-8444-by-dev-rootcoord-dml_6_456389384978334720v0-true] [isMain=true]
[2025/03/06 10:03:03.571 +08:00] [INFO] [msgstream/mq_msgstream.go:124] ["Msg Stream state"] [can_produce=true]
[2025/03/06 10:03:03.572 +08:00] [INFO] [msgdispatcher/manager.go:178] ["dispatcherManager is running..."] [role=cdc-5d21575c67b04500a49763711ed2025e] [nodeID=8444] [pchannel=by-dev-rootcoord-dml_6]
[2025/03/06 10:03:03.589 +08:00] [INFO] [msgstream/mq_msgstream.go:929] ["MsgStream begin to seek start msg: "] [channel=by-dev-rootcoord-dml_6] [MessageID=CNZMEAAYACAA]
[2025/03/06 10:03:03.710 +08:00] [INFO] [msgstream/mq_msgstream.go:939] ["MsgStream seek finished"] [channel=by-dev-rootcoord-dml_6]
[2025/03/06 10:03:03.714 +08:00] [INFO] [msgstream/mq_msgstream.go:1016] ["skip msg"] [type=CreateCollection] [size=196] [msgTs=456452101201199107] [posTs=456452101201199107]
[2025/03/06 10:03:03.717 +08:00] [INFO] [msgdispatcher/dispatcher.go:125] ["seek successfully"] [pchannel=by-dev-rootcoord-dml_6] [subName=cdc-5d21575c67b04500a49763711ed2025e-8444-by-dev-rootcoord-dml_6_456389384978334720v0-true] [isMain=true] [posTs=456452101201199107] [posTime=2025/03/06 10:03:02.341 +08:00] [tsLag=1.376027854s]
[2025/03/06 10:03:03.717 +08:00] [INFO] [msgdispatcher/dispatcher.go:161] ["add new target"] [vchannel=by-dev-rootcoord-dml_6_456389384978334720v0] [isMain=true]
[2025/03/06 10:03:03.717 +08:00] [INFO] [msgdispatcher/manager.go:115] ["add main dispatcher"] [role=cdc-5d21575c67b04500a49763711ed2025e] [nodeID=8444] [vchannel=by-dev-rootcoord-dml_6_456389384978334720v0]
[2025/03/06 10:03:03.717 +08:00] [INFO] [msgdispatcher/dispatcher.go:189] ["get signal"] [pchannel=by-dev-rootcoord-dml_6] [signal=start] [isMain=true]
[2025/03/06 10:03:03.717 +08:00] [INFO] [msgdispatcher/dispatcher.go:212] ["handle signal done"] [pchannel=by-dev-rootcoord-dml_6] [signal=start] [isMain=true]
[2025/03/06 10:03:03.717 +08:00] [INFO] [msgdispatcher/client.go:114] ["register done"] [role=cdc-5d21575c67b04500a49763711ed2025e] [nodeID=8444] [vchannel=by-dev-rootcoord-dml_6_456389384978334720v0] [dur=146.480842ms]
[2025/03/06 10:03:03.717 +08:00] [INFO] [reader/replicate_channel_manager.go:985] ["add collection to channel handler"] [channel_name=by-dev-rootcoord-dml_6_456389384978334720v0] [collection_id=456389384978334720] [collection_name=t2] [seek_channel=by-dev-rootcoord-dml_6]
[2025/03/06 10:03:03.718 +08:00] [INFO] [reader/replicate_channel_manager.go:968] ["start to handle the msg pack"] [channel_name=by-dev-rootcoord-dml_6_456389384978334720v0]
[2025/03/06 10:03:03.718 +08:00] [INFO] [msgdispatcher/dispatcher.go:217] ["begin to work"] [pchannel=by-dev-rootcoord-dml_6] [isMain=true]
[2025/03/06 10:03:03.719 +08:00] [INFO] [writer/replicate_message_manager.go:126] ["new replicate message handler"] [channelName=by-dev-rootcoord-dml_1]
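One way to check which databases CDC actually observed is to pull the IDs out of the etcd keys in the log. The sketch below is a hypothetical helper, not part of milvus-cdc. It assumes the key layout seen in the first log entry, `.../root-coord/database/collection-info/<database ID>/<collection ID>`, and that the `1` in that key is the ID of the default database; both points are assumptions to verify against your Milvus version.

```python
import re

# Assumed key layout: <root>/root-coord/database/collection-info/<db_id>/<collection_id>
KEY_RE = re.compile(
    r"\[key=[^\]]*?/root-coord/database/collection-info/(?P<db_id>\d+)/(?P<coll_id>\d+)\]"
)


def collection_keys(log_text):
    """Return (db_id, collection_id) pairs for every collection-info key in the log."""
    return [(int(m["db_id"]), int(m["coll_id"])) for m in KEY_RE.finditer(log_text)]


# First entry of the log above, shortened to the fields that matter here.
sample = (
    '[INFO] [reader/etcd_op.go:251] ["the collection state is not created"] '
    "[key=by-dev/meta/root-coord/database/collection-info/1/456389384978334720] "
    "[collection_name=t2] [state=CollectionCreating]"
)

pairs = collection_keys(sample)
print(pairs)  # prints [(1, 456389384978334720)]
```

If only one database ID ever shows up while collections are being created in other databases, that would be consistent with the task watching a single database.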

Expected Behavior

No response

Steps To Reproduce

No response

Environment

No response

Anything else?

No response

huangping11 • Mar 06 '25 02:03

Milvus version: milvusdb/milvus:v2.4.15 milvus-standalone

huangping11 • Mar 06 '25 02:03

CDC configuration

[root@wy3-db241 configs]# cat cdc.yaml
address: 0.0.0.0:8444
maxTaskNum: 100
metaStoreConfig:
  storeType: mysql
  mysqlSourceUrl: milvuscdc:milvuscdc@tcp(192.168.3.240:3318)/milvuscdc?charset=utf8
  rootPath: cdc-by-dev
sourceConfig:
  etcd:
    address:
      - http://127.0.0.1:2379
    rootPath: by-dev
    metaSubPath: meta
    enableAuth: false
    username: root
    password: root123456
    enableTLS: false
    tlsCertPath: deployment/cert/client.pem # path to your cert file
    tlsKeyPath: deployment/cert/client.key # path to your key file
    tlsCACertPath: deployment/cert/ca.pem # path to your CACert file
    tlsMinVersion: 1.3
  readChanLen: 4
  defaultPartitionName: _default
  replicateChan: by-dev-replicate-msg
  pulsar:
    address: pulsar://192.168.3.241:6650
    maxMessageSize: 5242880
    tenant: public
    namespace: default
maxNameLength: 256
logLevel: info
detectDeadLock: false

huangping11 • Mar 06 '25 02:03

Message queue configuration in the source environment:

mq:
  type: pulsar
  enablePursuitMode: true # Default value: "true"
  pursuitLag: 10 # time tick lag threshold to enter pursuit mode, in seconds
  pursuitBufferSize: 8388608 # pursuit mode buffer size in bytes
  pursuitBufferTime: 60 # pursuit mode buffer time in seconds
  mqBufSize: 16 # MQ client consumer buffer length
  dispatcher:
    mergeCheckInterval: 1 # the interval time (in seconds) for dispatcher to check whether to merge
    targetBufSize: 16 # the length of channel buffer for target
    maxTolerantLag: 3 # Default value: "3", the timeout (in seconds) that target sends msgPack

pulsar:
  address: 192.168.3.241
  port: 6650 # Port of Pulsar service.
  webport: 8080 # Web port of Pulsar service. If you connect directly without a proxy, use 8080.

huangping11 • Mar 06 '25 02:03

@huangping11 By default, only the default db is synchronized. If you need to synchronize other dbs, you can try using the following:

{
    "db_collections": {
        "*": [
            {
                "name": "*"
            }
        ]
    }
}

However, this currently works only for a single db or for all dbs, and the collection list accepts only one element. We are still testing this feature.
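For reference, here is a minimal sketch of how the full create request might look with `db_collections`. It reuses the connection values from the original curl command. The helper name `build_create_request` is hypothetical, and placing `db_collections` inside `request_data` in place of `collection_infos` is an assumption, not something confirmed in this thread.

```python
import json


def build_create_request(uri, token, database="*", collection="*", connect_timeout=10):
    """Build the JSON body for a milvus-cdc "create" task request (hypothetical helper)."""
    return {
        "request_type": "create",
        "request_data": {
            "milvus_connect_param": {
                "uri": uri,
                "token": token,
                "connect_timeout": connect_timeout,
            },
            # Assumption: db_collections takes the place of collection_infos here.
            # One database key ("*" for all dbs) mapped to a single-element list.
            "db_collections": {database: [{"name": collection}]},
        },
    }


body = build_create_request("http://192.168.3.242:19530", "root:Milvus")
print(json.dumps(body, indent=2))
```

The printed JSON can be passed to the same endpoint as before, for example with `curl -X POST http://localhost:8444/cdc -H "Content-Type: application/json" -d @body.json` after saving it to a file.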

SimFG • Mar 10 '25 07:03