kafka-connect-elasticsearch

Support elastic-search routing for kafka-connect-elasticsearch sink.

Open rkalluri-clgx opened this issue 6 years ago • 20 comments

I would like to know whether we support routing records to specific Elasticsearch shards, as explained below.

https://www.elastic.co/guide/en/elasticsearch/reference/6.2/mapping-routing-field.html

It looks like we need to be able to specify the routing parameter in the Elasticsearch URL, and it can vary from message to message. This seems to need to be dynamic, with an SMT of some sort picking the routing value from the message, just like we pick the index. Either that, or have a per-partition route from Kafka for performance reasons. Just wanted to kick-start the discussion.

rkalluri-clgx avatar Jul 13 '18 01:07 rkalluri-clgx

+1 Would love to see this.

DevonPeroutky avatar Jul 22 '18 03:07 DevonPeroutky

+1 Would also benefit from this

robgryn avatar Oct 02 '18 09:10 robgryn

+1 This feature would enable parent-child relationship in target Elasticsearch cluster, otherwise I have no idea how to achieve this.

maxsel avatar Oct 02 '18 09:10 maxsel

+1 I really need this feature. I changed all my ES mappings to join / routing, and I'm now stuck on getting Kafka Connect to add routing={id}.

matpersonne avatar Oct 25 '18 09:10 matpersonne

+1 This would be really useful

mikelsanvi avatar Nov 02 '18 11:11 mikelsanvi

I also need this feature for defining parent/child join relations: https://www.elastic.co/guide/en/elasticsearch/reference/6.3/parent-join.html

fubhy avatar Jan 03 '19 05:01 fubhy

+1 Having issues finding a way to implement parent-child relationship without this

MottiniMauro avatar Apr 25 '19 18:04 MottiniMauro

+1 Would be great to make our cluster more efficient

frankkoornstra avatar May 17 '19 10:05 frankkoornstra

@levzem Is there a plan to support this feature? We really need a way to flush parent-child relationship records in kafka-connect-elasticsearch.

dgthomugo avatar Dec 09 '19 13:12 dgthomugo

Is there any workaround??

mIkhail-zaretsky avatar Jan 24 '20 11:01 mIkhail-zaretsky

Same here. I would like to be able to insert parent-child records via the Elasticsearch connector. Any updates on this?

arungitan avatar Jun 05 '20 18:06 arungitan

Can we achieve that by using a custom ID?
In the formula shard_num = hash(_routing) % num_primary_shards,
Elasticsearch uses the document ID as _routing by default,
so we can pass our own ID to achieve something like custom routing, can't we?
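
To make the formula concrete, here is a minimal sketch. It assumes `zlib.crc32` as a stand-in hash (Elasticsearch uses its own internal hash, so real shard numbers will differ), and the function names `shard_for` and `effective_routing` are hypothetical.

```python
import zlib
from typing import Optional


def effective_routing(doc_id: str, routing: Optional[str] = None) -> str:
    # Without an explicit routing value, the document ID is used as _routing.
    return routing if routing is not None else doc_id


def shard_for(routing_value: str, num_primary_shards: int) -> int:
    # ASSUMPTION: crc32 is only a stand-in for Elasticsearch's internal hash.
    return zlib.crc32(routing_value.encode("utf-8")) % num_primary_shards


# A child routed by its parent's ID lands on the same shard as the parent.
parent_shard = shard_for(effective_routing("parent-1"), 5)
child_shard = shard_for(effective_routing("child-7", routing="parent-1"), 5)
print(parent_shard == child_shard)
```

The sketch also shows why the ID-only approach is awkward: without explicit routing, the child's own ID decides its shard, so the ID would have to be chosen to hash to the parent's shard.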

Arsennikum avatar Jun 18 '20 16:06 Arsennikum

That leaks details into Kafka Connect, such as how the shard number is calculated and how many shards the target index has. Besides, it would involve storing state about which IDs are still available and creating an algorithm to come up with the next ID for a targeted shard... all things that shouldn't be necessary if routing were supported.

frankkoornstra avatar Jun 19 '20 15:06 frankkoornstra

For those still trying to solve this, I found this cool workaround (if you have some control on the Elasticsearch side): use an ingest script processor!

https://www.elastic.co/guide/en/elasticsearch/reference/current/script-processor.html
https://www.elastic.co/guide/en/elasticsearch/reference/current/accessing-data-in-pipelines.html

The line of interest is this: "The following metadata fields are accessible by a processor: _index, _type, _id, _routing". So not only _routing but even _id and _index can be conveniently scripted.
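
As a rough illustration of this workaround, the sketch below builds an ingest pipeline body whose script processor copies a document field into `_routing`. The field name `parent_id` and the helper `routing_pipeline` are hypothetical, and the sketch assumes the field holds a string value.

```python
import json


def routing_pipeline(source_field: str) -> dict:
    # Build the JSON body for PUT _ingest/pipeline/<name>.
    # ASSUMPTION: source_field names a string field present on every document.
    return {
        "description": "Copy a document field into _routing",
        "processors": [
            {
                "script": {
                    "lang": "painless",
                    "source": "ctx._routing = ctx[params.field]",
                    "params": {"field": source_field},
                }
            }
        ],
    }


print(json.dumps(routing_pipeline("parent_id"), indent=2))
```

The printed body would be registered on the Elasticsearch side; the connector itself stays unchanged, which is why this only works if you control the cluster.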

arungitan avatar Jun 19 '20 16:06 arungitan

The workaround is fine for the index operation, but it isn't useful for the delete (tombstone) operation, because ES will not run the ingest pipeline for update and delete operations. Correct me if I'm wrong. In bulk operations, each entry must carry its routing value in the routing field. Therefore, it would be great if the connector supported it.
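
To illustrate the point about bulk entries, here is a minimal sketch that builds a bulk request body in which both an index and a delete action carry a `routing` value in their action metadata. The helper `bulk_lines` and the index name `answers` are hypothetical, and the `routing` key assumes a recent Elasticsearch version (older releases also used `_routing`).

```python
import json


def bulk_lines(actions) -> str:
    # actions: iterable of (op, index, doc_id, routing, source_or_None).
    lines = []
    for op, index, doc_id, routing, source in actions:
        meta = {"_index": index, "_id": doc_id, "routing": routing}
        lines.append(json.dumps({op: meta}))
        if op != "delete":
            # Delete actions have no source line; index actions do.
            lines.append(json.dumps(source))
    # The bulk body is newline-delimited JSON and must end with a newline.
    return "\n".join(lines) + "\n"


body = bulk_lines([
    ("index", "answers", "3", "1", {"text": "This is an answer"}),
    ("delete", "answers", "3", "1", None),  # e.g. produced from a tombstone
])
print(body, end="")
```

A delete without the matching routing value may be sent to a different shard than the one holding the document, which is why an ingest pipeline alone cannot cover tombstones.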

janpetr11 avatar Nov 23 '20 10:11 janpetr11

Does anyone have an idea of the work involved? Maybe it can be picked up by a few people, but someone with a good overview guiding the work would help immensely.

frankkoornstra avatar Mar 23 '21 12:03 frankkoornstra

Hi, I had also been planning to evaluate the routing option and do some testing. There's an old PR that was closed but didn't look so bad at a quick glance: https://github.com/confluentinc/kafka-connect-elasticsearch/pull/156

hartmut-co-uk avatar Mar 24 '21 12:03 hartmut-co-uk

Good find! I commented in the PR. Maybe the contributor is still around.

frankkoornstra avatar Mar 25 '21 09:03 frankkoornstra

Note: I'm about to test-implement this feature against the current state. I will create a PR if this works out.

hartmut-co-uk avatar Jun 22 '21 17:06 hartmut-co-uk

Update: going well. The implementation was relatively simple, and today I was also able to manually test ingesting Avro data from a Kafka topic into a defined ES index with mapping and parent-join.

Though I noticed that for the parent-join use case, in addition to adding the routing, the payload also potentially needs to be enriched with my_join_field.name and my_join_field.parent (for children). https://www.elastic.co/guide/en/elasticsearch/reference/current/parent-join.html

PUT my-index-000001/_doc/3?routing=1&refresh 
{
  "my_id": "3",
  "text": "This is an answer",
  "my_join_field": {
    "name": "answer", 
    "parent": "1" 
  }
}

I wonder if it would be worth building this natively into the connector as well, instead of forcing the user to enrich data upfront or to build and require a custom SMT.

Note: I tried the InsertField SMT, but since it only supports flat fields, it's impossible to enrich the struct for children. https://docs.confluent.io/platform/current/connect/transforms/insertfield.html
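
For illustration only, the enrichment described above could look roughly like the sketch below, written in plain Python rather than as an SMT. The helper `enrich_for_join` and its parameters are hypothetical, and it assumes a single-level parent/child relation where the child is routed by its parent's ID.

```python
import copy
from typing import Optional, Tuple


def enrich_for_join(
    record: dict,
    relation: str,
    parent_id: Optional[str] = None,
    join_field: str = "my_join_field",
    id_field: str = "my_id",
) -> Tuple[dict, str]:
    # Return (enriched document, routing value) without mutating the input.
    doc = copy.deepcopy(record)
    join = {"name": relation}
    if parent_id is not None:
        join["parent"] = parent_id  # nested field that flat inserts cannot add
    doc[join_field] = join
    # ASSUMPTION: parents route by their own ID, children by the parent's ID.
    routing = parent_id if parent_id is not None else str(doc[id_field])
    return doc, routing


doc, routing = enrich_for_join(
    {"my_id": "3", "text": "This is an answer"}, "answer", parent_id="1"
)
print(routing, doc)
```

With the values from the request above, this yields the same document body and routing=1.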

hartmut-co-uk avatar Jun 24 '21 00:06 hartmut-co-uk