kafka-connect-elasticsearch
[HELLO!!] Adds routing support, configurable via 'routing.field.name'
Refs:
- https://github.com/confluentinc/kafka-connect-elasticsearch/issues/223
- https://www.elastic.co/guide/en/elasticsearch/reference/current/mapping-routing-field.html
- https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-bulk.html#bulk-routing
- https://www.elastic.co/guide/en/elasticsearch/reference/current/parent-join.html
Problem
The Elasticsearch _routing field is not currently supported.
Solution
Does this solution apply anywhere else?
- [x] yes
- [ ] no
If yes, where?
Parent/Child relationships -> join field type
Test Strategy
DataConverterTest has been added.
Integration tests might require refactoring of existing tests.
ElasticsearchConnectorIT / ElasticsearchConnectorBaseIT seem to use a very basic schema / payload.
Also, ElasticsearchHelperClient would need to be improved to support _routing.
Please advise how to approach this.
Testing done:
- [x] Unit tests
- [ ] Integration tests
- [ ] System tests
- [x] Manual tests
TODOs / open topics
- [x] 'routing.field.name' config
- [x] read field value, populate bulk
- [x] Unit tests
- [ ] ? Advanced support for parent-join field
- [ ] ? add SMT for parent-join field
- [ ] ? add support directly to connector
- [ ] Integration tests
- [ ] update documentation
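For illustration, the "read field value, populate bulk" item above could look roughly like the following. This is a minimal sketch and not the code in this PR: it uses plain Java maps instead of Connect Struct values, the class and method names (RoutingSketch, readRouting, bulkIndexAction) are hypothetical, and it does no JSON escaping. The only Elasticsearch detail it relies on is that a bulk action's metadata line accepts a routing value, as described in the bulk API reference linked above.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Optional;

public class RoutingSketch {

    /**
     * Resolves a dotted path such as "parent.id" against a nested map.
     * Returns empty if the value, the path, or any path segment is missing.
     */
    public static Optional<String> readRouting(Map<String, Object> value, String fieldPath) {
        if (value == null || fieldPath == null || fieldPath.isEmpty()) {
            return Optional.empty();
        }
        Object current = value;
        for (String segment : fieldPath.split("\\.")) {
            if (!(current instanceof Map)) {
                return Optional.empty();
            }
            current = ((Map<?, ?>) current).get(segment);
            if (current == null) {
                return Optional.empty();
            }
        }
        return Optional.of(String.valueOf(current));
    }

    /**
     * Builds the metadata line of a bulk "index" action.
     * Simplification: values are not JSON-escaped.
     */
    public static String bulkIndexAction(String index, String id, Optional<String> routing) {
        StringBuilder sb = new StringBuilder();
        sb.append("{\"index\":{\"_index\":\"").append(index)
          .append("\",\"_id\":\"").append(id).append("\"");
        // Only emit the routing attribute when a routing value was found.
        routing.ifPresent(r -> sb.append(",\"routing\":\"").append(r).append("\""));
        sb.append("}}");
        return sb.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> value = new LinkedHashMap<>();
        value.put("my_id", "3");
        value.put("parent_id", "1");
        // Here 'routing.field.name' is assumed to be set to "parent_id".
        System.out.println(
            bulkIndexAction("my-index-000001", "3", readRouting(value, "parent_id")));
    }
}
```

If the configured field is absent, the sketch simply omits routing, which matches the intent that the new config is optional.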
Release Plan
- New feature
- Non-breaking change
- New config fields are optional
- Does not require any change or migration upon upgrade
@confluentinc It looks like @hartmut-co-uk just signed our Contributor License Agreement. :+1:
Always at your service,
clabot
I noticed that for the parent-join use case, in addition to adding the routing, the payload potentially also needs to be enriched with my_join_field.name and my_join_field.parent (for children).
https://www.elastic.co/guide/en/elasticsearch/reference/current/parent-join.html
PUT my-index-000001/_doc/3?routing=1&refresh
{
  "my_id": "3",
  "parent_id": "1",
  "text": "This is an answer",
  "my_join_field": {
    "name": "answer",
    "parent": "1"
  }
}
I wonder if it would be worth building this natively into the connector, instead of forcing the user to enrich the data upfront or to build and require a custom SMT.
Note: I tried the InsertField SMT, but since it only supports flat fields, it's impossible to enrich the struct for children. https://docs.confluent.io/platform/current/connect/transforms/insertfield.html
So one solution I could see is to add the following config options:
- join.field.name: in the above example this would be 'my_join_field'
- join.relation.name: in the above example this would be 'answer'
- join.parent.id.field.name: would essentially be the value field (path), e.g. the same as would be configured for routing.field.name
A custom SMT might be the best fit to create the ‘join-field struct’ for ES parent/child relationship on the fly. I wonder if it's possible and would be an acceptable pattern to just add + ship a custom SMT for this along with the connector jar?
Further room for improvement: allow using data from $key (not only from a $value field).
I've implemented a simple SMT (it also works just fine when embedded in the connector jar).
Sticking with the ES example, it's configured like:
"transforms.insertParentJoin.join.field.name": "my_join_field",
"transforms.insertParentJoin.relation.type.name": "answer",
"transforms.insertParentJoin.parent.id.field.path": "questionId"
(with the actual msg value having a field questionId as the 'question' parent ID of the answer doc - this would also be used for routing...)
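To make the intended enrichment concrete, here is a rough sketch of what such a transform would do to a record value. It is not the actual InsertParentJoin implementation: it works on plain Java maps rather than Connect records, treats the parent ID as a flat field rather than a path, and the class and method names (ParentJoinSketch, withJoinField) are hypothetical.

```java
import java.util.LinkedHashMap;
import java.util.Map;

public class ParentJoinSketch {

    /**
     * Returns a copy of the value with a nested join field added,
     * shaped like the "my_join_field" object in the ES example above.
     */
    public static Map<String, Object> withJoinField(
            Map<String, Object> value,
            String joinFieldName,   // e.g. "my_join_field"
            String relationName,    // e.g. "answer"
            String parentIdField) { // e.g. "questionId"
        Object parentId = value.get(parentIdField);
        if (parentId == null) {
            throw new IllegalArgumentException(
                "Missing parent ID field: " + parentIdField);
        }
        Map<String, Object> join = new LinkedHashMap<>();
        join.put("name", relationName);
        join.put("parent", String.valueOf(parentId));

        // Copy so the incoming value is left untouched.
        Map<String, Object> enriched = new LinkedHashMap<>(value);
        enriched.put(joinFieldName, join);
        return enriched;
    }

    public static void main(String[] args) {
        Map<String, Object> answer = new LinkedHashMap<>();
        answer.put("my_id", "3");
        answer.put("questionId", "1");
        answer.put("text", "This is an answer");
        System.out.println(
            withJoinField(answer, "my_join_field", "answer", "questionId"));
    }
}
```

The same questionId value would then also be the one configured for routing, so that children land on the same shard as their parent.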
Please provide some feedback on how to proceed further.
Amazing work @hartmut-co-uk :heart_eyes: This is going to help us a lot!
@levzem could you please provide initial feedback?
Also:
- Could you advise how to proceed with integration tests? Integration tests might require refactoring of existing tests. ElasticsearchConnectorIT / ElasticsearchConnectorBaseIT seem to use a very basic schema / payload. Also, ElasticsearchHelperClient would need to be improved to support _routing.
- Do you think it would make sense to add an SMT to the connector? I have an implementation of
  public class InsertParentJoin<R extends ConnectRecord<R>> implements Transformation<R> {
  (I've had to add the dependency org.apache.kafka:connect-transforms:${kafka.version})
  - Or would it make more sense to embed the functionality directly into the connector?
  - Or not add it here, but create and provide it as a separate Maven artifact?
I'm still keen to help wrap this up, write more tests, .. if we can agree how to proceed.
@yanglei99 @kkonstantine @levzem @dosvath anyone? We so desperately need this and the work is done but it's dead in the water for ~2 months now without any apparent reason :confused:
ping
Pong @yanglei99 @kkonstantine @levzem @dosvath
Went on a sabbatical for 3 months and still :joy: I thought for a moment that this project was abandoned altogether, but there's still stuff merged a few days ago.
@kkonstantine :pray: :pray: :pray: what needs to happen to get some eyes on this? Over half a year open now...
I don't think it's been abandoned in general. I'm currently busy with other things. (I've not yet come to use the proposed changes beyond a POC)
Still happy to contribute if someone confirms that the feature/changes are welcome and will be merged + how to proceed with adding tests / SMT.
Any plans to merge this PR?
Dear maintainers, is this feature/PR welcome, and are further changes required? Is the config option fine as is, what about the new private methods added (are there better alternatives/libraries to use)?
I'm still keen to help wrap this up and write tests, ... if we can agree on how to proceed.
...
If you're still searching for a platform-type of solution, take a look at Apache Flink. You get pretty much access to the bare-bones Elasticsearch API so you can do whatever you want, including routing, timeouts per bulk action, etc. Plus an actual Web UI :star_struck:
Use flink kafka source -> flink elasticsearch sink -- instead of kafka connect? 🤔 This doesn't unblock getting this PR over the line ^^ but it's a great suggestion of an alternative solution way easier to customise. 👍 👍
Many thanks for bringing this up, given I've been considering Apache Flink for other use cases in my project, too...
Thank you for your submission! We really appreciate it. Like many open source projects, we ask that you sign our Contributor License Agreement before we can accept your contribution.
You have signed the CLA already but the status is still pending? Let us recheck it.
I am no expert, but I think this approach can be difficult: routing based on a value in a record is tricky when it comes to deleting. Typically a delete is triggered by a null record. We noticed that in that case the delete request is not routed to the correct shard, because the routing key is missing from the record. We had to take a different approach to get this to work.
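The delete problem can be illustrated with a small sketch. It is not the approach referred to in the comment above (that approach is not described in this thread); it only shows one possible fallback, picking up the earlier idea of reading data from the key. It assumes the record key is a structure that carries the routing field, and the names (DeleteRoutingSketch, resolveRouting) are hypothetical.

```java
import java.util.Collections;
import java.util.Map;
import java.util.Optional;

public class DeleteRoutingSketch {

    /**
     * Looks for the routing field in the value first; for a null value
     * (a delete), falls back to the key. Returns empty if neither has it.
     */
    public static Optional<String> resolveRouting(
            Map<String, Object> key,
            Map<String, Object> value,
            String routingField) {
        if (value != null && value.get(routingField) != null) {
            return Optional.of(String.valueOf(value.get(routingField)));
        }
        if (key != null && key.get(routingField) != null) {
            return Optional.of(String.valueOf(key.get(routingField)));
        }
        return Optional.empty();
    }

    public static void main(String[] args) {
        // Assumption: the key carries the routing field alongside the document ID.
        Map<String, Object> key = Collections.singletonMap("questionId", (Object) "1");

        // A null value has no fields, so a value-only lookup
        // would yield no routing for the delete request.
        System.out.println(resolveRouting(null, null, "questionId").isPresent());

        // With the key fallback, the delete can still be routed.
        System.out.println(resolveRouting(key, null, "questionId").orElse("<none>"));
    }
}
```

Whether a key-based fallback is acceptable depends on how topics are keyed; if the key is only the document ID, the routing value has to come from somewhere else.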