Mapping of topic to specific index in elastic-sink connector

Open rishabhk09 opened this issue 2 years ago • 6 comments

Hello

I am trying to map the topic mytopic to the Elasticsearch index myindex, with Kafka Connect running in distributed mode, using this property: "topic.index.map": "mytopic:myindex".

However, this property appears to have been deprecated. Everything I find says it has been removed, but I could not find an alternative or a straightforward workaround. See: https://github.com/confluentinc/kafka-connect-elasticsearch/issues/595 https://github.com/confluentinc/kafka-connect-elasticsearch/issues/385 https://github.com/confluentinc/kafka-connect-elasticsearch/issues/341

Hence, opening this for a verified solution. Requesting folks to kindly help here.

Environment: Elasticsearch version: 7.17.9, kafka-connect-elasticsearch version: 14.0.3

rishabhk09 • Mar 29 '23 07:03

@rmoff Requesting your assistance here!!

rishabhk09 • Mar 29 '23 08:03

Hi,

I don't work with Kafka directly anymore. A good place to go for help is https://www.confluent.io/en-gb/community/ask-the-community/.

thanks.

rmoff • Mar 29 '23 09:03

Thanks for the suggestion @rmoff

rishabhk09 • Mar 29 '23 09:03

Not sure if it still helps - you can use the RegexRouter to change the topic name before it is written to Elastic: https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html

We use this in a couple of places and it works quite well. There are some restrictions when you work with Elasticsearch 8, though: in that case you have to set the flush.synchronously connector property to true.

Protoss78 • Aug 08 '23 08:08

@Protoss78 do you have an example of this?

jliunyu • Dec 07 '23 21:12

@jliunyu Sorry for the late reply. Here is a short example I reduced from one of our running connectors:

{
  "name": "my-elastic-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "original.topic.name",
    "transforms": "changeIndexName",
    "transforms.changeIndexName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changeIndexName.regex": ".*",
    "transforms.changeIndexName.replacement": "changed-topic-to-index-name",
    ...
  }
}

Protoss78 • Jan 09 '24 08:01
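
For readers following the thread, here is a rough sketch of what the RegexRouter settings in the example do to a topic name before the sink uses it as the index name. It is an approximation in Python, not the connector's code: `route_topic` is a hypothetical helper, it uses Python's `re` module rather than Java's regex engine, and it assumes the router renames a topic only when the regex matches the whole name. Note that capture groups are written `\1` in Python, whereas a real RegexRouter replacement uses Java's `$1`.

```python
import re


def route_topic(topic: str, regex: str, replacement: str) -> str:
    """Approximate RegexRouter: rename the topic only if the regex matches all of it.

    Hypothetical helper for illustration; not part of Kafka Connect.
    """
    if re.fullmatch(regex, topic) is None:
        # No full match: the record keeps its original topic name.
        return topic
    # Replace the first match with the replacement string.
    return re.sub(regex, replacement, topic, count=1)


# The settings from the example: every topic maps to one fixed index name.
print(route_topic("original.topic.name", ".*", "changed-topic-to-index-name"))

# A narrower pattern that keeps part of the topic name (Python "\1", Java "$1").
print(route_topic("original.topic.name", r"original\.(.*)", r"index-\1"))

# A topic that does not match is passed through unchanged.
print(route_topic("other.topic", r"original\.(.*)", r"index-\1"))
```

With `".*"` every topic the connector consumes ends up in the same index, which is fine when "topics" lists a single topic. If the connector reads several topics, a narrower regex such as the second one keeps them in separate indices.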