flink-cdc
[MongoDB] Support initial.snapshotting.pipeline related configs in table api
Search before asking
- [X] I searched in the issues and found nothing similar.
Motivation
MongoDB's startup.mode.copy.existing.pipeline (aka initial.snapshotting.pipeline in mongo-cdc) is an array of JSON objects describing the pipeline operations to run when copying existing data, see link. It lets the copying manager make better use of indexes, which makes copying more efficient; this is very important in some user scenarios. There are also some related configs, such as startup.mode.copy.existing.queue.size and startup.mode.copy.existing.max.threads. Currently these configs are only supported in the DataStream API. For the convenience of users, we should also support them in the Table API.
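As a rough illustration of what such a pipeline value looks like, the sketch below builds a one-stage pipeline and serializes it to the JSON string that would be passed as the config value. The `closed` field and the `options` dict are hypothetical and only for illustration; the option key is the name used in this issue's title, not a confirmed Table API option.

```python
import json

# Hypothetical filter: copy only documents whose `closed` field is false.
# `$match` is a standard MongoDB aggregation stage; the field name is made up.
pipeline = [{"$match": {"closed": False}}]

# The config value is an array of JSON objects, passed as a string.
pipeline_json = json.dumps(pipeline)

# Hypothetical option map; the key is the name proposed in this issue.
options = {"initial.snapshotting.pipeline": pipeline_json}

print(options["initial.snapshotting.pipeline"])
```

A `$match` stage placed first is the kind of operation that can use an index on the filtered field, which is where the efficiency gain described above would come from.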
Solution
Support initial.snapshotting.pipeline related configs in table api
Alternatives
No response
Anything else?
Note that in 2.3.0 we removed these configs from the Table API when adding support for incremental snapshot mode for MongoDB in this commit, because the pipeline operations have inconsistent semantics in incremental snapshot mode. In the snapshot phase of incremental snapshot mode, the oplog is played back after each snapshot to compensate for changes, but the pipeline operations in copy.existing.pipeline are not applied to the played-back oplog, so the filtered snapshot and the unfiltered playback disagree. In legacy Debezium mode the behaviour is correct, so we can add these configs back in Debezium mode for better forward compatibility, and notify users not to use them in incremental snapshot mode for the above reason.
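The inconsistency can be seen in a toy model. This is a simplified simulation, not the connector's actual code: the function names, the `closed` filter, and the sample documents are all hypothetical. The snapshot applies the pipeline filter, while the oplog playback does not.

```python
def matches(doc):
    # Stand-in for a hypothetical pipeline stage {"$match": {"closed": false}}.
    return doc["closed"] is False


def snapshot_with_pipeline(collection):
    # Snapshot read: the pipeline filter is applied.
    return {doc["_id"]: doc for doc in collection if matches(doc)}


def replay_oplog(state, oplog):
    # Oplog playback: changes are applied WITHOUT the pipeline filter.
    for op, doc in oplog:
        if op == "delete":
            state.pop(doc["_id"], None)
        else:  # insert or update
            state[doc["_id"]] = doc
    return state


collection = [
    {"_id": 1, "closed": False},
    {"_id": 2, "closed": True},  # filtered out of the snapshot
]
# Changes that happen while the snapshot is being taken.
oplog = [
    ("insert", {"_id": 3, "closed": True}),
    ("update", {"_id": 1, "closed": True}),
]

state = replay_oplog(snapshot_with_pipeline(collection), oplog)
print(sorted(state))
```

In this model, document 2 is excluded because it was closed before the snapshot, while documents 1 and 3 end up in the result even though they are also closed. The output depends on when a change happened rather than on the filter alone.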
Are you willing to submit a PR?
- [X] I'm willing to submit a PR!
Thanks @herunkang2018 for reporting this issue. Assigned to you, please go ahead.
Closing this issue as it has been migrated to Apache Jira.