feat(mongo-connector): support copy.existing in source and topics/topic.override in sink

Open ericsyh opened this issue 3 years ago • 8 comments

Motivation

Compared to mongodb-kafka-connector, pulsar-kafka-connector is quite simple, so it would be good to add some useful features to pulsar-kafka-connector:

  • copy.existing config in source: https://docs.mongodb.com/kafka-connector/current/source-connector/configuration-properties/copy-existing/
  • topics.regex in sink: https://docs.mongodb.com/kafka-connector/current/sink-connector/configuration-properties/kafka-topic/
  • topic.override in sink: https://docs.mongodb.com/kafka-connector/current/sink-connector/configuration-properties/topic-override/
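To illustrate what the two sink options would do, here is a minimal sketch in Python, not connector code. It assumes the mongo-kafka conventions: `topics.regex` selects the topics to consume, and a key of the form `topic.override.<topicName>.<propertyName>` overrides a sink property for one topic. The helper names `select_topics` and `resolve_collection`, the sample topics, and the fallback to a global `collection` setting are all hypothetical.

```python
import re


def select_topics(available, topics_regex):
    """Return the topics whose full name matches topics_regex."""
    pattern = re.compile(topics_regex)
    return [t for t in available if pattern.fullmatch(t)]


def resolve_collection(config, topic):
    """Pick the target collection for a topic.

    A per-topic override wins over the global `collection` setting
    (assumed fallback for this sketch).
    """
    override_key = f"topic.override.{topic}.collection"
    return config.get(override_key, config["collection"])


# Hypothetical sink configuration.
config = {
    "topics.regex": r"orders\..*",
    "collection": "default_sink",
    "topic.override.orders.eu.collection": "orders_eu",
}

topics = select_topics(["orders.eu", "orders.us", "payments"], config["topics.regex"])
targets = {t: resolve_collection(config, t) for t in topics}
```

In this sketch only `orders.eu` has an override, so it is written to `orders_eu`, while `orders.us` falls back to the global collection and `payments` is never consumed.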

Please refer to the open source code of mongo-kafka.

ericsyh, Dec 20 '21 11:12

Okay, I will read the mongo-kafka code first!

lzqdename, Dec 21 '21 02:12

I will read the Pulsar IO source code next week.

lzqdename, Dec 25 '21 06:12

In the Pulsar IO connector for MongoDB, the offset is not saved. Should that feature be supported as part of this issue? @ericsyh

lzqdename, Dec 25 '21 06:12

@lzqdename Sorry, I am not the maintainer of the MongoDB Pulsar IO connector. Can you give more details about the offset that you say is not saved? Is it in the source or the sink connector, and at which step is it not saved?

ericsyh, Dec 26 '21 00:12

Okay, I see. The offset is the position from which the consumer resumes consuming after a restart. See com.mongodb.kafka.connect.source.MongoSourceTask.createCursor (please refer to the open source code of mongo-kafka). That function calls getResumeToken(sourceConfig), which reads the last consumed offset position. In a production environment, we should save the offset of each consumed record to avoid consuming records repeatedly.
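The idea of saving an offset so that a restart does not re-consume records can be sketched with a toy model. This is plain Python, not the MongoDB driver or mongo-kafka code: `ToyChangeStream`, `run_source`, and the dict used as an offset store are hypothetical stand-ins, and the tokens are simple strings.

```python
class ToyChangeStream:
    """In-memory stand-in for a MongoDB change stream (not the real driver API)."""

    def __init__(self, events):
        self._events = events  # list of (resume_token, document)

    def watch(self, resume_after=None):
        """Yield the events that come after the given resume token."""
        start = 0
        if resume_after is not None:
            tokens = [token for token, _ in self._events]
            start = tokens.index(resume_after) + 1
        yield from self._events[start:]


def run_source(stream, offset_store, max_events):
    """Emit up to max_events documents, saving the token after each one."""
    emitted = []
    for token, doc in stream.watch(resume_after=offset_store.get("resume_token")):
        if len(emitted) == max_events:
            break
        emitted.append(doc)
        offset_store["resume_token"] = token  # the saved offset
    return emitted


stream = ToyChangeStream([("t1", "a"), ("t2", "b"), ("t3", "c")])
offset_store = {}  # hypothetical durable store, a plain dict here

first_run = run_source(stream, offset_store, max_events=2)    # task stops after two records
second_run = run_source(stream, offset_store, max_events=10)  # task restarts
```

Because the token of the last emitted record is stored, the second run starts after `t2` and emits only the remaining document instead of replaying the first two.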

Okay, due to lack of time I will write only a basic implementation of "copy.existing". @ericsyh

lzqdename, Dec 27 '21 02:12

@lzqdename A Pulsar consumer has no need to save the offset, because the Pulsar broker saves the offset information. For example, suppose a Pulsar consumer consumes with the subscription name sub-1 for a while and then exits for some reason. The consumer only needs to reconnect to Pulsar with the same subscription name sub-1, and the Pulsar broker will send it the next message after the last consumed position.
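The broker-side bookkeeping described here can be modelled in a few lines. This is a toy in-memory model in Python, not the Pulsar client API: `ToyBroker` and `consume` are hypothetical names, and the toy starts every new subscription at the first message, which is a simplification.

```python
class ToyBroker:
    """In-memory model of a broker that tracks one cursor per subscription."""

    def __init__(self, messages):
        self._messages = list(messages)
        self._cursors = {}  # subscription name -> index of the next message

    def receive(self, subscription):
        """Return the next unacknowledged message, or None if caught up."""
        position = self._cursors.setdefault(subscription, 0)
        if position >= len(self._messages):
            return None
        return self._messages[position]

    def acknowledge(self, subscription):
        """Advance the cursor: the broker, not the consumer, stores it."""
        self._cursors[subscription] += 1


def consume(broker, subscription, count):
    """A stateless consumer: it keeps no offset between calls."""
    received = []
    for _ in range(count):
        message = broker.receive(subscription)
        if message is None:
            break
        received.append(message)
        broker.acknowledge(subscription)
    return received


broker = ToyBroker(["m1", "m2", "m3", "m4"])
before_exit = consume(broker, "sub-1", 2)       # consumer exits after two messages
after_reconnect = consume(broker, "sub-1", 5)   # new consumer, same subscription name
other_subscription = consume(broker, "sub-2", 1)
```

The consumer function holds no state of its own, yet reconnecting under `sub-1` continues from the third message, while a different subscription name gets its own independent position.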

ericsyh, Dec 27 '21 05:12

I mean consuming from MongoDB, not from Pulsar. You can see https://github.com/mongodb/mongo-kafka/blob/master/src/main/java/com/mongodb/kafka/connect/source/MongoSourceTask.java

At line 397, getResumeToken(sourceConfig): the resume token is a MongoDB token. @ericsyh

lzqdename, Dec 27 '21 09:12

@lzqdename OK, I see. I think there is no need for you to support saving the MongoDB offset in this task.

ericsyh, Dec 28 '21 02:12