kafka-connect-elasticsearch
Mapping of topic to specific index in elastic-sink connector
Hello
I am trying to map the topic mytopic to the Elasticsearch index myindex in a distributed-mode configuration using this property: "topic.index.map": "mytopic:myindex".
But this functionality seems to be deprecated. Everything I find says it has been removed, but I couldn't find an alternative or a straightforward workaround. https://github.com/confluentinc/kafka-connect-elasticsearch/issues/595 https://github.com/confluentinc/kafka-connect-elasticsearch/issues/385 https://github.com/confluentinc/kafka-connect-elasticsearch/issues/341
Hence, opening this for a verified solution. Requesting folks to kindly help here.
Environment: Elasticsearch version: 7.17.9, kafka-connect-elasticsearch version: 14.0.3
@rmoff Requesting your assistance here!!
Hi,
I don't work with Kafka directly anymore. A good place to go for help is https://www.confluent.io/en-gb/community/ask-the-community/.
thanks.
Thanks for the suggestion @rmoff
Not sure if it still helps - you can use the RegexRouter to change the topic name before it is written to Elastic: https://docs.confluent.io/platform/current/connect/transforms/regexrouter.html
We use this in a couple of places and it works quite well. There are some restrictions when you work with Elastic 8, though: in that case you have to set the flush.synchronously connector property to true.
@Protoss78 do you have any example for it?
@jliunyu Sorry for the late reply. Here is a short example I reduced from one of our running connectors:
{
  "name": "my-elastic-connector",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "transforms": "changeIndexName",
    "topics": "original.topic.name",
    "transforms.changeIndexName.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.changeIndexName.regex": ".*",
    "transforms.changeIndexName.replacement": "changed-topic-to-index-name"
    ...
  }
}
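Applied to the mapping from the original question (mytopic to myindex), a minimal sketch might look like the following. The connector name, the transform alias routeToIndex, and the connection.url value are placeholders assumed for illustration, not values from this thread. The regex here is the literal topic name rather than ".*"; as far as I know, RegexRouter only rewrites a topic when the pattern matches the whole topic name, so any other topic would pass through unchanged.

```json
{
  "name": "mytopic-to-myindex",
  "config": {
    "connector.class": "io.confluent.connect.elasticsearch.ElasticsearchSinkConnector",
    "topics": "mytopic",
    "connection.url": "http://localhost:9200",
    "transforms": "routeToIndex",
    "transforms.routeToIndex.type": "org.apache.kafka.connect.transforms.RegexRouter",
    "transforms.routeToIndex.regex": "mytopic",
    "transforms.routeToIndex.replacement": "myindex"
  }
}
```

In distributed mode a payload of this shape would be submitted to the Connect REST API (POST /connectors). Any further settings the deployment needs, such as converters or authentication, are left out of this sketch.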