kafka-connect-elasticsearch

[HELLO!!] Adds routing support, configurable via 'routing.field.name'

Open hartmut-co-uk opened this issue 3 years ago • 20 comments

Refs:

  • https://github.com/confluentinc/kafka-connect-elasticsearch/issues/223
  • https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-routing-field.html
  • https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-routing
  • https://www.elastic.co/guide/en/elasticsearch/reference/current/parent-join.html

Problem

The Elasticsearch _routing field is currently not supported.

Solution

Does this solution apply anywhere else?
  • [x] yes
  • [ ] no
If yes, where?

Parent/Child relationships -> join field type

Test Strategy

DataConverterTest has been added.
Integration tests might require refactoring of the existing tests: ElasticsearchConnectorIT / ElasticsearchConnectorBaseIT seem to use a very basic schema / payload. ElasticsearchHelperClient would also need to be extended to support _routing.

Please advise how to approach.

Testing done:
  • [x] Unit tests
  • [ ] Integration tests
  • [ ] System tests
  • [x] Manual tests

TODOs / open topics

  • [x] 'routing.field.name' config
  • [x] read field value, populate bulk
  • [x] Unit tests
  • [ ] ? Advanced support for parent-join field
    • [ ] ? add SMT for parent-join field
    • [ ] ? add support directly to connector
  • [ ] Integration tests
  • [ ] update documentation
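
For illustration only, here is a minimal sketch of the "read field value, populate bulk" step. It is not the code from this PR: the class `RoutingSketch` and both method names are hypothetical, the record value is modelled as a plain `Map` rather than a Connect `Struct`, and JSON escaping is left out. The only thing taken from the Elasticsearch docs linked above is that a bulk action's metadata line accepts a `routing` entry.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical helper for illustration; not the connector's actual implementation. */
final class RoutingSketch {

    /** Reads the routing value from the record value; empty if the value or the field is missing. */
    static Optional<String> routingOf(Map<String, Object> value, String routingFieldName) {
        if (value == null || routingFieldName == null) {
            return Optional.empty();
        }
        Object raw = value.get(routingFieldName);
        return raw == null ? Optional.empty() : Optional.of(raw.toString());
    }

    /**
     * Builds the action metadata line of a bulk "index" action.
     * Assumes index, id and routing contain no characters that need JSON escaping.
     */
    static String bulkIndexAction(String index, String id, Optional<String> routing) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"index\":{\"_index\":\"").append(index)
          .append("\",\"_id\":\"").append(id).append('"');
        // Only emit "routing" when a value was found in the record.
        routing.ifPresent(r -> sb.append(",\"routing\":\"").append(r).append('"'));
        return sb.append("}}").toString();
    }
}
```

With 'routing.field.name' set to parent_id and a value containing parent_id = "1", index my-index-000001 and id 3, this yields {"index":{"_index":"my-index-000001","_id":"3","routing":"1"}}. Without the field, the routing entry is simply omitted.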

Release Plan

  • New feature
  • Non-breaking change
  • New config fields are optional
  • Does not require any change or migration upon upgrade

hartmut-co-uk avatar Jun 24 '21 00:06 hartmut-co-uk

@confluentinc It looks like @hartmut-co-uk just signed our Contributor License Agreement. :+1:

Always at your service,

clabot

ghost avatar Jun 24 '21 00:06 ghost

I noticed that for the parent-join use case, in addition to adding the routing, the payload may also need to be enriched with my_join_field.name and, for children, my_join_field.parent. https://www.elastic.co/guide/en/elasticsearch/reference/current/parent-join.html

PUT my-index-000001/_doc/3?routing=1&refresh 
{
  "my_id": "3",
  "parent_id": "1",
  "text": "This is an answer",
  "my_join_field": {
    "name": "answer", 
    "parent": "1" 
  }
}

I wonder if it would be worth building this natively into the connector as well, instead of forcing the user to enrich the data upfront or to build and require a custom SMT.

Note: I tried the InsertField SMT, but since it only supports flat fields, it cannot add the nested struct needed for children. https://docs.confluent.io/platform/current/connect/transforms/insertfield.html

So one solution I could see is to add the following config options:

  • join.field.name: in the above example, this would be 'my_join_field'
  • join.relation.name: in the above example, this would be 'answer'
  • join.parent.id.field.name: essentially the value field (path) holding the parent ID, i.e. the same field that would be configured for routing.field.name
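
To make the proposal concrete, here is a rough sketch of the enrichment these three options describe. It is an assumption-laden illustration, not connector code: `JoinFieldSketch` and `withJoinField` are hypothetical names, the record value is modelled as a `Map`, and a null parent field name is taken to mean "this is a parent document".

```java
import java.util.LinkedHashMap;
import java.util.Map;

/** Hypothetical illustration of the proposed join.* options; not part of the connector. */
final class JoinFieldSketch {

    /**
     * Returns a copy of the value with the join field added.
     * parentIdFieldName may be null for parent documents, which only carry "name".
     */
    static Map<String, Object> withJoinField(Map<String, Object> value,
                                             String joinFieldName,
                                             String relationName,
                                             String parentIdFieldName) {
        Map<String, Object> join = new LinkedHashMap<>();
        join.put("name", relationName);
        if (parentIdFieldName != null) {
            Object parentId = value.get(parentIdFieldName);
            if (parentId == null) {
                throw new IllegalArgumentException(
                    "Missing parent ID field: " + parentIdFieldName);
            }
            // Children additionally reference their parent's ID.
            join.put("parent", parentId.toString());
        }
        // Copy so the incoming record value is left untouched.
        Map<String, Object> enriched = new LinkedHashMap<>(value);
        enriched.put(joinFieldName, join);
        return enriched;
    }
}
```

Called with join field 'my_join_field', relation 'answer' and parent field 'parent_id' on the example document above, this adds a my_join_field entry with name "answer" and parent "1".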

hartmut-co-uk avatar Jun 24 '21 01:06 hartmut-co-uk

A custom SMT might be the best fit to create the ‘join-field struct’ for ES parent/child relationship on the fly. I wonder if it's possible and would be an acceptable pattern to just add + ship a custom SMT for this along with the connector jar?

hartmut-co-uk avatar Jun 24 '21 10:06 hartmut-co-uk

Further room for improvement: allow using data from $key (not only from a $value field).

hartmut-co-uk avatar Jun 27 '21 11:06 hartmut-co-uk

I've implemented a simple SMT (it also works just fine when embedded in the connector jar).

Sticking with the ES example, it's configured like:

  "transforms.insertParentJoin.join.field.name": "my_join_field",
  "transforms.insertParentJoin.relation.type.name": "answer",
  "transforms.insertParentJoin.parent.id.field.path": "questionId"

(with the actual msg value having a field questionId as the 'question' parent ID of the answer doc - this would also be used for routing...)

Please provide some feedback on how to proceed further.

hartmut-co-uk avatar Jun 27 '21 12:06 hartmut-co-uk

Amazing work @hartmut-co-uk :heart_eyes: This is going to help us a lot!

frankkoornstra avatar Jul 01 '21 09:07 frankkoornstra

@levzem could you please provide initial feedback?

Also

  • advise how to proceed with integration tests?

    Integration tests might require refactoring of the existing tests: ElasticsearchConnectorIT / ElasticsearchConnectorBaseIT seem to use a very basic schema / payload. ElasticsearchHelperClient would also need to be extended to support _routing.

  • Do you think it would make sense to add an SMT to the connector? I have an implementation: public class InsertParentJoin<R extends ConnectRecord<R>> implements Transformation<R> (I had to add the dependency org.apache.kafka:connect-transforms:${kafka.version}).

    • Or would it make more sense to embed the functionality directly into the connector?
    • Or not add it here, but provide it as a separate Maven artifact?

hartmut-co-uk avatar Jul 02 '21 15:07 hartmut-co-uk

I'm still keen to help wrap this up, write more tests, .. if we can agree how to proceed.

hartmut-co-uk avatar Aug 06 '21 10:08 hartmut-co-uk

@yanglei99 @kkonstantine @levzem @dosvath anyone? We so desperately need this and the work is done, but it's been dead in the water for ~2 months now without any apparent reason :confused:

frankkoornstra avatar Aug 19 '21 09:08 frankkoornstra

ping

hartmut-co-uk avatar Sep 16 '21 09:09 hartmut-co-uk

Pong @yanglei99 @kkonstantine @levzem @dosvath

frankkoornstra avatar Sep 21 '21 07:09 frankkoornstra

Went on a sabbatical for 3 months and still nothing :joy: I thought for a moment that this project was abandoned altogether, but stuff was still merged a few days ago.

@kkonstantine :pray: :pray: :pray: what needs to happen to get some eyes on this? Over half a year open now...

frankkoornstra avatar Feb 01 '22 13:02 frankkoornstra

I don't think it's been abandoned in general. I'm currently busy with other things. (I haven't yet used the proposed changes beyond a POC.)

Still happy to contribute if someone confirms that the feature/changes are welcome and will be merged + how to proceed with adding tests / SMT.

hartmut-co-uk avatar Feb 01 '22 15:02 hartmut-co-uk

Any plans to merge this PR?

dania-ru avatar May 20 '22 17:05 dania-ru

Dear maintainers, is this feature/PR welcome, and are further changes required? Is the config option fine as is? What about the new private methods (are there better alternatives/libraries to use)?

I'm still keen to help wrap this up and write tests, ... if we can agree on how to proceed.

hartmut-co-uk avatar May 21 '22 10:05 hartmut-co-uk

...

hartmut-co-uk avatar Sep 19 '22 18:09 hartmut-co-uk

If you're still searching for a platform-type of solution, take a look at Apache Flink. You get pretty much access to the bare-bones Elasticsearch API so you can do whatever you want, including routing, timeouts per bulk action, etc. Plus an actual Web UI :star_struck:

frankkoornstra avatar Sep 26 '22 07:09 frankkoornstra

> If you're still searching for a platform-type of solution, take a look at Apache Flink. You get pretty much access to the bare-bones Elasticsearch API so you can do whatever you want, including routing, timeouts per bulk action, etc. Plus an actual Web UI 🤩

So use a Flink Kafka source -> Flink Elasticsearch sink instead of Kafka Connect? 🤔 This doesn't get this PR over the line ^^ but it's a great suggestion for an alternative that is far easier to customise. 👍 👍

Many thanks for bringing this up, given I've been considering Apache Flink for other use cases in my project, too...

hartmut-co-uk avatar Sep 26 '22 08:09 hartmut-co-uk

CLA assistant check
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.

cla-assistant[bot] avatar Sep 11 '23 09:09 cla-assistant[bot]

I am no expert, but I think this approach can be difficult: routing based on a value in a record is tricky when it comes to deleting. Deleting is typically triggered by a record with a null value. We noticed that in that case the delete request is not routed to the correct shard, because the routing key is missing from the record. We had to take a different approach to get this to work.
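
A small sketch of the problem described above, plus one possible mitigation. This is not necessarily the approach taken in the comment above, which is not specified. It assumes the record key is a struct-like `Map` that also carries the routing field, in line with the earlier suggestion of reading from $key; `DeleteRoutingSketch` and `resolveRouting` are hypothetical names.

```java
import java.util.Map;
import java.util.Optional;

/** Hypothetical illustration of routing resolution for deletes; not connector code. */
final class DeleteRoutingSketch {

    /**
     * Prefers the routing field from the value and falls back to the key.
     * A delete (null value) can therefore only be routed if the key carries the field.
     */
    static Optional<String> resolveRouting(Map<String, Object> key,
                                           Map<String, Object> value,
                                           String routingFieldName) {
        Optional<String> fromValue = fieldOf(value, routingFieldName);
        return fromValue.isPresent() ? fromValue : fieldOf(key, routingFieldName);
    }

    /** Reads a field as a string; empty when the container or the field is null. */
    private static Optional<String> fieldOf(Map<String, Object> container, String field) {
        if (container == null) {
            return Optional.empty();
        }
        Object raw = container.get(field);
        return raw == null ? Optional.empty() : Optional.of(raw.toString());
    }
}
```

With a value-only routing field, a null-value record resolves to an empty routing, which is exactly the misrouted delete described above. The fallback only helps if producers put the routing field into the key.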

sandyplace avatar Jan 06 '24 06:01 sandyplace